2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
23 #include "element-glue.h"
25 #include "directtcp.h"
26 #include "sockaddr-util.h"
32 typedef struct XferElementGlue_ {
33 XferElement __parent__;
35 /* instructions to push_buffer_impl */
38 PUSH_TO_FD, /* write to *write_fdp */
41 PUSH_ACCEPT_FIRST = (1 << 16),
42 PUSH_CONNECT_FIRST = (2 << 16),
45 /* instructions to pull_buffer_impl */
47 PULL_FROM_RING_BUFFER,
48 PULL_FROM_FD, /* read from *read_fdp */
51 PULL_ACCEPT_FIRST = (1 << 16),
52 PULL_CONNECT_FIRST = (2 << 16),
60 /* the stuff we might use, depending on what flavor of glue we're
63 int input_listen_socket, output_listen_socket;
64 int input_data_socket, output_data_socket;
66 /* a ring buffer of ptr/size pairs with semaphores */
67 struct { gpointer buf; size_t size; } *ring;
68 semaphore_t *ring_used_sem, *ring_free_sem;
69 gint ring_head, ring_tail;
72 GThreadFunc threadfunc;
79 typedef struct XferElementGlueClass_ {
80 XferElementClass __parent__;
81 } XferElementGlueClass;
83 static GObjectClass *parent_class = NULL;
86 * Utility functions, etc.
91 XferElementGlue *self)
93 if (pipe(self->pipe) < 0)
94 g_critical(_("Could not create pipe: %s"), strerror(errno));
99 XferElementGlue *self)
101 xfer_queue_message(XFER_ELEMENT(self)->xfer,
102 xmsg_new((XferElement *)self, XMSG_DONE, 0));
109 DirectTCPAddr **addrsp)
113 DirectTCPAddr *addrs;
116 sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
118 xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
122 if (listen(sock, 1) < 0) {
123 xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
127 /* TODO: which addresses should this display? all ifaces? localhost? */
129 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
130 error("getsockname(): %s", strerror(errno));
131 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
133 addrs = g_new0(DirectTCPAddr, 2);
134 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
135 addrs[0].port = SU_GET_PORT(&addr);
143 XferElementGlue *self,
147 g_assert(*socketp != -1);
149 if ((sock = accept(*socketp, NULL, NULL)) == -1) {
150 xfer_cancel_with_error(XFER_ELEMENT(self),
151 _("Error accepting incoming connection: %s"), strerror(errno));
152 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
156 /* close the listening socket now, for good measure */
164 do_directtcp_connect(
165 XferElementGlue *self,
166 DirectTCPAddr *addrs)
168 XferElement *elt = XFER_ELEMENT(self);
173 g_debug("element-glue got no directtcp addresses to connect to!");
174 if (!elt->cancelled) {
175 xfer_cancel_with_error(elt,
176 "%s got no directtcp addresses to connect to",
177 xfer_element_repr(elt));
182 /* set up the sockaddr -- IPv4 only */
183 SU_INIT(&addr, AF_INET);
184 SU_SET_PORT(&addr, addrs->port);
185 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
187 g_debug("making data connection to %s", str_sockaddr(&addr));
188 sock = socket(AF_INET, SOCK_STREAM, 0);
190 xfer_cancel_with_error(elt,
191 "socket(): %s", strerror(errno));
194 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
195 xfer_cancel_with_error(elt,
196 "connect(): %s", strerror(errno));
200 g_debug("connected to %s", str_sockaddr(&addr));
205 wait_until_xfer_cancelled(elt->xfer);
209 #define GLUE_BUFFER_SIZE 32768
210 #define GLUE_RING_BUFFER_SIZE 32
212 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
214 /* if self->read_fdp or self->write_fdp are pointing to this integer, then
215 * they should be redirected to point to the upstream's output_fd or
216 * downstream's input_fd, respectively, at start() */
217 static int neighboring_element_fd = -1;
220 * Worker thread utility functions
224 pull_and_write(XferElementGlue *self)
226 XferElement *elt = XFER_ELEMENT(self);
227 int fd = *self->write_fdp;
229 while (!elt->cancelled) {
233 /* get a buffer from upstream */
234 buf = xfer_element_pull_buffer(elt->upstream, &len);
239 if (full_write(fd, buf, len) < len) {
240 if (!elt->cancelled) {
241 xfer_cancel_with_error(elt,
242 _("Error writing to fd %d: %s"), fd, strerror(errno));
243 wait_until_xfer_cancelled(elt->xfer);
252 if (elt->cancelled && elt->expect_eof)
253 xfer_element_drain_by_pulling(elt->upstream);
255 /* close the fd we've been writing, as an EOF signal to downstream, and
256 * set it to -1 to avoid accidental re-use */
258 *self->write_fdp = -1;
262 read_and_write(XferElementGlue *self)
264 XferElement *elt = XFER_ELEMENT(self);
265 int rfd = *self->read_fdp;
266 int wfd = *self->write_fdp;
268 /* dynamically allocate a buffer, in case this thread has
269 * a limited amount of stack allocated */
270 char *buf = g_malloc(GLUE_BUFFER_SIZE);
272 while (!elt->cancelled) {
275 /* read from upstream */
276 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
277 if (len < GLUE_BUFFER_SIZE) {
279 if (!elt->cancelled) {
280 xfer_cancel_with_error(elt,
281 _("Error reading from fd %d: %s"), rfd, strerror(errno));
282 wait_until_xfer_cancelled(elt->xfer);
285 } else if (len == 0) { /* we only count a zero-length read as EOF */
290 /* write the buffer fully */
291 if (full_write(wfd, buf, len) < len) {
292 if (!elt->cancelled) {
293 xfer_cancel_with_error(elt,
294 _("Could not write to fd %d: %s"), wfd, strerror(errno));
295 wait_until_xfer_cancelled(elt->xfer);
301 if (elt->cancelled && elt->expect_eof)
302 xfer_element_drain_by_pulling(elt->upstream);
304 /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
306 if (!elt->cancelled || elt->expect_eof) {
308 *self->read_fdp = -1;
311 /* close the fd we've been writing, as an EOF signal to downstream, and
312 * set it to -1 to avoid accidental re-use */
314 *self->write_fdp = -1;
321 XferElementGlue *self)
323 XferElement *elt = XFER_ELEMENT(self);
324 int fd = *self->read_fdp;
326 while (!elt->cancelled) {
327 char *buf = g_malloc(GLUE_BUFFER_SIZE);
330 /* read a buffer from upstream */
331 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
332 if (len < GLUE_BUFFER_SIZE) {
334 if (!elt->cancelled) {
335 int saved_errno = errno;
336 xfer_cancel_with_error(elt,
337 _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
338 g_debug("element-glue: error reading from fd %d: %s",
339 fd, strerror(saved_errno));
340 wait_until_xfer_cancelled(elt->xfer);
343 } else if (len == 0) { /* we only count a zero-length read as EOF */
349 xfer_element_push_buffer(elt->downstream, buf, len);
352 if (elt->cancelled && elt->expect_eof)
353 xfer_element_drain_by_reading(fd);
355 /* send an EOF indication downstream */
356 xfer_element_push_buffer(elt->downstream, NULL, 0);
358 /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
361 *self->read_fdp = -1;
365 pull_and_push(XferElementGlue *self)
367 XferElement *elt = XFER_ELEMENT(self);
368 gboolean eof_sent = FALSE;
370 while (!elt->cancelled) {
374 /* get a buffer from upstream */
375 buf = xfer_element_pull_buffer(elt->upstream, &len);
377 /* and push it downstream */
378 xfer_element_push_buffer(elt->downstream, buf, len);
386 if (elt->cancelled && elt->expect_eof)
387 xfer_element_drain_by_pulling(elt->upstream);
390 xfer_element_push_buffer(elt->downstream, NULL, 0);
397 XferElement *elt = XFER_ELEMENT(data);
398 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
400 switch (mech_pair(elt->input_mech, elt->output_mech)) {
401 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
402 read_and_write(self);
405 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
406 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
410 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
411 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
412 pull_and_write(self);
415 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
419 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
420 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
421 if ((self->output_data_socket = do_directtcp_connect(self,
422 elt->downstream->input_listen_addrs)) == -1)
424 self->write_fdp = &self->output_data_socket;
425 read_and_write(self);
428 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
429 if ((self->output_data_socket = do_directtcp_connect(self,
430 elt->downstream->input_listen_addrs)) == -1)
432 self->write_fdp = &self->output_data_socket;
433 pull_and_write(self);
436 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
437 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
438 if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
440 self->read_fdp = &self->input_data_socket;
441 read_and_write(self);
444 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
445 if ((self->input_data_socket = do_directtcp_accept(self,
446 &self->input_listen_socket)) == -1)
448 self->read_fdp = &self->input_data_socket;
452 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
453 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
454 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
455 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
456 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
457 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
458 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
459 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
460 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
461 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
463 g_assert_not_reached();
466 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
467 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
468 if ((self->output_data_socket = do_directtcp_accept(self,
469 &self->output_listen_socket)) == -1)
471 self->write_fdp = &self->output_data_socket;
472 read_and_write(self);
475 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
476 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
477 if ((self->input_data_socket = do_directtcp_connect(self,
478 elt->upstream->output_listen_addrs)) == -1)
480 self->read_fdp = &self->input_data_socket;
481 read_and_write(self);
484 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
485 if ((self->input_data_socket = do_directtcp_connect(self,
486 elt->upstream->output_listen_addrs)) == -1)
488 self->read_fdp = &self->input_data_socket;
492 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
493 if ((self->output_data_socket = do_directtcp_accept(self,
494 &self->output_listen_socket)) == -1)
496 self->write_fdp = &self->output_data_socket;
497 pull_and_write(self);
500 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
501 /* TODO: use async accept's here to avoid order dependency */
502 if ((self->output_data_socket = do_directtcp_accept(self,
503 &self->output_listen_socket)) == -1)
505 self->write_fdp = &self->output_data_socket;
506 if ((self->input_data_socket = do_directtcp_accept(self,
507 &self->input_listen_socket)) == -1)
509 self->read_fdp = &self->input_data_socket;
510 read_and_write(self);
513 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
514 /* TODO: use async connects and select() to avoid order dependency here */
515 if ((self->input_data_socket = do_directtcp_connect(self,
516 elt->upstream->output_listen_addrs)) == -1)
518 self->read_fdp = &self->input_data_socket;
519 if ((self->output_data_socket = do_directtcp_connect(self,
520 elt->downstream->input_listen_addrs)) == -1)
522 self->write_fdp = &self->output_data_socket;
523 read_and_write(self);
527 send_xfer_done(self);
540 XferElementGlue *self = (XferElementGlue *)elt;
541 gboolean need_ring = FALSE;
542 gboolean need_listen_input = FALSE;
543 gboolean need_listen_output = FALSE;
545 g_assert(elt->input_mech != XFER_MECH_NONE);
546 g_assert(elt->output_mech != XFER_MECH_NONE);
547 g_assert(elt->input_mech != elt->output_mech);
549 self->read_fdp = NULL;
550 self->write_fdp = NULL;
551 self->on_push = PUSH_INVALID;
552 self->on_pull = PULL_INVALID;
553 self->need_thread = FALSE;
555 switch (mech_pair(elt->input_mech, elt->output_mech)) {
556 case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
557 /* thread will read from one fd and write to the other */
558 self->read_fdp = &neighboring_element_fd;
559 self->write_fdp = &neighboring_element_fd;
560 self->need_thread = TRUE;
563 case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
564 /* thread will read from one fd and call push_buffer downstream */
565 self->read_fdp = &neighboring_element_fd;
566 self->need_thread = TRUE;
569 case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
570 self->read_fdp = &neighboring_element_fd;
571 self->on_pull = PULL_FROM_FD;
574 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
575 /* thread will connect for output, then read from fd and write to the
577 self->read_fdp = &neighboring_element_fd;
578 self->need_thread = TRUE;
581 case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
582 /* thread will accept output conn, then read from upstream and write to socket */
583 self->read_fdp = &neighboring_element_fd;
584 self->need_thread = TRUE;
585 need_listen_output = TRUE;
588 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
590 elt->input_fd = self->pipe[1];
591 self->pipe[1] = -1; /* upstream will close this for us */
592 elt->output_fd = self->pipe[0];
593 self->pipe[0] = -1; /* downstream will close this for us */
596 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
597 /* thread will read from pipe and call downstream's push_buffer */
599 elt->input_fd = self->pipe[1];
600 self->pipe[1] = -1; /* upstream will close this for us */
601 self->read_fdp = &self->pipe[0];
602 self->need_thread = TRUE;
605 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
607 elt->input_fd = self->pipe[1];
608 self->pipe[1] = -1; /* upstream will close this for us */
609 self->on_pull = PULL_FROM_FD;
610 self->read_fdp = &self->pipe[0];
613 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
614 /* thread will connect for output, then read from pipe and write to socket */
616 elt->input_fd = self->pipe[1];
617 self->pipe[1] = -1; /* upstream will close this for us */
618 self->read_fdp = &self->pipe[0];
619 self->need_thread = TRUE;
622 case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
623 /* thread will accept output conn, then read from pipe and write to socket */
625 elt->input_fd = self->pipe[1];
626 self->pipe[1] = -1; /* upstream will close this for us */
627 self->read_fdp = &self->pipe[0];
628 self->need_thread = TRUE;
629 need_listen_output = TRUE;
632 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
634 elt->output_fd = self->pipe[0];
635 self->pipe[0] = -1; /* downstream will close this for us */
636 self->on_push = PUSH_TO_FD;
637 self->write_fdp = &self->pipe[1];
640 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
641 self->on_push = PUSH_TO_FD;
642 self->write_fdp = &neighboring_element_fd;
645 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
646 self->on_push = PUSH_TO_RING_BUFFER;
647 self->on_pull = PULL_FROM_RING_BUFFER;
651 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
652 /* push will connect for output first */
653 self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
656 case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
657 /* push will accept for output first */
658 self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
659 need_listen_output = TRUE;
662 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
663 /* thread will pull from upstream and write to pipe */
665 elt->output_fd = self->pipe[0];
666 self->pipe[0] = -1; /* downstream will close this for us */
667 self->write_fdp = &self->pipe[1];
668 self->need_thread = TRUE;
671 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
672 /* thread will pull from upstream and write to downstream */
673 self->write_fdp = &neighboring_element_fd;
674 self->need_thread = TRUE;
677 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
678 /* thread will pull from upstream and push to downstream */
679 self->need_thread = TRUE;
682 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
683 /* thread will connect for output, then pull from upstream and write to socket */
684 self->need_thread = TRUE;
687 case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
688 /* thread will accept for output, then pull from upstream and write to socket */
689 self->need_thread = TRUE;
690 need_listen_output = TRUE;
693 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
694 /* thread will accept for input, then read from socket and write to pipe */
696 elt->output_fd = self->pipe[0];
697 self->pipe[0] = -1; /* downstream will close this for us */
698 self->write_fdp = &self->pipe[1];
699 self->need_thread = TRUE;
700 need_listen_input = TRUE;
703 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
704 /* thread will accept for input, then read from socket and write to downstream */
705 self->write_fdp = &neighboring_element_fd;
706 self->need_thread = TRUE;
707 need_listen_input = TRUE;
710 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
711 /* thread will accept for input, then read from socket and push downstream */
712 self->need_thread = TRUE;
713 need_listen_input = TRUE;
716 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
717 /* first pull will accept for input, then read from socket */
718 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
719 need_listen_input = TRUE;
722 case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
723 /* thread will accept on both sides, then copy from socket to socket */
724 self->need_thread = TRUE;
725 need_listen_input = TRUE;
726 need_listen_output = TRUE;
729 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
730 /* thread will connect for input, then read from socket and write to pipe */
732 elt->output_fd = self->pipe[0];
733 self->pipe[0] = -1; /* downstream will close this for us */
734 self->write_fdp = &self->pipe[1];
735 self->need_thread = TRUE;
738 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
739 /* thread will connect for input, then read from socket and write to downstream */
740 self->write_fdp = &neighboring_element_fd;
741 self->need_thread = TRUE;
744 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
745 /* thread will connect for input, then read from socket and push downstream */
746 self->need_thread = TRUE;
749 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
750 /* first pull will connect for input, then read from socket */
751 self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
754 case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
755 /* thread will connect on both sides, then copy from socket to socket */
756 self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
757 self->need_thread = TRUE;
761 g_assert_not_reached();
765 /* set up ring if desired */
767 self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
768 self->ring_used_sem = semaphore_new_with_value(0);
769 self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
772 if (need_listen_input) {
773 if (!do_directtcp_listen(elt,
774 &self->input_listen_socket, &elt->input_listen_addrs))
777 if (need_listen_output) {
778 if (!do_directtcp_listen(elt,
779 &self->output_listen_socket, &elt->output_listen_addrs))
790 XferElementGlue *self = (XferElementGlue *)elt;
792 /* upstream and downstream are now set, so we can point our fdp's to them */
793 if (self->write_fdp == &neighboring_element_fd)
794 self->write_fdp = &elt->downstream->input_fd;
795 if (self->read_fdp == &neighboring_element_fd)
796 self->read_fdp = &elt->upstream->output_fd;
798 if (self->need_thread)
799 self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
801 /* we're active if we have a thread that will eventually die */
802 return self->need_thread;
810 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
812 /* accept first, if required */
813 if (self->on_pull & PULL_ACCEPT_FIRST) {
814 /* don't accept the next time around */
815 self->on_pull &= ~PULL_ACCEPT_FIRST;
817 if (elt->cancelled) {
822 if ((self->input_data_socket = do_directtcp_accept(self,
823 &self->input_listen_socket)) == -1) {
824 /* do_directtcp_accept already signalled an error; xfer
830 /* read from this new socket */
831 self->read_fdp = &self->input_data_socket;
834 /* or connect first, if required */
835 if (self->on_pull & PULL_CONNECT_FIRST) {
836 /* don't connect the next time around */
837 self->on_pull &= ~PULL_CONNECT_FIRST;
839 if (elt->cancelled) {
844 if ((self->input_data_socket = do_directtcp_connect(self,
845 elt->upstream->output_listen_addrs)) == -1) {
846 /* do_directtcp_connect already signalled an error; xfer
852 /* read from this new socket */
853 self->read_fdp = &self->input_data_socket;
856 switch (self->on_pull) {
857 case PULL_FROM_RING_BUFFER: {
860 if (elt->cancelled) {
861 /* the finalize method will empty the ring buffer */
866 /* make sure there's at least one element available */
867 semaphore_down(self->ring_used_sem);
870 buf = self->ring[self->ring_tail].buf;
871 *size = self->ring[self->ring_tail].size;
872 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
874 /* and mark this element as free to be overwritten */
875 semaphore_up(self->ring_free_sem);
881 int fd = *self->read_fdp;
882 char *buf = g_malloc(GLUE_BUFFER_SIZE);
885 if (elt->cancelled) {
887 xfer_element_drain_by_reading(fd);
890 *self->read_fdp = -1;
896 /* read from upstream */
897 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
898 if (len < GLUE_BUFFER_SIZE) {
900 if (!elt->cancelled) {
901 xfer_cancel_with_error(elt,
902 _("Error reading from fd %d: %s"), fd, strerror(errno));
903 wait_until_xfer_cancelled(elt->xfer);
910 /* and finish off the upstream */
911 if (elt->expect_eof) {
912 xfer_element_drain_by_reading(fd);
915 *self->read_fdp = -1;
916 } else if (len == 0) {
922 /* signal EOF to downstream */
924 *self->read_fdp = -1;
935 g_assert_not_reached();
946 XferElementGlue *self = (XferElementGlue *)elt;
948 /* accept first, if required */
949 if (self->on_push & PUSH_ACCEPT_FIRST) {
950 /* don't accept the next time around */
951 self->on_push &= ~PUSH_ACCEPT_FIRST;
953 if (elt->cancelled) {
957 if ((self->output_data_socket = do_directtcp_accept(self,
958 &self->output_listen_socket)) == -1) {
959 /* do_directtcp_accept already signalled an error; xfer
964 /* write to this new socket */
965 self->write_fdp = &self->output_data_socket;
968 /* or connect first, if required */
969 if (self->on_push & PUSH_CONNECT_FIRST) {
970 /* don't accept the next time around */
971 self->on_push &= ~PUSH_CONNECT_FIRST;
973 if (elt->cancelled) {
977 if ((self->output_data_socket = do_directtcp_connect(self,
978 elt->downstream->input_listen_addrs)) == -1) {
979 /* do_directtcp_connect already signalled an error; xfer
984 /* read from this new socket */
985 self->write_fdp = &self->output_data_socket;
988 switch (self->on_push) {
989 case PUSH_TO_RING_BUFFER:
990 /* just drop packets if the transfer has been cancelled */
991 if (elt->cancelled) {
996 /* make sure there's at least one element free */
997 semaphore_down(self->ring_free_sem);
1000 self->ring[self->ring_head].buf = buf;
1001 self->ring[self->ring_head].size = len;
1002 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1004 /* and mark this element as available for reading */
1005 semaphore_up(self->ring_used_sem);
1010 int fd = *self->write_fdp;
1012 if (elt->cancelled) {
1013 if (!elt->expect_eof || !buf) {
1015 *self->write_fdp = -1;
1017 /* hack to ensure we won't close the fd again, if we get another push */
1018 elt->expect_eof = TRUE;
1026 /* write the full buffer to the fd, or close on EOF */
1028 if (full_write(fd, buf, len) < len) {
1029 if (!elt->cancelled) {
1030 xfer_cancel_with_error(elt,
1031 _("Error writing to fd %d: %s"), fd, strerror(errno));
1032 wait_until_xfer_cancelled(elt->xfer);
1034 /* nothing special to do to handle a cancellation */
1039 *self->write_fdp = -1;
1047 g_assert_not_reached();
1054 XferElementGlue *self)
1056 XferElement *elt = (XferElement *)self;
1057 elt->can_generate_eof = TRUE;
1058 self->pipe[0] = self->pipe[1] = -1;
1059 self->input_listen_socket = -1;
1060 self->output_listen_socket = -1;
1061 self->input_data_socket = -1;
1062 self->output_data_socket = -1;
1069 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1071 /* close our pipes if they're still open (they shouldn't be!) */
1072 if (self->pipe[0] != -1) close(self->pipe[0]);
1073 if (self->pipe[1] != -1) close(self->pipe[1]);
1074 if (self->input_listen_socket != -1) close(self->input_listen_socket);
1075 if (self->output_listen_socket != -1) close(self->output_listen_socket);
1076 if (self->input_data_socket != -1) close(self->input_data_socket);
1077 if (self->output_data_socket != -1) close(self->output_data_socket);
1080 /* empty the ring buffer, ignoring syncronization issues */
1081 while (self->ring_used_sem->value) {
1082 if (self->ring[self->ring_tail].buf)
1083 amfree(self->ring[self->ring_tail].buf);
1084 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1088 semaphore_free(self->ring_used_sem);
1089 semaphore_free(self->ring_free_sem);
1093 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1096 static xfer_element_mech_pair_t _pairs[] = {
1097 { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1098 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1099 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1100 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1101 { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1103 { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
1104 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
1105 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
1106 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
1107 { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
1109 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
1110 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
1111 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
1112 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
1113 { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
1115 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
1116 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
1117 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
1118 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
1119 { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
1121 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1122 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1123 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1124 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1125 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1127 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1128 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
1129 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1130 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1131 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1134 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1136 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1140 XferElementGlueClass * selfc)
1142 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1143 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1145 klass->setup = setup_impl;
1146 klass->start = start_impl;
1147 klass->push_buffer = push_buffer_impl;
1148 klass->pull_buffer = pull_buffer_impl;
1150 klass->perl_class = "Amanda::Xfer::Element::Glue";
1151 klass->mech_pairs = xfer_element_glue_mech_pairs;
1153 goc->finalize = finalize_impl;
1155 parent_class = g_type_class_peek_parent(selfc);
1159 xfer_element_glue_get_type (void)
1161 static GType type = 0;
1163 if G_UNLIKELY(type == 0) {
1164 static const GTypeInfo info = {
1165 sizeof (XferElementGlueClass),
1166 (GBaseInitFunc) NULL,
1167 (GBaseFinalizeFunc) NULL,
1168 (GClassInitFunc) class_init,
1169 (GClassFinalizeFunc) NULL,
1170 NULL /* class_data */,
1171 sizeof (XferElementGlue),
1172 0 /* n_preallocs */,
1173 (GInstanceInitFunc) instance_init,
1177 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1183 /* create an element of this class; prototype is in xfer-element.h */
1185 xfer_element_glue(void)
1187 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1188 XferElement *elt = XFER_ELEMENT(self);