2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "element-glue.h"
25 #include "directtcp.h"
27 #include "sockaddr-util.h"
/*
 * Per-instance state of the glue element: the parent XferElement, the
 * instruction words consulted by push_buffer_impl and pull_buffer_impl, the
 * sockets and file descriptors in use, and the ring buffer that connects a
 * pushing upstream to a pulling downstream.
 */
34 typedef struct XferElementGlue_ {
35 XferElement __parent__;
37 /* instructions to push_buffer_impl */
40 PUSH_TO_FD, /* write to write_fd */
43 PUSH_ACCEPT_FIRST = (1 << 16), /* flag bit: accept a connection before the first push */
44 PUSH_CONNECT_FIRST = (2 << 16), /* flag bit: connect to downstream before the first push */
47 /* instructions to pull_buffer_impl */
49 PULL_FROM_RING_BUFFER,
50 PULL_FROM_FD, /* read from read_fd */
53 PULL_ACCEPT_FIRST = (1 << 16), /* flag bit: accept a connection before the first pull */
54 PULL_CONNECT_FIRST = (2 << 16), /* flag bit: connect to upstream before the first pull */
62 /* the stuff we might use, depending on what flavor of glue we're
65 int input_listen_socket, output_listen_socket;
66 int input_data_socket, output_data_socket;
67 int read_fd, write_fd;
69 /* a ring buffer of ptr/size pairs with semaphores */
70 struct { gpointer buf; size_t size; } *ring;
71 amsemaphore_t *ring_used_sem, *ring_free_sem; /* count filled and empty ring slots, respectively */
72 gint ring_head, ring_tail; /* next slot to fill (push side) and next slot to drain (pull side) */
75 GThreadFunc threadfunc;
/* Class structure for the glue element; it adds no members of its own to
 * XferElementClass. */
82 typedef struct XferElementGlueClass_ {
83 XferElementClass __parent__;
84 } XferElementGlueClass;
/* Parent class pointer, assigned from g_type_class_peek_parent() during class
 * initialization and used to chain up from the finalize method. */
86 static GObjectClass *parent_class = NULL;
89 * Utility functions, etc.
/* Create a pipe and store both ends in self->pipe.  A failure of pipe() is
 * reported through g_critical() together with the errno text. */
94 XferElementGlue *self)
96 if (pipe(self->pipe) < 0)
97 g_critical(_("Could not create pipe: %s"), strerror(errno));
/* Queue an XMSG_DONE message, created for this element, on the transfer the
 * element belongs to. */
102 XferElementGlue *self)
104 xfer_queue_message(XFER_ELEMENT(self)->xfer,
105 xmsg_new((XferElement *)self, XMSG_DONE, 0));
/*
 * Open a listening TCP socket on a "localhost" address.  The socket is stored
 * through sockp, and the address it ended up bound to is placed in a newly
 * allocated DirectTCPAddr array.  Failures of the individual system calls
 * cancel the transfer via xfer_cancel_with_error().  Callers test the result
 * for truth (see setup_impl).
 */
112 DirectTCPAddr **addrsp)
115 sockaddr_union data_addr;
116 DirectTCPAddr *addrs;
118 struct addrinfo *res;
119 struct addrinfo *res_addr;
120 sockaddr_union *addr = NULL;
/* NOTE(review): the return value of resolve_hostname() is discarded and errno
 * is reported instead -- confirm errno is meaningful on this path */
122 if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
123 xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
/* walk the resolved addresses, looking for an IPv4 (AF_INET) one */
126 for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
127 if (res_addr->ai_family == AF_INET) {
128 addr = (sockaddr_union *)res_addr->ai_addr;
/* use the first resolved address (presumably only when no AF_INET entry was
 * found above -- the guarding condition is not visible here) */
133 addr = (sockaddr_union *)res->ai_addr;
136 sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
138 xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
143 if (bind(sock, (struct sockaddr *)addr, len) != 0) {
144 xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
/* a backlog of 1 is requested for the listening socket */
149 if (listen(sock, 1) < 0) {
150 xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
154 /* TODO: which addresses should this display? all ifaces? localhost? */
/* ask the kernel which address the socket was actually bound to */
155 len = sizeof(data_addr);
/* NOTE(review): this failure goes through error() rather than
 * xfer_cancel_with_error() like the calls above -- confirm that is intended */
156 if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
157 error("getsockname(): %s", strerror(errno));
/* two zero-filled entries are allocated and only the first is filled in; the
 * second presumably serves as the array terminator */
159 addrs = g_new0(DirectTCPAddr, 2);
160 copy_sockaddr(&addrs[0], &data_addr);
/* Yields TRUE for as long as the element passed in as data has not been
 * cancelled (presumably the body of the prolong_accept callback handed to
 * interruptible_accept() -- its signature is not visible here). */
170 return !XFER_ELEMENT(data)->cancelled;
/*
 * Accept one incoming connection on the listening socket *socketp, which must
 * already be open.  The accept is interruptible: prolong_accept is passed as
 * the callback, with self as its argument.  Callers treat a result of -1 as
 * failure.
 */
175 XferElementGlue *self,
179 g_assert(*socketp != -1);
181 if ((sock = interruptible_accept(*socketp, NULL, NULL,
182 prolong_accept, self)) == -1) {
183 /* if the accept was interrupted due to a cancellation, then do not
184 * add a further error message */
185 if (errno == 0 && XFER_ELEMENT(self)->cancelled)
/* a genuine accept failure: cancel the transfer with the errno text, then
 * block until the cancellation has taken effect */
188 xfer_cancel_with_error(XFER_ELEMENT(self),
189 _("Error accepting incoming connection: %s"), strerror(errno));
190 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
194 /* close the listening socket now, for good measure */
/*
 * Open a TCP connection to the address found at addrs.  Only the entry that
 * addrs points to is copied and used.  Failures cancel the transfer via
 * xfer_cancel_with_error(); callers treat a result of -1 as failure.
 */
202 do_directtcp_connect(
203 XferElementGlue *self,
204 DirectTCPAddr *addrs)
206 XferElement *elt = XFER_ELEMENT(self);
211 g_debug("element-glue got no directtcp addresses to connect to!");
/* do not add a second error message if the transfer is already cancelled */
212 if (!elt->cancelled) {
213 xfer_cancel_with_error(elt,
214 "%s got no directtcp addresses to connect to",
215 xfer_element_repr(elt));
220 /* set up the sockaddr -- IPv4 only */
221 copy_sockaddr(&addr, addrs);
223 g_debug("making data connection to %s", str_sockaddr(&addr));
224 sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
226 xfer_cancel_with_error(elt,
227 "socket(): %s", strerror(errno));
230 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
231 xfer_cancel_with_error(elt,
232 "connect(): %s", strerror(errno));
236 g_debug("connected to %s", str_sockaddr(&addr));
/* block until the cancellation has taken effect (presumably the shared
 * failure path of this function) */
241 wait_until_xfer_cancelled(elt->xfer);
/* size, in bytes, of the buffers used for fd reads in this file */
245 #define GLUE_BUFFER_SIZE 32768
/* number of slots in the push/pull ring buffer */
246 #define GLUE_RING_BUFFER_SIZE 32
/* fold an (input mech, output mech) pair into one integer, so that a pair can
 * be used as a switch value and as a case label */
248 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
254 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
255 * should be redirected to point to the upstream's output_fd or downstream's
256 * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
258 static int neighboring_element_fd = -1;
260 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
262 _get_read_fd(XferElementGlue *self)
265 return -1; /* shouldn't happen.. */
/* read_fdp pointing at the sentinel means "use the neighbour's fd": take over
 * upstream's output fd, leaving -1 in its place */
267 if (self->read_fdp == &neighboring_element_fd) {
268 XferElement *elt = XFER_ELEMENT(self);
269 self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
/* otherwise take the fd stored where read_fdp points, and reset that slot to
 * -1 so the same descriptor is not also closed through it */
271 self->read_fd = *self->read_fdp;
272 *self->read_fdp = -1;
/* the pointer has been consumed; the fd is now cached in self->read_fd */
274 self->read_fdp = NULL;
275 return self->read_fd;
/* get_write_fd() returns the cached self->write_fd, resolving it on first use
 * through _get_write_fd(), which consumes self->write_fdp. */
278 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
280 _get_write_fd(XferElementGlue *self)
282 if (!self->write_fdp)
283 return -1; /* shouldn't happen.. */
/* write_fdp pointing at the sentinel means "use the neighbour's fd": take
 * over downstream's input fd, leaving -1 in its place */
285 if (self->write_fdp == &neighboring_element_fd) {
286 XferElement *elt = XFER_ELEMENT(self);
287 self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
/* otherwise take the fd stored where write_fdp points, and reset that slot to
 * -1 so the same descriptor is not also closed through it */
289 self->write_fd = *self->write_fdp;
290 *self->write_fdp = -1;
/* the pointer has been consumed; the fd is now cached in self->write_fd */
292 self->write_fdp = NULL;
293 return self->write_fd;
/* Close the element's read fd.  The fd is first resolved through
 * get_read_fd(), so a descriptor still held via read_fdp is taken over. */
297 close_read_fd(XferElementGlue *self)
299 int fd = get_read_fd(self);
/* Close the element's write fd.  The fd is first resolved through
 * get_write_fd(), so a descriptor still held via write_fdp is taken over. */
305 close_write_fd(XferElementGlue *self)
307 int fd = get_write_fd(self);
313 * Worker thread utility functions
/*
 * Worker loop: until the element is cancelled, pull a buffer from upstream
 * and write it in full to the write fd.  A short write cancels the transfer.
 * The write fd is closed at the end, which signals EOF to downstream.
 */
317 pull_and_write(XferElementGlue *self)
319 XferElement *elt = XFER_ELEMENT(self);
320 int fd = get_write_fd(self);
/* _get_write_fd() already clears write_fdp whenever it resolves the fd; this
 * additionally covers the case where the fd was already cached */
321 self->write_fdp = NULL;
323 while (!elt->cancelled) {
327 /* get a buffer from upstream */
328 buf = xfer_element_pull_buffer(elt->upstream, &len);
/* full_write() returning less than len is treated as a write error */
333 if (full_write(fd, buf, len) < len) {
334 if (!elt->cancelled) {
335 xfer_cancel_with_error(elt,
336 _("Error writing to fd %d: %s"), fd, strerror(errno));
337 wait_until_xfer_cancelled(elt->xfer);
/* on cancellation, keep consuming upstream's buffers when EOF is expected */
346 if (elt->cancelled && elt->expect_eof)
347 xfer_element_drain_buffers(elt->upstream);
349 /* close the fd we've been writing, as an EOF signal to downstream, and
350 * set it to -1 to avoid accidental re-use */
351 close_write_fd(self);
/*
 * Worker loop: until the element is cancelled, read up to GLUE_BUFFER_SIZE
 * bytes from the read fd and write them in full to the write fd.  Read and
 * write failures cancel the transfer.  The write fd is closed at the end,
 * which signals EOF to downstream.
 */
355 read_and_write(XferElementGlue *self)
357 XferElement *elt = XFER_ELEMENT(self);
358 /* dynamically allocate a buffer, in case this thread has
359 * a limited amount of stack allocated */
360 char *buf = g_malloc(GLUE_BUFFER_SIZE);
361 int rfd = get_read_fd(self);
362 int wfd = get_write_fd(self);
364 while (!elt->cancelled) {
367 /* read from upstream */
368 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
/* a short read is either an error or the input running out; whatever data
 * was read is still written out below */
369 if (len < GLUE_BUFFER_SIZE) {
371 if (!elt->cancelled) {
372 xfer_cancel_with_error(elt,
373 _("Error reading from fd %d: %s"), rfd, strerror(errno));
374 wait_until_xfer_cancelled(elt->xfer);
377 } else if (len == 0) { /* we only count a zero-length read as EOF */
382 /* write the buffer fully */
383 if (full_write(wfd, buf, len) < len) {
384 if (!elt->cancelled) {
385 xfer_cancel_with_error(elt,
386 _("Could not write to fd %d: %s"), wfd, strerror(errno));
387 wait_until_xfer_cancelled(elt->xfer);
/* on cancellation, read the input to its end when EOF is expected */
393 if (elt->cancelled && elt->expect_eof)
394 xfer_element_drain_fd(rfd);
396 /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
397 * kill it and complete the cancellation */
400 /* close the fd we've been writing, as an EOF signal to downstream */
401 close_write_fd(self);
/*
 * Worker loop: until the element is cancelled, read up to GLUE_BUFFER_SIZE
 * bytes from the read fd into a freshly allocated buffer and push that buffer
 * downstream.  A final push of (NULL, 0) tells downstream that the data has
 * ended.
 */
408 XferElementGlue *self)
410 XferElement *elt = XFER_ELEMENT(self);
411 int fd = get_read_fd(self);
413 while (!elt->cancelled) {
/* a new buffer per iteration: each one is handed to downstream by the push */
414 char *buf = g_malloc(GLUE_BUFFER_SIZE);
417 /* read a buffer from upstream */
418 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
419 if (len < GLUE_BUFFER_SIZE) {
421 if (!elt->cancelled) {
/* keep errno from being clobbered by the calls that follow */
422 int saved_errno = errno;
423 xfer_cancel_with_error(elt,
424 _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
425 g_debug("element-glue: error reading from fd %d: %s",
426 fd, strerror(saved_errno));
427 wait_until_xfer_cancelled(elt->xfer);
431 } else if (len == 0) { /* we only count a zero-length read as EOF */
437 xfer_element_push_buffer(elt->downstream, buf, len);
/* on cancellation, read the input to its end when EOF is expected */
440 if (elt->cancelled && elt->expect_eof)
441 xfer_element_drain_fd(fd);
443 /* send an EOF indication downstream */
444 xfer_element_push_buffer(elt->downstream, NULL, 0);
446 /* close the read fd, since it's at EOF */
/*
 * Worker loop: until the element is cancelled, pull a buffer from upstream
 * and push it straight on to downstream.  eof_sent tracks whether an EOF has
 * already been forwarded, so that the closing (NULL, 0) push is presumably
 * only made when it has not.
 */
451 pull_and_push(XferElementGlue *self)
453 XferElement *elt = XFER_ELEMENT(self);
454 gboolean eof_sent = FALSE;
456 while (!elt->cancelled) {
460 /* get a buffer from upstream */
461 buf = xfer_element_pull_buffer(elt->upstream, &len);
463 /* and push it downstream */
464 xfer_element_push_buffer(elt->downstream, buf, len);
/* on cancellation, keep consuming upstream's buffers when EOF is expected */
472 if (elt->cancelled && elt->expect_eof)
473 xfer_element_drain_buffers(elt->upstream);
476 xfer_element_push_buffer(elt->downstream, NULL, 0);
/*
 * Body of the thread created by start_impl.  It dispatches on the element's
 * (input mech, output mech) pair.  For the DirectTCP pairs it first accepts
 * or connects the needed socket(s) and points read_fdp/write_fdp at them.
 * It then runs the matching copy loop and finally queues XMSG_DONE via
 * send_xfer_done().
 */
483 XferElement *elt = XFER_ELEMENT(data);
484 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
486 switch (mech_pair(elt->input_mech, elt->output_mech)) {
487 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
488 read_and_write(self);
491 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
492 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
496 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
497 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
498 pull_and_write(self);
501 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
505 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
506 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
/* downstream is listening: connect to it and write to that socket */
507 if ((self->output_data_socket = do_directtcp_connect(self,
508 elt->downstream->input_listen_addrs)) == -1)
510 self->write_fdp = &self->output_data_socket;
511 read_and_write(self);
514 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
515 if ((self->output_data_socket = do_directtcp_connect(self,
516 elt->downstream->input_listen_addrs)) == -1)
518 self->write_fdp = &self->output_data_socket;
519 pull_and_write(self);
522 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
523 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
/* we are listening for upstream: accept its connection and read from it */
524 if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
526 self->read_fdp = &self->input_data_socket;
527 read_and_write(self);
530 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
531 if ((self->input_data_socket = do_directtcp_accept(self,
532 &self->input_listen_socket)) == -1)
534 self->read_fdp = &self->input_data_socket;
/* setup_impl does not request a thread for any of the following pairs, so
 * reaching them here is a programming error */
538 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
539 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
540 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
541 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
542 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
543 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
544 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
545 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
546 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
547 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
549 g_assert_not_reached();
552 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
553 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
/* downstream will connect to us: accept on the output listening socket */
554 if ((self->output_data_socket = do_directtcp_accept(self,
555 &self->output_listen_socket)) == -1)
557 self->write_fdp = &self->output_data_socket;
558 read_and_write(self);
561 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
562 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
/* upstream is listening: connect to it and read from that socket */
563 if ((self->input_data_socket = do_directtcp_connect(self,
564 elt->upstream->output_listen_addrs)) == -1)
566 self->read_fdp = &self->input_data_socket;
567 read_and_write(self);
570 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
571 if ((self->input_data_socket = do_directtcp_connect(self,
572 elt->upstream->output_listen_addrs)) == -1)
574 self->read_fdp = &self->input_data_socket;
578 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
579 if ((self->output_data_socket = do_directtcp_accept(self,
580 &self->output_listen_socket)) == -1)
582 self->write_fdp = &self->output_data_socket;
583 pull_and_write(self);
586 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
587 /* TODO: use async accept's here to avoid order dependency */
588 if ((self->output_data_socket = do_directtcp_accept(self,
589 &self->output_listen_socket)) == -1)
591 self->write_fdp = &self->output_data_socket;
592 if ((self->input_data_socket = do_directtcp_accept(self,
593 &self->input_listen_socket)) == -1)
595 self->read_fdp = &self->input_data_socket;
596 read_and_write(self);
599 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
600 /* TODO: use async connects and select() to avoid order dependency here */
601 if ((self->input_data_socket = do_directtcp_connect(self,
602 elt->upstream->output_listen_addrs)) == -1)
604 self->read_fdp = &self->input_data_socket;
605 if ((self->output_data_socket = do_directtcp_connect(self,
606 elt->downstream->input_listen_addrs)) == -1)
608 self->write_fdp = &self->output_data_socket;
609 read_and_write(self);
/* tell the transfer that this element has finished */
613 send_xfer_done(self);
/*
 * Configure the element for its negotiated (input mech, output mech) pair.
 * Each case decides where the read and write descriptors come from (the
 * neighbouring element, an internal pipe, or a DirectTCP socket), what
 * push_buffer_impl and pull_buffer_impl should do, whether a worker thread
 * is needed, and whether a listening socket must be opened on either side.
 * The ring buffer and the listening sockets are created at the end.
 *
 * NOTE(review): the xfer_element_swap_input_fd()/xfer_element_swap_output_fd()
 * calls below are made inside g_assert().  If this file is ever built with
 * G_DISABLE_ASSERT, those calls would be compiled out along with the check --
 * confirm that never happens, or hoist the calls out of the assertions.
 */
626 XferElementGlue *self = (XferElementGlue *)elt;
627 gboolean need_ring = FALSE;
628 gboolean need_listen_input = FALSE;
629 gboolean need_listen_output = FALSE;
631 g_assert(elt->input_mech != XFER_MECH_NONE);
632 g_assert(elt->output_mech != XFER_MECH_NONE);
633 g_assert(elt->input_mech != elt->output_mech);
/* start from a clean slate; each case below fills in only what it needs */
635 self->read_fdp = NULL;
636 self->write_fdp = NULL;
637 self->on_push = PUSH_INVALID;
638 self->on_pull = PULL_INVALID;
639 self->need_thread = FALSE;
641 switch (mech_pair(elt->input_mech, elt->output_mech)) {
642 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
643 /* thread will read from one fd and write to the other */
644 self->read_fdp = &neighboring_element_fd;
645 self->write_fdp = &neighboring_element_fd;
646 self->need_thread = TRUE;
649 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
650 /* thread will read from one fd and call push_buffer downstream */
651 self->read_fdp = &neighboring_element_fd;
652 self->need_thread = TRUE;
655 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
656 self->read_fdp = &neighboring_element_fd;
657 self->on_pull = PULL_FROM_FD;
660 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
661 /* thread will connect for output, then read from fd and write to the
663 self->read_fdp = &neighboring_element_fd;
664 self->need_thread = TRUE;
667 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
668 /* thread will accept output conn, then read from upstream and write to socket */
669 self->read_fdp = &neighboring_element_fd;
670 self->need_thread = TRUE;
671 need_listen_output = TRUE;
674 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
676 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
677 self->pipe[1] = -1; /* upstream will close this for us */
678 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
679 self->pipe[0] = -1; /* downstream will close this for us */
682 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
683 /* thread will read from pipe and call downstream's push_buffer */
685 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
686 self->pipe[1] = -1; /* upstream will close this for us */
687 self->read_fdp = &self->pipe[0];
688 self->need_thread = TRUE;
691 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
693 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
694 self->pipe[1] = -1; /* upstream will close this for us */
695 self->on_pull = PULL_FROM_FD;
696 self->read_fdp = &self->pipe[0];
699 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
700 /* thread will connect for output, then read from pipe and write to socket */
702 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
703 self->pipe[1] = -1; /* upstream will close this for us */
704 self->read_fdp = &self->pipe[0];
705 self->need_thread = TRUE;
708 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
709 /* thread will accept output conn, then read from pipe and write to socket */
711 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
712 self->pipe[1] = -1; /* upstream will close this for us */
713 self->read_fdp = &self->pipe[0];
714 self->need_thread = TRUE;
715 need_listen_output = TRUE;
718 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
720 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
721 self->pipe[0] = -1; /* downstream will close this for us */
722 self->on_push = PUSH_TO_FD;
723 self->write_fdp = &self->pipe[1];
726 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
727 self->on_push = PUSH_TO_FD;
728 self->write_fdp = &neighboring_element_fd;
731 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
732 self->on_push = PUSH_TO_RING_BUFFER;
733 self->on_pull = PULL_FROM_RING_BUFFER;
737 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
738 /* push will connect for output first */
739 self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
742 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
743 /* push will accept for output first */
744 self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
745 need_listen_output = TRUE;
748 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
749 /* thread will pull from upstream and write to pipe */
751 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
752 self->pipe[0] = -1; /* downstream will close this for us */
753 self->write_fdp = &self->pipe[1];
754 self->need_thread = TRUE;
757 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
758 /* thread will pull from upstream and write to downstream */
759 self->write_fdp = &neighboring_element_fd;
760 self->need_thread = TRUE;
763 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
764 /* thread will pull from upstream and push to downstream */
765 self->need_thread = TRUE;
768 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
769 /* thread will connect for output, then pull from upstream and write to socket */
770 self->need_thread = TRUE;
773 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
774 /* thread will accept for output, then pull from upstream and write to socket */
775 self->need_thread = TRUE;
776 need_listen_output = TRUE;
779 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
780 /* thread will accept for input, then read from socket and write to pipe */
782 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
783 self->pipe[0] = -1; /* downstream will close this for us */
784 self->write_fdp = &self->pipe[1];
785 self->need_thread = TRUE;
786 need_listen_input = TRUE;
789 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
790 /* thread will accept for input, then read from socket and write to downstream */
791 self->write_fdp = &neighboring_element_fd;
792 self->need_thread = TRUE;
793 need_listen_input = TRUE;
796 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
797 /* thread will accept for input, then read from socket and push downstream */
798 self->need_thread = TRUE;
799 need_listen_input = TRUE;
802 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
803 /* first pull will accept for input, then read from socket */
804 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
805 need_listen_input = TRUE;
808 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
809 /* thread will accept on both sides, then copy from socket to socket */
810 self->need_thread = TRUE;
811 need_listen_input = TRUE;
812 need_listen_output = TRUE;
815 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
816 /* thread will connect for input, then read from socket and write to pipe */
818 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
819 self->pipe[0] = -1; /* downstream will close this for us */
820 self->write_fdp = &self->pipe[1];
821 self->need_thread = TRUE;
824 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
825 /* thread will connect for input, then read from socket and write to downstream */
826 self->write_fdp = &neighboring_element_fd;
827 self->need_thread = TRUE;
830 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
831 /* thread will connect for input, then read from socket and push downstream */
832 self->need_thread = TRUE;
835 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
836 /* first pull will connect for input, then read from socket */
837 self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
840 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
841 /* thread will connect on both sides, then copy from socket to socket */
/* NOTE(review): on_pull is only consulted by pull_buffer_impl, and this pair
 * is served by the worker thread (which connects on both sides), so this
 * assignment looks like a leftover -- confirm before relying on it */
842 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
843 self->need_thread = TRUE;
847 g_assert_not_reached();
851 /* set up ring if desired */
853 self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
/* no slot is filled yet, and every slot is free */
854 self->ring_used_sem = amsemaphore_new_with_value(0);
855 self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
/* open the listening sockets requested above and publish their addresses in
 * the element's input_listen_addrs / output_listen_addrs */
858 if (need_listen_input) {
859 if (!do_directtcp_listen(elt,
860 &self->input_listen_socket, &elt->input_listen_addrs))
863 if (need_listen_output) {
864 if (!do_directtcp_listen(elt,
865 &self->output_listen_socket, &elt->output_listen_addrs))
/* Start the element: create the worker thread when setup_impl asked for one.
 * The return value is need_thread itself. */
876 XferElementGlue *self = (XferElementGlue *)elt;
878 if (self->need_thread)
/* created joinable (TRUE) so that finalize can g_thread_join() it */
879 self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
881 /* we're active if we have a thread that will eventually die */
882 return self->need_thread;
/*
 * Produce the next buffer for a pulling downstream, storing its length in
 * *size.  On the first call a DirectTCP connection is accepted or made if
 * on_pull carries the matching flag bit; the bit is cleared so that this
 * happens only once.  After that the buffer is taken either from the ring
 * buffer or from a read of up to GLUE_BUFFER_SIZE bytes into a newly
 * allocated buffer.
 */
890 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
892 /* accept first, if required */
893 if (self->on_pull & PULL_ACCEPT_FIRST) {
894 /* don't accept the next time around */
895 self->on_pull &= ~PULL_ACCEPT_FIRST;
897 if (elt->cancelled) {
902 if ((self->input_data_socket = do_directtcp_accept(self,
903 &self->input_listen_socket)) == -1) {
904 /* do_directtcp_accept already signalled an error; xfer
910 /* read from this new socket */
911 self->read_fdp = &self->input_data_socket;
914 /* or connect first, if required */
915 if (self->on_pull & PULL_CONNECT_FIRST) {
916 /* don't connect the next time around */
917 self->on_pull &= ~PULL_CONNECT_FIRST;
919 if (elt->cancelled) {
924 if ((self->input_data_socket = do_directtcp_connect(self,
925 elt->upstream->output_listen_addrs)) == -1) {
926 /* do_directtcp_connect already signalled an error; xfer
932 /* read from this new socket */
933 self->read_fdp = &self->input_data_socket;
/* with the one-shot flag bits cleared, on_pull is a plain PULL_* instruction */
936 switch (self->on_pull) {
937 case PULL_FROM_RING_BUFFER: {
940 if (elt->cancelled) {
941 /* the finalize method will empty the ring buffer */
946 /* make sure there's at least one element available */
947 amsemaphore_down(self->ring_used_sem);
/* take the buffer and size from the slot at ring_tail, then advance
 * ring_tail with wrap-around */
950 buf = self->ring[self->ring_tail].buf;
951 *size = self->ring[self->ring_tail].size;
952 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
954 /* and mark this element as free to be overwritten */
955 amsemaphore_up(self->ring_free_sem);
961 int fd = get_read_fd(self);
965 /* if the fd is already closed, it's possible upstream bailed out
966 * so quickly that we didn't even get a look at the fd */
967 if (elt->cancelled || fd == -1) {
970 xfer_element_drain_fd(fd);
979 buf = g_malloc(GLUE_BUFFER_SIZE);
981 /* read from upstream */
982 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
/* a short read is either an error or the input running out */
983 if (len < GLUE_BUFFER_SIZE) {
985 if (!elt->cancelled) {
986 xfer_cancel_with_error(elt,
987 _("Error reading from fd %d: %s"), fd, strerror(errno));
988 wait_until_xfer_cancelled(elt->xfer);
995 /* and finish off the upstream */
996 if (elt->expect_eof) {
997 xfer_element_drain_fd(fd);
1000 } else if (len == 0) {
1006 /* signal EOF to downstream */
1007 close_read_fd(self);
1011 *size = (size_t)len;
1018 g_assert_not_reached();
/*
 * Consume one buffer (buf, len) pushed by upstream.  On the first call a
 * DirectTCP connection is accepted or made if on_push carries the matching
 * flag bit; the bit is cleared so that this happens only once.  After that
 * the buffer is either stored in the ring buffer for a later pull, or
 * written in full to the write fd.
 */
1029 XferElementGlue *self = (XferElementGlue *)elt;
1031 /* accept first, if required */
1032 if (self->on_push & PUSH_ACCEPT_FIRST) {
1033 /* don't accept the next time around */
1034 self->on_push &= ~PUSH_ACCEPT_FIRST;
1036 if (elt->cancelled) {
1040 if ((self->output_data_socket = do_directtcp_accept(self,
1041 &self->output_listen_socket)) == -1) {
1042 /* do_directtcp_accept already signalled an error; xfer
1047 /* write to this new socket */
1048 self->write_fdp = &self->output_data_socket;
1051 /* or connect first, if required */
1052 if (self->on_push & PUSH_CONNECT_FIRST) {
1053 /* don't connect the next time around */
1054 self->on_push &= ~PUSH_CONNECT_FIRST;
1056 if (elt->cancelled) {
1060 if ((self->output_data_socket = do_directtcp_connect(self,
1061 elt->downstream->input_listen_addrs)) == -1) {
1062 /* do_directtcp_connect already signalled an error; xfer
1067 /* write to this new socket */
1068 self->write_fdp = &self->output_data_socket;
/* with the one-shot flag bits cleared, on_push is a plain PUSH_* instruction */
1071 switch (self->on_push) {
1072 case PUSH_TO_RING_BUFFER:
1073 /* just drop packets if the transfer has been cancelled */
1074 if (elt->cancelled) {
1079 /* make sure there's at least one element free */
1080 amsemaphore_down(self->ring_free_sem);
/* store the buffer and size in the slot at ring_head, then advance
 * ring_head with wrap-around */
1083 self->ring[self->ring_head].buf = buf;
1084 self->ring[self->ring_head].size = len;
1085 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1087 /* and mark this element as available for reading */
1088 amsemaphore_up(self->ring_used_sem);
1093 int fd = get_write_fd(self);
1095 /* if the fd is already closed, it's possible downstream bailed out
1096 * so quickly that we didn't even get a look at the fd. In this
1097 * case we can assume the xfer has been cancelled and just discard
1102 if (elt->cancelled) {
1103 if (!elt->expect_eof || !buf) {
1104 close_write_fd(self);
1106 /* hack to ensure we won't close the fd again, if we get another push */
1107 elt->expect_eof = TRUE;
1115 /* write the full buffer to the fd, or close on EOF */
1117 if (full_write(fd, buf, len) < len) {
1118 if (!elt->cancelled) {
1119 xfer_cancel_with_error(elt,
1120 _("Error writing to fd %d: %s"), fd, strerror(errno));
1121 wait_until_xfer_cancelled(elt->xfer);
1123 /* nothing special to do to handle a cancellation */
1127 close_write_fd(self);
1135 g_assert_not_reached();
/* Instance initializer: mark the element as able to generate EOF, and set
 * the pipe ends, sockets and write fd to -1, the value the rest of this file
 * uses for "not open". */
1142 XferElementGlue *self)
1144 XferElement *elt = (XferElement *)self;
1145 elt->can_generate_eof = TRUE;
1146 self->pipe[0] = self->pipe[1] = -1;
1147 self->input_listen_socket = -1;
1148 self->output_listen_socket = -1;
1149 self->input_data_socket = -1;
1150 self->output_data_socket = -1;
1152 self->write_fd = -1;
/*
 * GObject finalize method: join the worker thread, close every descriptor
 * that is still open (anything other than -1), release the buffers left in
 * the ring and its semaphores, then chain up to the parent class.
 */
1159 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1161 /* first make sure the worker thread has finished up */
1163 g_thread_join(self->thread);
1165 /* close our pipes and fd's if they're still open */
1166 if (self->pipe[0] != -1) close(self->pipe[0]);
1167 if (self->pipe[1] != -1) close(self->pipe[1]);
1168 if (self->input_data_socket != -1) close(self->input_data_socket);
1169 if (self->output_data_socket != -1) close(self->output_data_socket);
1170 if (self->input_listen_socket != -1) close(self->input_listen_socket);
1171 if (self->output_listen_socket != -1) close(self->output_listen_socket);
1172 if (self->read_fd != -1) close(self->read_fd);
1173 if (self->write_fd != -1) close(self->write_fd);
1176 /* empty the ring buffer, ignoring synchronization issues */
1177 while (self->ring_used_sem->value) {
1178 if (self->ring[self->ring_tail].buf)
1179 amfree(self->ring[self->ring_tail].buf);
1180 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1184 amsemaphore_free(self->ring_used_sem);
1185 amsemaphore_free(self->ring_free_sem);
/* chain up to the parent class's finalize */
1189 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
/*
 * The (input mech, output mech) pairs this element supports, ended by the
 * XFER_MECH_NONE/XFER_MECH_NONE entry.  Each entry also carries an
 * XFER_NROPS and an XFER_NTHREADS annotation; XFER_NTHREADS is 1 for exactly
 * the pairs where setup_impl requests a worker thread.
 */
1192 static xfer_element_mech_pair_t _pairs[] = {
1193 { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1194 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1195 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1196 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1197 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1199 { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1200 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1201 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1202 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1203 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1205 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1206 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1207 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1208 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1209 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1211 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1212 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1213 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1214 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1215 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1217 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1218 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1219 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1220 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1221 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1223 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1224 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1225 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1226 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1227 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1230 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
/* non-static alias of the table above, so other files can refer to it */
1232 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
/* Class initializer: install this file's setup, start, push_buffer,
 * pull_buffer and finalize implementations, set the Perl class name and the
 * supported mechanism pairs, and remember the parent class for chaining up. */
1236 XferElementGlueClass * selfc)
1238 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1239 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1241 klass->setup = setup_impl;
1242 klass->start = start_impl;
1243 klass->push_buffer = push_buffer_impl;
1244 klass->pull_buffer = pull_buffer_impl;
1246 klass->perl_class = "Amanda::Xfer::Element::Glue";
1247 klass->mech_pairs = xfer_element_glue_mech_pairs;
1249 goc->finalize = finalize_impl;
1251 parent_class = g_type_class_peek_parent(selfc);
/* Register the XferElementGlue GType, derived from XFER_ELEMENT_TYPE, the
 * first time this is called; the id is kept in a function-local static. */
1255 xfer_element_glue_get_type (void)
1257 static GType type = 0;
/* only register while the cached id is still unset */
1259 if G_UNLIKELY(type == 0) {
1260 static const GTypeInfo info = {
1261 sizeof (XferElementGlueClass),
1262 (GBaseInitFunc) NULL,
1263 (GBaseFinalizeFunc) NULL,
1264 (GClassInitFunc) class_init,
1265 (GClassFinalizeFunc) NULL,
1266 NULL /* class_data */,
1267 sizeof (XferElementGlue),
1268 0 /* n_preallocs */,
1269 (GInstanceInitFunc) instance_init,
1273 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1279 /* create an element of this class; prototype is in xfer-element.h */
1281 xfer_element_glue(void)
1283 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1284 XferElement *elt = XFER_ELEMENT(self);