2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "element-glue.h"
25 #include "directtcp.h"
27 #include "sockaddr-util.h"
33 typedef struct XferElementGlue_ {
34 XferElement __parent__;
36 /* instructions to push_buffer_impl */
39 PUSH_TO_FD, /* write to write_fd */
42 PUSH_ACCEPT_FIRST = (1 << 16),
43 PUSH_CONNECT_FIRST = (2 << 16),
46 /* instructions to pull_buffer_impl */
48 PULL_FROM_RING_BUFFER,
49 PULL_FROM_FD, /* read from read_fd */
52 PULL_ACCEPT_FIRST = (1 << 16),
53 PULL_CONNECT_FIRST = (2 << 16),
61 /* the stuff we might use, depending on what flavor of glue we're
64 int input_listen_socket, output_listen_socket;
65 int input_data_socket, output_data_socket;
66 int read_fd, write_fd;
68 /* a ring buffer of ptr/size pairs with semaphores */
69 struct { gpointer buf; size_t size; } *ring;
70 semaphore_t *ring_used_sem, *ring_free_sem;
71 gint ring_head, ring_tail;
74 GThreadFunc threadfunc;
/* Class structure for XferElementGlue: it contains only the parent
 * XferElementClass and declares no class members of its own. */
81 typedef struct XferElementGlueClass_ {
82 XferElementClass __parent__;
83 } XferElementGlueClass;
85 static GObjectClass *parent_class = NULL;
88 * Utility functions, etc.
93 XferElementGlue *self)
95 if (pipe(self->pipe) < 0)
96 g_critical(_("Could not create pipe: %s"), strerror(errno));
101 XferElementGlue *self)
103 xfer_queue_message(XFER_ELEMENT(self)->xfer,
104 xmsg_new((XferElement *)self, XMSG_DONE, 0));
111 DirectTCPAddr **addrsp)
115 DirectTCPAddr *addrs;
118 sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
120 xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
124 if (listen(sock, 1) < 0) {
125 xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
129 /* TODO: which addresses should this display? all ifaces? localhost? */
131 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
132 error("getsockname(): %s", strerror(errno));
133 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
135 addrs = g_new0(DirectTCPAddr, 2);
136 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
137 addrs[0].port = SU_GET_PORT(&addr);
147 return !XFER_ELEMENT(data)->cancelled;
152 XferElementGlue *self,
156 g_assert(*socketp != -1);
158 if ((sock = interruptible_accept(*socketp, NULL, NULL,
159 prolong_accept, self)) == -1) {
160 /* if the accept was interrupted due to a cancellation, then do not
161 * add a further error message */
162 if (errno == 0 && XFER_ELEMENT(self)->cancelled)
165 xfer_cancel_with_error(XFER_ELEMENT(self),
166 _("Error accepting incoming connection: %s"), strerror(errno));
167 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
171 /* close the listening socket now, for good measure */
179 do_directtcp_connect(
180 XferElementGlue *self,
181 DirectTCPAddr *addrs)
183 XferElement *elt = XFER_ELEMENT(self);
188 g_debug("element-glue got no directtcp addresses to connect to!");
189 if (!elt->cancelled) {
190 xfer_cancel_with_error(elt,
191 "%s got no directtcp addresses to connect to",
192 xfer_element_repr(elt));
197 /* set up the sockaddr -- IPv4 only */
198 SU_INIT(&addr, AF_INET);
199 SU_SET_PORT(&addr, addrs->port);
200 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
202 g_debug("making data connection to %s", str_sockaddr(&addr));
203 sock = socket(AF_INET, SOCK_STREAM, 0);
205 xfer_cancel_with_error(elt,
206 "socket(): %s", strerror(errno));
209 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
210 xfer_cancel_with_error(elt,
211 "connect(): %s", strerror(errno));
215 g_debug("connected to %s", str_sockaddr(&addr));
220 wait_until_xfer_cancelled(elt->xfer);
/* Size in bytes of each data buffer allocated for fd reads in this file. */
224 #define GLUE_BUFFER_SIZE 32768
/* Number of ptr/size slots allocated for the ring buffer. */
225 #define GLUE_RING_BUFFER_SIZE 32
/* Fold an (input, output) mechanism pair into one integer so that it can be
 * used as a switch/case label. */
227 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
233 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
234 * should be redirected to point to the upstream's output_fd or downstream's
235 * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
237 static int neighboring_element_fd = -1;
239 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
241 _get_read_fd(XferElementGlue *self)
244 return -1; /* shouldn't happen.. */
246 if (self->read_fdp == &neighboring_element_fd) {
247 XferElement *elt = XFER_ELEMENT(self);
248 self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
250 self->read_fd = *self->read_fdp;
251 *self->read_fdp = -1;
253 self->read_fdp = NULL;
254 return self->read_fd;
257 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
259 _get_write_fd(XferElementGlue *self)
261 if (!self->write_fdp)
262 return -1; /* shouldn't happen.. */
264 if (self->write_fdp == &neighboring_element_fd) {
265 XferElement *elt = XFER_ELEMENT(self);
266 self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
268 self->write_fd = *self->write_fdp;
269 *self->write_fdp = -1;
271 self->write_fdp = NULL;
272 return self->write_fd;
276 close_read_fd(XferElementGlue *self)
278 int fd = get_read_fd(self);
284 close_write_fd(XferElementGlue *self)
286 int fd = get_write_fd(self);
292 * Worker thread utility functions
296 pull_and_write(XferElementGlue *self)
298 XferElement *elt = XFER_ELEMENT(self);
299 int fd = get_write_fd(self);
300 self->write_fdp = NULL;
302 while (!elt->cancelled) {
306 /* get a buffer from upstream */
307 buf = xfer_element_pull_buffer(elt->upstream, &len);
312 if (full_write(fd, buf, len) < len) {
313 if (!elt->cancelled) {
314 xfer_cancel_with_error(elt,
315 _("Error writing to fd %d: %s"), fd, strerror(errno));
316 wait_until_xfer_cancelled(elt->xfer);
325 if (elt->cancelled && elt->expect_eof)
326 xfer_element_drain_by_pulling(elt->upstream);
328 /* close the fd we've been writing, as an EOF signal to downstream, and
329 * set it to -1 to avoid accidental re-use */
330 close_write_fd(self);
334 read_and_write(XferElementGlue *self)
336 XferElement *elt = XFER_ELEMENT(self);
337 /* dynamically allocate a buffer, in case this thread has
338 * a limited amount of stack allocated */
339 char *buf = g_malloc(GLUE_BUFFER_SIZE);
340 int rfd = get_read_fd(self);
341 int wfd = get_write_fd(self);
343 while (!elt->cancelled) {
346 /* read from upstream */
347 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
348 if (len < GLUE_BUFFER_SIZE) {
350 if (!elt->cancelled) {
351 xfer_cancel_with_error(elt,
352 _("Error reading from fd %d: %s"), rfd, strerror(errno));
353 wait_until_xfer_cancelled(elt->xfer);
356 } else if (len == 0) { /* we only count a zero-length read as EOF */
361 /* write the buffer fully */
362 if (full_write(wfd, buf, len) < len) {
363 if (!elt->cancelled) {
364 xfer_cancel_with_error(elt,
365 _("Could not write to fd %d: %s"), wfd, strerror(errno));
366 wait_until_xfer_cancelled(elt->xfer);
372 if (elt->cancelled && elt->expect_eof)
373 xfer_element_drain_by_reading(rfd);
375 /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
376 * kill it and complete the cancellation */
379 /* close the fd we've been writing, as an EOF signal to downstream */
380 close_write_fd(self);
387 XferElementGlue *self)
389 XferElement *elt = XFER_ELEMENT(self);
390 int fd = get_read_fd(self);
392 while (!elt->cancelled) {
393 char *buf = g_malloc(GLUE_BUFFER_SIZE);
396 /* read a buffer from upstream */
397 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
398 if (len < GLUE_BUFFER_SIZE) {
400 if (!elt->cancelled) {
401 int saved_errno = errno;
402 xfer_cancel_with_error(elt,
403 _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
404 g_debug("element-glue: error reading from fd %d: %s",
405 fd, strerror(saved_errno));
406 wait_until_xfer_cancelled(elt->xfer);
409 } else if (len == 0) { /* we only count a zero-length read as EOF */
415 xfer_element_push_buffer(elt->downstream, buf, len);
418 if (elt->cancelled && elt->expect_eof)
419 xfer_element_drain_by_reading(fd);
421 /* send an EOF indication downstream */
422 xfer_element_push_buffer(elt->downstream, NULL, 0);
424 /* close the read fd, since it's at EOF */
429 pull_and_push(XferElementGlue *self)
431 XferElement *elt = XFER_ELEMENT(self);
432 gboolean eof_sent = FALSE;
434 while (!elt->cancelled) {
438 /* get a buffer from upstream */
439 buf = xfer_element_pull_buffer(elt->upstream, &len);
441 /* and push it downstream */
442 xfer_element_push_buffer(elt->downstream, buf, len);
450 if (elt->cancelled && elt->expect_eof)
451 xfer_element_drain_by_pulling(elt->upstream);
454 xfer_element_push_buffer(elt->downstream, NULL, 0);
461 XferElement *elt = XFER_ELEMENT(data);
462 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
464 switch (mech_pair(elt->input_mech, elt->output_mech)) {
465 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
466 read_and_write(self);
469 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
470 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
474 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
475 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
476 pull_and_write(self);
479 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
483 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
484 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
485 if ((self->output_data_socket = do_directtcp_connect(self,
486 elt->downstream->input_listen_addrs)) == -1)
488 self->write_fdp = &self->output_data_socket;
489 read_and_write(self);
492 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
493 if ((self->output_data_socket = do_directtcp_connect(self,
494 elt->downstream->input_listen_addrs)) == -1)
496 self->write_fdp = &self->output_data_socket;
497 pull_and_write(self);
500 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
501 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
502 if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
504 self->read_fdp = &self->input_data_socket;
505 read_and_write(self);
508 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
509 if ((self->input_data_socket = do_directtcp_accept(self,
510 &self->input_listen_socket)) == -1)
512 self->read_fdp = &self->input_data_socket;
516 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
517 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
518 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
519 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
520 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
521 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
522 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
523 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
524 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
525 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
527 g_assert_not_reached();
530 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
531 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
532 if ((self->output_data_socket = do_directtcp_accept(self,
533 &self->output_listen_socket)) == -1)
535 self->write_fdp = &self->output_data_socket;
536 read_and_write(self);
539 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
540 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
541 if ((self->input_data_socket = do_directtcp_connect(self,
542 elt->upstream->output_listen_addrs)) == -1)
544 self->read_fdp = &self->input_data_socket;
545 read_and_write(self);
548 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
549 if ((self->input_data_socket = do_directtcp_connect(self,
550 elt->upstream->output_listen_addrs)) == -1)
552 self->read_fdp = &self->input_data_socket;
556 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
557 if ((self->output_data_socket = do_directtcp_accept(self,
558 &self->output_listen_socket)) == -1)
560 self->write_fdp = &self->output_data_socket;
561 pull_and_write(self);
564 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
565 /* TODO: use async accepts here to avoid order dependency */
566 if ((self->output_data_socket = do_directtcp_accept(self,
567 &self->output_listen_socket)) == -1)
569 self->write_fdp = &self->output_data_socket;
570 if ((self->input_data_socket = do_directtcp_accept(self,
571 &self->input_listen_socket)) == -1)
573 self->read_fdp = &self->input_data_socket;
574 read_and_write(self);
577 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
578 /* TODO: use async connects and select() to avoid order dependency here */
579 if ((self->input_data_socket = do_directtcp_connect(self,
580 elt->upstream->output_listen_addrs)) == -1)
582 self->read_fdp = &self->input_data_socket;
583 if ((self->output_data_socket = do_directtcp_connect(self,
584 elt->downstream->input_listen_addrs)) == -1)
586 self->write_fdp = &self->output_data_socket;
587 read_and_write(self);
591 send_xfer_done(self);
604 XferElementGlue *self = (XferElementGlue *)elt;
605 gboolean need_ring = FALSE;
606 gboolean need_listen_input = FALSE;
607 gboolean need_listen_output = FALSE;
609 g_assert(elt->input_mech != XFER_MECH_NONE);
610 g_assert(elt->output_mech != XFER_MECH_NONE);
611 g_assert(elt->input_mech != elt->output_mech);
613 self->read_fdp = NULL;
614 self->write_fdp = NULL;
615 self->on_push = PUSH_INVALID;
616 self->on_pull = PULL_INVALID;
617 self->need_thread = FALSE;
619 switch (mech_pair(elt->input_mech, elt->output_mech)) {
620 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
621 /* thread will read from one fd and write to the other */
622 self->read_fdp = &neighboring_element_fd;
623 self->write_fdp = &neighboring_element_fd;
624 self->need_thread = TRUE;
627 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
628 /* thread will read from one fd and call push_buffer downstream */
629 self->read_fdp = &neighboring_element_fd;
630 self->need_thread = TRUE;
633 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
634 self->read_fdp = &neighboring_element_fd;
635 self->on_pull = PULL_FROM_FD;
638 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
639 /* thread will connect for output, then read from fd and write to the
641 self->read_fdp = &neighboring_element_fd;
642 self->need_thread = TRUE;
645 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
646 /* thread will accept output conn, then read from upstream and write to socket */
647 self->read_fdp = &neighboring_element_fd;
648 self->need_thread = TRUE;
649 need_listen_output = TRUE;
652 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
654 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
655 self->pipe[1] = -1; /* upstream will close this for us */
656 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
657 self->pipe[0] = -1; /* downstream will close this for us */
660 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
661 /* thread will read from pipe and call downstream's push_buffer */
663 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
664 self->pipe[1] = -1; /* upstream will close this for us */
665 self->read_fdp = &self->pipe[0];
666 self->need_thread = TRUE;
669 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
671 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
672 self->pipe[1] = -1; /* upstream will close this for us */
673 self->on_pull = PULL_FROM_FD;
674 self->read_fdp = &self->pipe[0];
677 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
678 /* thread will connect for output, then read from pipe and write to socket */
680 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
681 self->pipe[1] = -1; /* upstream will close this for us */
682 self->read_fdp = &self->pipe[0];
683 self->need_thread = TRUE;
686 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
687 /* thread will accept output conn, then read from pipe and write to socket */
689 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
690 self->pipe[1] = -1; /* upstream will close this for us */
691 self->read_fdp = &self->pipe[0];
692 self->need_thread = TRUE;
693 need_listen_output = TRUE;
696 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
698 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
699 self->pipe[0] = -1; /* downstream will close this for us */
700 self->on_push = PUSH_TO_FD;
701 self->write_fdp = &self->pipe[1];
704 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
705 self->on_push = PUSH_TO_FD;
706 self->write_fdp = &neighboring_element_fd;
709 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
710 self->on_push = PUSH_TO_RING_BUFFER;
711 self->on_pull = PULL_FROM_RING_BUFFER;
715 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
716 /* push will connect for output first */
717 self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
720 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
721 /* push will accept for output first */
722 self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
723 need_listen_output = TRUE;
726 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
727 /* thread will pull from upstream and write to pipe */
729 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
730 self->pipe[0] = -1; /* downstream will close this for us */
731 self->write_fdp = &self->pipe[1];
732 self->need_thread = TRUE;
735 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
736 /* thread will pull from upstream and write to downstream */
737 self->write_fdp = &neighboring_element_fd;
738 self->need_thread = TRUE;
741 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
742 /* thread will pull from upstream and push to downstream */
743 self->need_thread = TRUE;
746 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
747 /* thread will connect for output, then pull from upstream and write to socket */
748 self->need_thread = TRUE;
751 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
752 /* thread will accept for output, then pull from upstream and write to socket */
753 self->need_thread = TRUE;
754 need_listen_output = TRUE;
757 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
758 /* thread will accept for input, then read from socket and write to pipe */
760 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
761 self->pipe[0] = -1; /* downstream will close this for us */
762 self->write_fdp = &self->pipe[1];
763 self->need_thread = TRUE;
764 need_listen_input = TRUE;
767 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
768 /* thread will accept for input, then read from socket and write to downstream */
769 self->write_fdp = &neighboring_element_fd;
770 self->need_thread = TRUE;
771 need_listen_input = TRUE;
774 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
775 /* thread will accept for input, then read from socket and push downstream */
776 self->need_thread = TRUE;
777 need_listen_input = TRUE;
780 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
781 /* first pull will accept for input, then read from socket */
782 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
783 need_listen_input = TRUE;
786 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
787 /* thread will accept on both sides, then copy from socket to socket */
788 self->need_thread = TRUE;
789 need_listen_input = TRUE;
790 need_listen_output = TRUE;
793 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
794 /* thread will connect for input, then read from socket and write to pipe */
796 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
797 self->pipe[0] = -1; /* downstream will close this for us */
798 self->write_fdp = &self->pipe[1];
799 self->need_thread = TRUE;
802 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
803 /* thread will connect for input, then read from socket and write to downstream */
804 self->write_fdp = &neighboring_element_fd;
805 self->need_thread = TRUE;
808 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
809 /* thread will connect for input, then read from socket and push downstream */
810 self->need_thread = TRUE;
813 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
814 /* first pull will connect for input, then read from socket */
815 self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
818 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
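819 /* thread will connect on both sides, then copy from socket to socket; NOTE(review): the on_pull assignment just below looks like a copy/paste leftover (this pair's output is not PULL_BUFFER and no input listen socket is requested here) -- confirm and remove */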
820 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
821 self->need_thread = TRUE;
825 g_assert_not_reached();
829 /* set up ring if desired */
831 self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
832 self->ring_used_sem = semaphore_new_with_value(0);
833 self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
836 if (need_listen_input) {
837 if (!do_directtcp_listen(elt,
838 &self->input_listen_socket, &elt->input_listen_addrs))
841 if (need_listen_output) {
842 if (!do_directtcp_listen(elt,
843 &self->output_listen_socket, &elt->output_listen_addrs))
854 XferElementGlue *self = (XferElementGlue *)elt;
856 if (self->need_thread)
857 self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
859 /* we're active if we have a thread that will eventually die */
860 return self->need_thread;
868 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
870 /* accept first, if required */
871 if (self->on_pull & PULL_ACCEPT_FIRST) {
872 /* don't accept the next time around */
873 self->on_pull &= ~PULL_ACCEPT_FIRST;
875 if (elt->cancelled) {
880 if ((self->input_data_socket = do_directtcp_accept(self,
881 &self->input_listen_socket)) == -1) {
882 /* do_directtcp_accept already signalled an error; xfer
888 /* read from this new socket */
889 self->read_fdp = &self->input_data_socket;
892 /* or connect first, if required */
893 if (self->on_pull & PULL_CONNECT_FIRST) {
894 /* don't connect the next time around */
895 self->on_pull &= ~PULL_CONNECT_FIRST;
897 if (elt->cancelled) {
902 if ((self->input_data_socket = do_directtcp_connect(self,
903 elt->upstream->output_listen_addrs)) == -1) {
904 /* do_directtcp_connect already signalled an error; xfer
910 /* read from this new socket */
911 self->read_fdp = &self->input_data_socket;
914 switch (self->on_pull) {
915 case PULL_FROM_RING_BUFFER: {
918 if (elt->cancelled) {
919 /* the finalize method will empty the ring buffer */
924 /* make sure there's at least one element available */
925 semaphore_down(self->ring_used_sem);
928 buf = self->ring[self->ring_tail].buf;
929 *size = self->ring[self->ring_tail].size;
930 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
932 /* and mark this element as free to be overwritten */
933 semaphore_up(self->ring_free_sem);
939 int fd = get_read_fd(self);
940 char *buf = g_malloc(GLUE_BUFFER_SIZE);
943 /* if the fd is already closed, it's possible upstream bailed out
944 * so quickly that we didn't even get a look at the fd */
945 if (elt->cancelled || fd == -1) {
948 xfer_element_drain_by_reading(fd);
957 /* read from upstream */
958 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
959 if (len < GLUE_BUFFER_SIZE) {
961 if (!elt->cancelled) {
962 xfer_cancel_with_error(elt,
963 _("Error reading from fd %d: %s"), fd, strerror(errno));
964 wait_until_xfer_cancelled(elt->xfer);
971 /* and finish off the upstream */
972 if (elt->expect_eof) {
973 xfer_element_drain_by_reading(fd);
976 } else if (len == 0) {
982 /* signal EOF to downstream */
994 g_assert_not_reached();
1005 XferElementGlue *self = (XferElementGlue *)elt;
1007 /* accept first, if required */
1008 if (self->on_push & PUSH_ACCEPT_FIRST) {
1009 /* don't accept the next time around */
1010 self->on_push &= ~PUSH_ACCEPT_FIRST;
1012 if (elt->cancelled) {
1016 if ((self->output_data_socket = do_directtcp_accept(self,
1017 &self->output_listen_socket)) == -1) {
1018 /* do_directtcp_accept already signalled an error; xfer
1023 /* write to this new socket */
1024 self->write_fdp = &self->output_data_socket;
1027 /* or connect first, if required */
1028 if (self->on_push & PUSH_CONNECT_FIRST) {
1029 /* don't connect the next time around */
1030 self->on_push &= ~PUSH_CONNECT_FIRST;
1032 if (elt->cancelled) {
1036 if ((self->output_data_socket = do_directtcp_connect(self,
1037 elt->downstream->input_listen_addrs)) == -1) {
1038 /* do_directtcp_connect already signalled an error; xfer
1043 /* write to this new socket */
1044 self->write_fdp = &self->output_data_socket;
1047 switch (self->on_push) {
1048 case PUSH_TO_RING_BUFFER:
1049 /* just drop packets if the transfer has been cancelled */
1050 if (elt->cancelled) {
1055 /* make sure there's at least one element free */
1056 semaphore_down(self->ring_free_sem);
1059 self->ring[self->ring_head].buf = buf;
1060 self->ring[self->ring_head].size = len;
1061 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1063 /* and mark this element as available for reading */
1064 semaphore_up(self->ring_used_sem);
1069 int fd = get_write_fd(self);
1071 /* if the fd is already closed, it's possible upstream bailed out
1072 * so quickly that we didn't even get a look at the fd. In this
1073 * case we can assume the xfer has been cancelled and just discard
1078 if (elt->cancelled) {
1079 if (!elt->expect_eof || !buf) {
1080 close_write_fd(self);
1082 /* hack to ensure we won't close the fd again, if we get another push */
1083 elt->expect_eof = TRUE;
1091 /* write the full buffer to the fd, or close on EOF */
1093 if (full_write(fd, buf, len) < len) {
1094 if (!elt->cancelled) {
1095 xfer_cancel_with_error(elt,
1096 _("Error writing to fd %d: %s"), fd, strerror(errno));
1097 wait_until_xfer_cancelled(elt->xfer);
1099 /* nothing special to do to handle a cancellation */
1103 close_write_fd(self);
1111 g_assert_not_reached();
1118 XferElementGlue *self)
1120 XferElement *elt = (XferElement *)self;
1121 elt->can_generate_eof = TRUE;
1122 self->pipe[0] = self->pipe[1] = -1;
1123 self->input_listen_socket = -1;
1124 self->output_listen_socket = -1;
1125 self->input_data_socket = -1;
1126 self->output_data_socket = -1;
1128 self->write_fd = -1;
1135 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1137 /* first make sure the worker thread has finished up */
1139 g_thread_join(self->thread);
1141 /* close our pipes and fd's if they're still open */
1142 if (self->pipe[0] != -1) close(self->pipe[0]);
1143 if (self->pipe[1] != -1) close(self->pipe[1]);
1144 if (self->input_listen_socket != -1) close(self->input_listen_socket);
1145 if (self->output_listen_socket != -1) close(self->output_listen_socket);
1146 if (self->input_data_socket != -1) close(self->input_data_socket);
1147 if (self->output_data_socket != -1) close(self->output_data_socket);
1148 if (self->read_fd != -1) close(self->read_fd);
1149 if (self->write_fd != -1) close(self->write_fd);
1152 /* empty the ring buffer, ignoring synchronization issues */
1153 while (self->ring_used_sem->value) {
1154 if (self->ring[self->ring_tail].buf)
1155 amfree(self->ring[self->ring_tail].buf);
1156 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1160 semaphore_free(self->ring_used_sem);
1161 semaphore_free(self->ring_free_sem);
1165 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
/* Table of the (input mechanism, output mechanism) pairs this element
 * supports, ended by an XFER_MECH_NONE/XFER_MECH_NONE entry.  The two
 * trailing integers are presumably a per-pair cost and a thread count --
 * confirm against the xfer_element_mech_pair_t declaration.  Within the
 * setup code visible in this file, the last column is 1 exactly for the
 * pairs that set need_thread. */
1168 static xfer_element_mech_pair_t _pairs[] = {
1169 { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1170 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1171 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1172 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1173 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1175 { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
1176 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call */
1177 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
1178 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy */
1179 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
1181 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
1182 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
1183 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
1184 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
1185 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
1187 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
1188 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
1189 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
1190 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
1191 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
1193 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1194 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1195 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1196 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1197 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1199 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1200 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1201 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1202 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1203 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1206 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
/* Non-static pointer to the table above; the same pointer is assigned to
 * klass->mech_pairs elsewhere in this file. */
1208 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1212 XferElementGlueClass * selfc)
1214 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1215 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1217 klass->setup = setup_impl;
1218 klass->start = start_impl;
1219 klass->push_buffer = push_buffer_impl;
1220 klass->pull_buffer = pull_buffer_impl;
1222 klass->perl_class = "Amanda::Xfer::Element::Glue";
1223 klass->mech_pairs = xfer_element_glue_mech_pairs;
1225 goc->finalize = finalize_impl;
1227 parent_class = g_type_class_peek_parent(selfc);
1231 xfer_element_glue_get_type (void)
1233 static GType type = 0;
1235 if G_UNLIKELY(type == 0) {
1236 static const GTypeInfo info = {
1237 sizeof (XferElementGlueClass),
1238 (GBaseInitFunc) NULL,
1239 (GBaseFinalizeFunc) NULL,
1240 (GClassInitFunc) class_init,
1241 (GClassFinalizeFunc) NULL,
1242 NULL /* class_data */,
1243 sizeof (XferElementGlue),
1244 0 /* n_preallocs */,
1245 (GInstanceInitFunc) instance_init,
1249 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1255 /* create an element of this class; prototype is in xfer-element.h */
1257 xfer_element_glue(void)
1259 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1260 XferElement *elt = XFER_ELEMENT(self);