2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
25 #include "element-glue.h"
26 #include "directtcp.h"
28 #include "sockaddr-util.h"
35 typedef struct XferElementGlue_ {
36 XferElement __parent__;
38 /* instructions to push_buffer_impl */
41 PUSH_TO_FD, /* write to write_fd */
44 PUSH_ACCEPT_FIRST = (1 << 16),
45 PUSH_CONNECT_FIRST = (2 << 16),
48 /* instructions to pull_buffer_impl */
50 PULL_FROM_RING_BUFFER,
51 PULL_FROM_FD, /* read from read_fd */
54 PULL_ACCEPT_FIRST = (1 << 16),
55 PULL_CONNECT_FIRST = (2 << 16),
63 /* the stuff we might use, depending on what flavor of glue we're
66 int input_listen_socket, output_listen_socket;
67 int input_data_socket, output_data_socket;
68 int read_fd, write_fd;
70 /* a ring buffer of ptr/size pairs with semaphores */
71 struct { gpointer buf; size_t size; } *ring;
72 amsemaphore_t *ring_used_sem, *ring_free_sem;
73 gint ring_head, ring_tail;
76 GThreadFunc threadfunc;
83 typedef struct XferElementGlueClass_ {
84 XferElementClass __parent__;
85 } XferElementGlueClass;
87 static GObjectClass *parent_class = NULL;
90 * Utility functions, etc.
95 XferElementGlue *self)
97 if (pipe(self->pipe) < 0)
98 g_critical(_("Could not create pipe: %s"), strerror(errno));
103 XferElementGlue *self)
105 xfer_queue_message(XFER_ELEMENT(self)->xfer,
106 xmsg_new((XferElement *)self, XMSG_DONE, 0));
113 DirectTCPAddr **addrsp)
116 sockaddr_union data_addr;
117 DirectTCPAddr *addrs;
119 struct addrinfo *res;
120 struct addrinfo *res_addr;
121 sockaddr_union *addr = NULL;
123 if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
124 xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
127 for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
128 if (res_addr->ai_family == AF_INET) {
129 addr = (sockaddr_union *)res_addr->ai_addr;
134 addr = (sockaddr_union *)res->ai_addr;
137 sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
139 xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
144 if (bind(sock, (struct sockaddr *)addr, len) != 0) {
145 xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
150 if (listen(sock, 1) < 0) {
151 xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
155 /* TODO: which addresses should this display? all ifaces? localhost? */
156 len = sizeof(data_addr);
157 if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
158 error("getsockname(): %s", strerror(errno));
160 addrs = g_new0(DirectTCPAddr, 2);
161 copy_sockaddr(&addrs[0], &data_addr);
171 return !XFER_ELEMENT(data)->cancelled;
176 XferElementGlue *self,
180 g_assert(*socketp != -1);
182 if ((sock = interruptible_accept(*socketp, NULL, NULL,
183 prolong_accept, self)) == -1) {
184 /* if the accept was interrupted due to a cancellation, then do not
185 * add a further error message */
186 if (errno == 0 && XFER_ELEMENT(self)->cancelled)
189 xfer_cancel_with_error(XFER_ELEMENT(self),
190 _("Error accepting incoming connection: %s"), strerror(errno));
191 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
195 /* close the listening socket now, for good measure */
203 do_directtcp_connect(
204 XferElementGlue *self,
205 DirectTCPAddr *addrs)
207 XferElement *elt = XFER_ELEMENT(self);
212 g_debug("element-glue got no directtcp addresses to connect to!");
213 if (!elt->cancelled) {
214 xfer_cancel_with_error(elt,
215 "%s got no directtcp addresses to connect to",
216 xfer_element_repr(elt));
221 /* set up the sockaddr -- IPv4 only */
222 copy_sockaddr(&addr, addrs);
224 g_debug("do_directtcp_connect making data connection to %s", str_sockaddr(&addr));
225 sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
227 xfer_cancel_with_error(elt,
228 "socket(): %s", strerror(errno));
231 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
232 xfer_cancel_with_error(elt,
233 "connect(): %s", strerror(errno));
237 g_debug("connected to %s", str_sockaddr(&addr));
242 wait_until_xfer_cancelled(elt->xfer);
246 #define GLUE_BUFFER_SIZE 32768
247 #define GLUE_RING_BUFFER_SIZE 32
249 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
255 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
256 * should be redirected to point to the upstream's output_fd or downstream's
257 * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
259 static int neighboring_element_fd = -1;
261 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
263 _get_read_fd(XferElementGlue *self)
266 return -1; /* shouldn't happen.. */
268 if (self->read_fdp == &neighboring_element_fd) {
269 XferElement *elt = XFER_ELEMENT(self);
270 self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
272 self->read_fd = *self->read_fdp;
273 *self->read_fdp = -1;
275 self->read_fdp = NULL;
276 return self->read_fd;
279 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
281 _get_write_fd(XferElementGlue *self)
283 if (!self->write_fdp)
284 return -1; /* shouldn't happen.. */
286 if (self->write_fdp == &neighboring_element_fd) {
287 XferElement *elt = XFER_ELEMENT(self);
288 self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
290 self->write_fd = *self->write_fdp;
291 *self->write_fdp = -1;
293 self->write_fdp = NULL;
294 return self->write_fd;
298 close_read_fd(XferElementGlue *self)
300 int fd = get_read_fd(self);
306 close_write_fd(XferElementGlue *self)
308 int fd = get_write_fd(self);
314 * Worker thread utility functions
318 pull_and_write(XferElementGlue *self)
320 XferElement *elt = XFER_ELEMENT(self);
321 int fd = get_write_fd(self);
322 self->write_fdp = NULL;
324 while (!elt->cancelled) {
328 /* get a buffer from upstream */
329 buf = xfer_element_pull_buffer(elt->upstream, &len);
334 if (full_write(fd, buf, len) < len) {
335 if (!elt->cancelled) {
336 xfer_cancel_with_error(elt,
337 _("Error writing to fd %d: %s"), fd, strerror(errno));
338 wait_until_xfer_cancelled(elt->xfer);
347 if (elt->cancelled && elt->expect_eof)
348 xfer_element_drain_buffers(elt->upstream);
350 /* close the fd we've been writing, as an EOF signal to downstream, and
351 * set it to -1 to avoid accidental re-use */
352 close_write_fd(self);
356 read_and_write(XferElementGlue *self)
358 XferElement *elt = XFER_ELEMENT(self);
359 /* dynamically allocate a buffer, in case this thread has
360 * a limited amount of stack allocated */
361 char *buf = g_malloc(GLUE_BUFFER_SIZE);
362 int rfd = get_read_fd(self);
363 int wfd = get_write_fd(self);
365 while (!elt->cancelled) {
368 /* read from upstream */
369 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
370 if (len < GLUE_BUFFER_SIZE) {
372 if (!elt->cancelled) {
373 xfer_cancel_with_error(elt,
374 _("Error reading from fd %d: %s"), rfd, strerror(errno));
375 wait_until_xfer_cancelled(elt->xfer);
378 } else if (len == 0) { /* we only count a zero-length read as EOF */
383 /* write the buffer fully */
384 if (full_write(wfd, buf, len) < len) {
385 if (!elt->cancelled) {
386 xfer_cancel_with_error(elt,
387 _("Could not write to fd %d: %s"), wfd, strerror(errno));
388 wait_until_xfer_cancelled(elt->xfer);
394 if (elt->cancelled && elt->expect_eof)
395 xfer_element_drain_fd(rfd);
397 /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
398 * kill it and complete the cancellation */
401 /* close the fd we've been writing, as an EOF signal to downstream */
402 close_write_fd(self);
409 XferElementGlue *self)
411 XferElement *elt = XFER_ELEMENT(self);
412 int fd = get_read_fd(self);
414 while (!elt->cancelled) {
415 char *buf = g_malloc(GLUE_BUFFER_SIZE);
418 /* read a buffer from upstream */
419 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
420 if (len < GLUE_BUFFER_SIZE) {
422 if (!elt->cancelled) {
423 int saved_errno = errno;
424 xfer_cancel_with_error(elt,
425 _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
426 g_debug("element-glue: error reading from fd %d: %s",
427 fd, strerror(saved_errno));
428 wait_until_xfer_cancelled(elt->xfer);
432 } else if (len == 0) { /* we only count a zero-length read as EOF */
438 xfer_element_push_buffer(elt->downstream, buf, len);
441 if (elt->cancelled && elt->expect_eof)
442 xfer_element_drain_fd(fd);
444 /* send an EOF indication downstream */
445 xfer_element_push_buffer(elt->downstream, NULL, 0);
447 /* close the read fd, since it's at EOF */
452 pull_and_push(XferElementGlue *self)
454 XferElement *elt = XFER_ELEMENT(self);
455 gboolean eof_sent = FALSE;
457 while (!elt->cancelled) {
461 /* get a buffer from upstream */
462 buf = xfer_element_pull_buffer(elt->upstream, &len);
464 /* and push it downstream */
465 xfer_element_push_buffer(elt->downstream, buf, len);
473 if (elt->cancelled && elt->expect_eof)
474 xfer_element_drain_buffers(elt->upstream);
477 xfer_element_push_buffer(elt->downstream, NULL, 0);
484 XferElement *elt = XFER_ELEMENT(data);
485 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
487 switch (mech_pair(elt->input_mech, elt->output_mech)) {
488 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
489 read_and_write(self);
492 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
493 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
497 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
498 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
499 pull_and_write(self);
502 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
506 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
507 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
508 if ((self->output_data_socket = do_directtcp_connect(self,
509 elt->downstream->input_listen_addrs)) == -1)
511 self->write_fdp = &self->output_data_socket;
512 read_and_write(self);
515 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
516 if ((self->output_data_socket = do_directtcp_connect(self,
517 elt->downstream->input_listen_addrs)) == -1)
519 self->write_fdp = &self->output_data_socket;
520 pull_and_write(self);
523 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
524 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
525 if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
527 self->read_fdp = &self->input_data_socket;
528 read_and_write(self);
531 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
532 if ((self->input_data_socket = do_directtcp_accept(self,
533 &self->input_listen_socket)) == -1)
535 self->read_fdp = &self->input_data_socket;
539 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
540 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
541 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
542 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
543 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
544 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
545 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
546 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
547 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
548 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
550 g_assert_not_reached();
553 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
554 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
555 if ((self->output_data_socket = do_directtcp_accept(self,
556 &self->output_listen_socket)) == -1)
558 self->write_fdp = &self->output_data_socket;
559 read_and_write(self);
562 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
563 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
564 if ((self->input_data_socket = do_directtcp_connect(self,
565 elt->upstream->output_listen_addrs)) == -1)
567 self->read_fdp = &self->input_data_socket;
568 read_and_write(self);
571 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
572 if ((self->input_data_socket = do_directtcp_connect(self,
573 elt->upstream->output_listen_addrs)) == -1)
575 self->read_fdp = &self->input_data_socket;
579 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
580 if ((self->output_data_socket = do_directtcp_accept(self,
581 &self->output_listen_socket)) == -1)
583 self->write_fdp = &self->output_data_socket;
584 pull_and_write(self);
587 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
588 /* TODO: use async accept's here to avoid order dependency */
589 if ((self->output_data_socket = do_directtcp_accept(self,
590 &self->output_listen_socket)) == -1)
592 self->write_fdp = &self->output_data_socket;
593 if ((self->input_data_socket = do_directtcp_accept(self,
594 &self->input_listen_socket)) == -1)
596 self->read_fdp = &self->input_data_socket;
597 read_and_write(self);
600 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
601 /* TODO: use async connects and select() to avoid order dependency here */
602 if ((self->input_data_socket = do_directtcp_connect(self,
603 elt->upstream->output_listen_addrs)) == -1)
605 self->read_fdp = &self->input_data_socket;
606 if ((self->output_data_socket = do_directtcp_connect(self,
607 elt->downstream->input_listen_addrs)) == -1)
609 self->write_fdp = &self->output_data_socket;
610 read_and_write(self);
614 send_xfer_done(self);
627 XferElementGlue *self = (XferElementGlue *)elt;
628 gboolean need_ring = FALSE;
629 gboolean need_listen_input = FALSE;
630 gboolean need_listen_output = FALSE;
632 g_assert(elt->input_mech != XFER_MECH_NONE);
633 g_assert(elt->output_mech != XFER_MECH_NONE);
634 g_assert(elt->input_mech != elt->output_mech);
636 self->read_fdp = NULL;
637 self->write_fdp = NULL;
638 self->on_push = PUSH_INVALID;
639 self->on_pull = PULL_INVALID;
640 self->need_thread = FALSE;
642 switch (mech_pair(elt->input_mech, elt->output_mech)) {
643 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
644 /* thread will read from one fd and write to the other */
645 self->read_fdp = &neighboring_element_fd;
646 self->write_fdp = &neighboring_element_fd;
647 self->need_thread = TRUE;
650 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
651 /* thread will read from one fd and call push_buffer downstream */
652 self->read_fdp = &neighboring_element_fd;
653 self->need_thread = TRUE;
656 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
657 self->read_fdp = &neighboring_element_fd;
658 self->on_pull = PULL_FROM_FD;
661 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
662 /* thread will connect for output, then read from fd and write to the
664 self->read_fdp = &neighboring_element_fd;
665 self->need_thread = TRUE;
668 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
669 /* thread will accept output conn, then read from upstream and write to socket */
670 self->read_fdp = &neighboring_element_fd;
671 self->need_thread = TRUE;
672 need_listen_output = TRUE;
675 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
677 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
678 self->pipe[1] = -1; /* upstream will close this for us */
679 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
680 self->pipe[0] = -1; /* downstream will close this for us */
683 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
684 /* thread will read from pipe and call downstream's push_buffer */
686 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
687 self->pipe[1] = -1; /* upstream will close this for us */
688 self->read_fdp = &self->pipe[0];
689 self->need_thread = TRUE;
692 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
694 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
695 self->pipe[1] = -1; /* upstream will close this for us */
696 self->on_pull = PULL_FROM_FD;
697 self->read_fdp = &self->pipe[0];
700 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
701 /* thread will connect for output, then read from pipe and write to socket */
703 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
704 self->pipe[1] = -1; /* upstream will close this for us */
705 self->read_fdp = &self->pipe[0];
706 self->need_thread = TRUE;
709 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
710 /* thread will accept output conn, then read from pipe and write to socket */
712 g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
713 self->pipe[1] = -1; /* upstream will close this for us */
714 self->read_fdp = &self->pipe[0];
715 self->need_thread = TRUE;
716 need_listen_output = TRUE;
719 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
721 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
722 self->pipe[0] = -1; /* downstream will close this for us */
723 self->on_push = PUSH_TO_FD;
724 self->write_fdp = &self->pipe[1];
727 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
728 self->on_push = PUSH_TO_FD;
729 self->write_fdp = &neighboring_element_fd;
732 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
733 self->on_push = PUSH_TO_RING_BUFFER;
734 self->on_pull = PULL_FROM_RING_BUFFER;
738 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
739 /* push will connect for output first */
740 self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
743 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
744 /* push will accept for output first */
745 self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
746 need_listen_output = TRUE;
749 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
750 /* thread will pull from upstream and write to pipe */
752 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
753 self->pipe[0] = -1; /* downstream will close this for us */
754 self->write_fdp = &self->pipe[1];
755 self->need_thread = TRUE;
758 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
759 /* thread will pull from upstream and write to downstream */
760 self->write_fdp = &neighboring_element_fd;
761 self->need_thread = TRUE;
764 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
765 /* thread will pull from upstream and push to downstream */
766 self->need_thread = TRUE;
769 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
770 /* thread will connect for output, then pull from upstream and write to socket */
771 self->need_thread = TRUE;
774 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
775 /* thread will accept for output, then pull from upstream and write to socket */
776 self->need_thread = TRUE;
777 need_listen_output = TRUE;
780 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
781 /* thread will accept for input, then read from socket and write to pipe */
783 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
784 self->pipe[0] = -1; /* downstream will close this for us */
785 self->write_fdp = &self->pipe[1];
786 self->need_thread = TRUE;
787 need_listen_input = TRUE;
790 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
791 /* thread will accept for input, then read from socket and write to downstream */
792 self->write_fdp = &neighboring_element_fd;
793 self->need_thread = TRUE;
794 need_listen_input = TRUE;
797 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
798 /* thread will accept for input, then read from socket and push downstream */
799 self->need_thread = TRUE;
800 need_listen_input = TRUE;
803 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
804 /* first pull will accept for input, then read from socket */
805 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
806 need_listen_input = TRUE;
809 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
810 /* thread will accept on both sides, then copy from socket to socket */
811 self->need_thread = TRUE;
812 need_listen_input = TRUE;
813 need_listen_output = TRUE;
816 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
817 /* thread will connect for input, then read from socket and write to pipe */
819 g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
820 self->pipe[0] = -1; /* downstream will close this for us */
821 self->write_fdp = &self->pipe[1];
822 self->need_thread = TRUE;
825 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
826 /* thread will connect for input, then read from socket and write to downstream */
827 self->write_fdp = &neighboring_element_fd;
828 self->need_thread = TRUE;
831 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
832 /* thread will connect for input, then read from socket and push downstream */
833 self->need_thread = TRUE;
836 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
837 /* first pull will connect for input, then read from socket */
838 self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
841 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
842 /* thread will connect on both sides, then copy from socket to socket */
843 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
844 self->need_thread = TRUE;
848 g_assert_not_reached();
852 /* set up ring if desired */
854 self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
855 self->ring_used_sem = amsemaphore_new_with_value(0);
856 self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
859 if (need_listen_input) {
860 if (!do_directtcp_listen(elt,
861 &self->input_listen_socket, &elt->input_listen_addrs))
864 if (need_listen_output) {
865 if (!do_directtcp_listen(elt,
866 &self->output_listen_socket, &elt->output_listen_addrs))
877 XferElementGlue *self = (XferElementGlue *)elt;
879 if (self->need_thread)
880 self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
882 /* we're active if we have a thread that will eventually die */
883 return self->need_thread;
891 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
893 /* accept first, if required */
894 if (self->on_pull & PULL_ACCEPT_FIRST) {
895 /* don't accept the next time around */
896 self->on_pull &= ~PULL_ACCEPT_FIRST;
898 if (elt->cancelled) {
903 if ((self->input_data_socket = do_directtcp_accept(self,
904 &self->input_listen_socket)) == -1) {
905 /* do_directtcp_accept already signalled an error; xfer
911 /* read from this new socket */
912 self->read_fdp = &self->input_data_socket;
915 /* or connect first, if required */
916 if (self->on_pull & PULL_CONNECT_FIRST) {
917 /* don't connect the next time around */
918 self->on_pull &= ~PULL_CONNECT_FIRST;
920 if (elt->cancelled) {
925 if ((self->input_data_socket = do_directtcp_connect(self,
926 elt->upstream->output_listen_addrs)) == -1) {
927 /* do_directtcp_connect already signalled an error; xfer
933 /* read from this new socket */
934 self->read_fdp = &self->input_data_socket;
937 switch (self->on_pull) {
938 case PULL_FROM_RING_BUFFER: {
941 if (elt->cancelled) {
942 /* the finalize method will empty the ring buffer */
947 /* make sure there's at least one element available */
948 amsemaphore_down(self->ring_used_sem);
951 buf = self->ring[self->ring_tail].buf;
952 *size = self->ring[self->ring_tail].size;
953 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
955 /* and mark this element as free to be overwritten */
956 amsemaphore_up(self->ring_free_sem);
962 int fd = get_read_fd(self);
966 /* if the fd is already closed, it's possible upstream bailed out
967 * so quickly that we didn't even get a look at the fd */
968 if (elt->cancelled || fd == -1) {
971 xfer_element_drain_fd(fd);
980 buf = g_malloc(GLUE_BUFFER_SIZE);
982 /* read from upstream */
983 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
984 if (len < GLUE_BUFFER_SIZE) {
986 if (!elt->cancelled) {
987 xfer_cancel_with_error(elt,
988 _("Error reading from fd %d: %s"), fd, strerror(errno));
989 wait_until_xfer_cancelled(elt->xfer);
996 /* and finish off the upstream */
997 if (elt->expect_eof) {
998 xfer_element_drain_fd(fd);
1000 close_read_fd(self);
1001 } else if (len == 0) {
1007 /* signal EOF to downstream */
1008 close_read_fd(self);
1012 *size = (size_t)len;
1019 g_assert_not_reached();
1030 XferElementGlue *self = (XferElementGlue *)elt;
1032 /* accept first, if required */
1033 if (self->on_push & PUSH_ACCEPT_FIRST) {
1034 /* don't accept the next time around */
1035 self->on_push &= ~PUSH_ACCEPT_FIRST;
1037 if (elt->cancelled) {
1041 if ((self->output_data_socket = do_directtcp_accept(self,
1042 &self->output_listen_socket)) == -1) {
1043 /* do_directtcp_accept already signalled an error; xfer
1048 /* write to this new socket */
1049 self->write_fdp = &self->output_data_socket;
1052 /* or connect first, if required */
1053 if (self->on_push & PUSH_CONNECT_FIRST) {
1054 /* don't accept the next time around */
1055 self->on_push &= ~PUSH_CONNECT_FIRST;
1057 if (elt->cancelled) {
1061 if ((self->output_data_socket = do_directtcp_connect(self,
1062 elt->downstream->input_listen_addrs)) == -1) {
1063 /* do_directtcp_connect already signalled an error; xfer
1068 /* read from this new socket */
1069 self->write_fdp = &self->output_data_socket;
1072 switch (self->on_push) {
1073 case PUSH_TO_RING_BUFFER:
1074 /* just drop packets if the transfer has been cancelled */
1075 if (elt->cancelled) {
1080 /* make sure there's at least one element free */
1081 amsemaphore_down(self->ring_free_sem);
1084 self->ring[self->ring_head].buf = buf;
1085 self->ring[self->ring_head].size = len;
1086 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1088 /* and mark this element as available for reading */
1089 amsemaphore_up(self->ring_used_sem);
1094 int fd = get_write_fd(self);
1096 /* if the fd is already closed, it's possible upstream bailed out
1097 * so quickly that we didn't even get a look at the fd. In this
1098 * case we can assume the xfer has been cancelled and just discard
1103 if (elt->cancelled) {
1104 if (!elt->expect_eof || !buf) {
1105 close_write_fd(self);
1107 /* hack to ensure we won't close the fd again, if we get another push */
1108 elt->expect_eof = TRUE;
1116 /* write the full buffer to the fd, or close on EOF */
1118 if (full_write(fd, buf, len) < len) {
1119 if (!elt->cancelled) {
1120 xfer_cancel_with_error(elt,
1121 _("Error writing to fd %d: %s"), fd, strerror(errno));
1122 wait_until_xfer_cancelled(elt->xfer);
1124 /* nothing special to do to handle a cancellation */
1128 close_write_fd(self);
1136 g_assert_not_reached();
1143 XferElementGlue *self)
1145 XferElement *elt = (XferElement *)self;
1146 elt->can_generate_eof = TRUE;
1147 self->pipe[0] = self->pipe[1] = -1;
1148 self->input_listen_socket = -1;
1149 self->output_listen_socket = -1;
1150 self->input_data_socket = -1;
1151 self->output_data_socket = -1;
1153 self->write_fd = -1;
1160 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1162 /* first make sure the worker thread has finished up */
1164 g_thread_join(self->thread);
1166 /* close our pipes and fd's if they're still open */
1167 if (self->pipe[0] != -1) close(self->pipe[0]);
1168 if (self->pipe[1] != -1) close(self->pipe[1]);
1169 if (self->input_data_socket != -1) close(self->input_data_socket);
1170 if (self->output_data_socket != -1) close(self->output_data_socket);
1171 if (self->input_listen_socket != -1) close(self->input_listen_socket);
1172 if (self->output_listen_socket != -1) close(self->output_listen_socket);
1173 if (self->read_fd != -1) close(self->read_fd);
1174 if (self->write_fd != -1) close(self->write_fd);
1177 /* empty the ring buffer, ignoring syncronization issues */
1178 while (self->ring_used_sem->value) {
1179 if (self->ring[self->ring_tail].buf)
1180 amfree(self->ring[self->ring_tail].buf);
1181 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1185 amsemaphore_free(self->ring_used_sem);
1186 amsemaphore_free(self->ring_free_sem);
1190 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1193 static xfer_element_mech_pair_t _pairs[] = {
1194 { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1195 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1196 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1197 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1198 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1200 { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1201 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1202 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1203 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1204 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1206 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1207 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1208 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1209 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1210 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1212 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1213 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1214 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1215 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1216 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1218 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1219 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1220 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1221 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1222 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1224 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1225 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1226 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1227 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1228 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1231 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1233 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1237 XferElementGlueClass * selfc)
1239 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1240 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1242 klass->setup = setup_impl;
1243 klass->start = start_impl;
1244 klass->push_buffer = push_buffer_impl;
1245 klass->pull_buffer = pull_buffer_impl;
1247 klass->perl_class = "Amanda::Xfer::Element::Glue";
1248 klass->mech_pairs = xfer_element_glue_mech_pairs;
1250 goc->finalize = finalize_impl;
1252 parent_class = g_type_class_peek_parent(selfc);
1256 xfer_element_glue_get_type (void)
1258 static GType type = 0;
1260 if G_UNLIKELY(type == 0) {
1261 static const GTypeInfo info = {
1262 sizeof (XferElementGlueClass),
1263 (GBaseInitFunc) NULL,
1264 (GBaseFinalizeFunc) NULL,
1265 (GClassInitFunc) class_init,
1266 (GClassFinalizeFunc) NULL,
1267 NULL /* class_data */,
1268 sizeof (XferElementGlue),
1269 0 /* n_preallocs */,
1270 (GInstanceInitFunc) instance_init,
1274 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1280 /* create an element of this class; prototype is in xfer-element.h */
1282 xfer_element_glue(void)
1284 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1285 XferElement *elt = XFER_ELEMENT(self);