Imported Upstream version 3.2.0
[debian/amanda] / xfer-src / element-glue.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "element-glue.h"
25 #include "directtcp.h"
26 #include "util.h"
27 #include "sockaddr-util.h"
28
29 /*
30  * Instance definition
31  */
32
33 typedef struct XferElementGlue_ {
34     XferElement __parent__;
35
36     /* instructions to push_buffer_impl */
37     enum {
38         PUSH_TO_RING_BUFFER,
39         PUSH_TO_FD, /* write to write_fd */
40         PUSH_INVALID,
41
42         PUSH_ACCEPT_FIRST = (1 << 16),
43         PUSH_CONNECT_FIRST = (2 << 16),
44     } on_push;
45
46     /* instructions to pull_buffer_impl */
47     enum {
48         PULL_FROM_RING_BUFFER,
49         PULL_FROM_FD, /* read from read_fd */
50         PULL_INVALID,
51
52         PULL_ACCEPT_FIRST = (1 << 16),
53         PULL_CONNECT_FIRST = (2 << 16),
54     } on_pull;
55
56     int *write_fdp;
57     int *read_fdp;
58
59     gboolean need_thread;
60
61     /* the stuff we might use, depending on what flavor of glue we're
62      * providing.. */
63     int pipe[2];
64     int input_listen_socket, output_listen_socket;
65     int input_data_socket, output_data_socket;
66     int read_fd, write_fd;
67
68     /* a ring buffer of ptr/size pairs with semaphores */
69     struct { gpointer buf; size_t size; } *ring;
70     semaphore_t *ring_used_sem, *ring_free_sem;
71     gint ring_head, ring_tail;
72
73     GThread *thread;
74     GThreadFunc threadfunc;
75 } XferElementGlue;
76
77 /*
78  * Class definition
79  */
80
81 typedef struct XferElementGlueClass_ {
82     XferElementClass __parent__;
83 } XferElementGlueClass;
84
85 static GObjectClass *parent_class = NULL;
86
87 /*
88  * Utility functions, etc.
89  */
90
91 static void
92 make_pipe(
93     XferElementGlue *self)
94 {
95     if (pipe(self->pipe) < 0)
96         g_critical(_("Could not create pipe: %s"), strerror(errno));
97 }
98
99 static void
100 send_xfer_done(
101     XferElementGlue *self)
102 {
103     xfer_queue_message(XFER_ELEMENT(self)->xfer,
104             xmsg_new((XferElement *)self, XMSG_DONE, 0));
105 }
106
107 static gboolean
108 do_directtcp_listen(
109     XferElement *elt,
110     int *sockp,
111     DirectTCPAddr **addrsp)
112 {
113     int sock;
114     sockaddr_union addr;
115     DirectTCPAddr *addrs;
116     socklen_t len;
117
118     sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
119     if (sock < 0) {
120         xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
121         return FALSE;
122     }
123
124     if (listen(sock, 1) < 0) {
125         xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
126         return FALSE;
127     }
128
129     /* TODO: which addresses should this display? all ifaces? localhost? */
130     len = sizeof(addr);
131     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
132         error("getsockname(): %s", strerror(errno));
133     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
134
135     addrs = g_new0(DirectTCPAddr, 2);
136     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
137     addrs[0].port = SU_GET_PORT(&addr);
138     *addrsp = addrs;
139
140     return TRUE;
141 }
142
143 static gboolean
144 prolong_accept(
145     gpointer data)
146 {
147     return !XFER_ELEMENT(data)->cancelled;
148 }
149
150 static int
151 do_directtcp_accept(
152     XferElementGlue *self,
153     int *socketp)
154 {
155     int sock;
156     g_assert(*socketp != -1);
157
158     if ((sock = interruptible_accept(*socketp, NULL, NULL,
159                                      prolong_accept, self)) == -1) {
160         /* if the accept was interrupted due to a cancellation, then do not
161          * add a further error message */
162         if (errno == 0 && XFER_ELEMENT(self)->cancelled)
163             return -1;
164
165         xfer_cancel_with_error(XFER_ELEMENT(self),
166             _("Error accepting incoming connection: %s"), strerror(errno));
167         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
168         return -1;
169     }
170
171     /* close the listening socket now, for good measure */
172     close(*socketp);
173     *socketp = -1;
174
175     return sock;
176 }
177
178 static int
179 do_directtcp_connect(
180     XferElementGlue *self,
181     DirectTCPAddr *addrs)
182 {
183     XferElement *elt = XFER_ELEMENT(self);
184     sockaddr_union addr;
185     int sock;
186
187     if (!addrs) {
188         g_debug("element-glue got no directtcp addresses to connect to!");
189         if (!elt->cancelled) {
190             xfer_cancel_with_error(elt,
191                 "%s got no directtcp addresses to connect to",
192                 xfer_element_repr(elt));
193         }
194         goto cancel_wait;
195     }
196
197     /* set up the sockaddr -- IPv4 only */
198     SU_INIT(&addr, AF_INET);
199     SU_SET_PORT(&addr, addrs->port);
200     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
201
202     g_debug("making data connection to %s", str_sockaddr(&addr));
203     sock = socket(AF_INET, SOCK_STREAM, 0);
204     if (sock < 0) {
205         xfer_cancel_with_error(elt,
206             "socket(): %s", strerror(errno));
207         goto cancel_wait;
208     }
209     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
210         xfer_cancel_with_error(elt,
211             "connect(): %s", strerror(errno));
212         goto cancel_wait;
213     }
214
215     g_debug("connected to %s", str_sockaddr(&addr));
216
217     return sock;
218
219 cancel_wait:
220     wait_until_xfer_cancelled(elt->xfer);
221     return -1;
222 }
223
/* size, in bytes, of the buffers used when reading from a file descriptor */
#define GLUE_BUFFER_SIZE (32 * 1024)

/* number of slots in the push/pull ring buffer */
#define GLUE_RING_BUFFER_SIZE (32)

/* fold an (input, output) mechanism pair into a single integer that can be
 * used as a case label */
#define mech_pair(IN,OUT) (((IN) * XFER_MECH_MAX) + (OUT))
228
229 /*
230  * fd handling
231  */
232
233 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
234  * should be redirected to point to the upstream's output_fd or downstream's
235  * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
236  * respectively. */
237 static int neighboring_element_fd = -1;
238
239 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
240 static int
241 _get_read_fd(XferElementGlue *self)
242 {
243     if (!self->read_fdp)
244         return -1; /* shouldn't happen.. */
245
246     if (self->read_fdp == &neighboring_element_fd) {
247         XferElement *elt = XFER_ELEMENT(self);
248         self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
249     } else {
250         self->read_fd = *self->read_fdp;
251         *self->read_fdp = -1;
252     }
253     self->read_fdp = NULL;
254     return self->read_fd;
255 }
256
257 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
258 static int
259 _get_write_fd(XferElementGlue *self)
260 {
261     if (!self->write_fdp)
262         return -1; /* shouldn't happen.. */
263
264     if (self->write_fdp == &neighboring_element_fd) {
265         XferElement *elt = XFER_ELEMENT(self);
266         self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
267     } else {
268         self->write_fd = *self->write_fdp;
269         *self->write_fdp = -1;
270     }
271     self->write_fdp = NULL;
272     return self->write_fd;
273 }
274
275 static int
276 close_read_fd(XferElementGlue *self)
277 {
278     int fd = get_read_fd(self);
279     self->read_fd = -1;
280     return close(fd);
281 }
282
283 static int
284 close_write_fd(XferElementGlue *self)
285 {
286     int fd = get_write_fd(self);
287     self->write_fd = -1;
288     return close(fd);
289 }
290
291 /*
292  * Worker thread utility functions
293  */
294
295 static void
296 pull_and_write(XferElementGlue *self)
297 {
298     XferElement *elt = XFER_ELEMENT(self);
299     int fd = get_write_fd(self);
300     self->write_fdp = NULL;
301
302     while (!elt->cancelled) {
303         size_t len;
304         char *buf;
305
306         /* get a buffer from upstream */
307         buf = xfer_element_pull_buffer(elt->upstream, &len);
308         if (!buf)
309             break;
310
311         /* write it */
312         if (full_write(fd, buf, len) < len) {
313             if (!elt->cancelled) {
314                 xfer_cancel_with_error(elt,
315                     _("Error writing to fd %d: %s"), fd, strerror(errno));
316                 wait_until_xfer_cancelled(elt->xfer);
317             }
318             amfree(buf);
319             break;
320         }
321
322         amfree(buf);
323     }
324
325     if (elt->cancelled && elt->expect_eof)
326         xfer_element_drain_by_pulling(elt->upstream);
327
328     /* close the fd we've been writing, as an EOF signal to downstream, and
329      * set it to -1 to avoid accidental re-use */
330     close_write_fd(self);
331 }
332
333 static void
334 read_and_write(XferElementGlue *self)
335 {
336     XferElement *elt = XFER_ELEMENT(self);
337     /* dynamically allocate a buffer, in case this thread has
338      * a limited amount of stack allocated */
339     char *buf = g_malloc(GLUE_BUFFER_SIZE);
340     int rfd = get_read_fd(self);
341     int wfd = get_write_fd(self);
342
343     while (!elt->cancelled) {
344         size_t len;
345
346         /* read from upstream */
347         len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
348         if (len < GLUE_BUFFER_SIZE) {
349             if (errno) {
350                 if (!elt->cancelled) {
351                     xfer_cancel_with_error(elt,
352                         _("Error reading from fd %d: %s"), rfd, strerror(errno));
353                     wait_until_xfer_cancelled(elt->xfer);
354                 }
355                 break;
356             } else if (len == 0) { /* we only count a zero-length read as EOF */
357                 break;
358             }
359         }
360
361         /* write the buffer fully */
362         if (full_write(wfd, buf, len) < len) {
363             if (!elt->cancelled) {
364                 xfer_cancel_with_error(elt,
365                     _("Could not write to fd %d: %s"), wfd, strerror(errno));
366                 wait_until_xfer_cancelled(elt->xfer);
367             }
368             break;
369         }
370     }
371
372     if (elt->cancelled && elt->expect_eof)
373         xfer_element_drain_by_reading(rfd);
374
375     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
376      * kill it and complete the cancellation */
377     close_read_fd(self);
378
379     /* close the fd we've been writing, as an EOF signal to downstream */
380     close_write_fd(self);
381
382     amfree(buf);
383 }
384
385 static void
386 read_and_push(
387     XferElementGlue *self)
388 {
389     XferElement *elt = XFER_ELEMENT(self);
390     int fd = get_read_fd(self);
391
392     while (!elt->cancelled) {
393         char *buf = g_malloc(GLUE_BUFFER_SIZE);
394         size_t len;
395
396         /* read a buffer from upstream */
397         len = full_read(fd, buf, GLUE_BUFFER_SIZE);
398         if (len < GLUE_BUFFER_SIZE) {
399             if (errno) {
400                 if (!elt->cancelled) {
401                     int saved_errno = errno;
402                     xfer_cancel_with_error(elt,
403                         _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
404                     g_debug("element-glue: error reading from fd %d: %s",
405                             fd, strerror(saved_errno));
406                     wait_until_xfer_cancelled(elt->xfer);
407                 }
408                 break;
409             } else if (len == 0) { /* we only count a zero-length read as EOF */
410                 amfree(buf);
411                 break;
412             }
413         }
414
415         xfer_element_push_buffer(elt->downstream, buf, len);
416     }
417
418     if (elt->cancelled && elt->expect_eof)
419         xfer_element_drain_by_reading(fd);
420
421     /* send an EOF indication downstream */
422     xfer_element_push_buffer(elt->downstream, NULL, 0);
423
424     /* close the read fd, since it's at EOF */
425     close_read_fd(self);
426 }
427
428 static void
429 pull_and_push(XferElementGlue *self)
430 {
431     XferElement *elt = XFER_ELEMENT(self);
432     gboolean eof_sent = FALSE;
433
434     while (!elt->cancelled) {
435         char *buf;
436         size_t len;
437
438         /* get a buffer from upstream */
439         buf = xfer_element_pull_buffer(elt->upstream, &len);
440
441         /* and push it downstream */
442         xfer_element_push_buffer(elt->downstream, buf, len);
443
444         if (!buf) {
445             eof_sent = TRUE;
446             break;
447         }
448     }
449
450     if (elt->cancelled && elt->expect_eof)
451         xfer_element_drain_by_pulling(elt->upstream);
452
453     if (!eof_sent)
454         xfer_element_push_buffer(elt->downstream, NULL, 0);
455 }
456
457 static gpointer
458 worker_thread(
459     gpointer data)
460 {
461     XferElement *elt = XFER_ELEMENT(data);
462     XferElementGlue *self = XFER_ELEMENT_GLUE(data);
463
464     switch (mech_pair(elt->input_mech, elt->output_mech)) {
465     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
466         read_and_write(self);
467         break;
468
469     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
470     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
471         read_and_push(self);
472         break;
473
474     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
475     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
476         pull_and_write(self);
477         break;
478
479     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
480         pull_and_push(self);
481         break;
482
483     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
484     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
485         if ((self->output_data_socket = do_directtcp_connect(self,
486                                     elt->downstream->input_listen_addrs)) == -1)
487             break;
488         self->write_fdp = &self->output_data_socket;
489         read_and_write(self);
490         break;
491
492     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
493         if ((self->output_data_socket = do_directtcp_connect(self,
494                                     elt->downstream->input_listen_addrs)) == -1)
495             break;
496         self->write_fdp = &self->output_data_socket;
497         pull_and_write(self);
498         break;
499
500     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
501     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
502         if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
503             break;
504         self->read_fdp = &self->input_data_socket;
505         read_and_write(self);
506         break;
507
508     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
509         if ((self->input_data_socket = do_directtcp_accept(self,
510                                             &self->input_listen_socket)) == -1)
511             break;
512         self->read_fdp = &self->input_data_socket;
513         read_and_push(self);
514         break;
515
516     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
517     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
518     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
519     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
520     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
521     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
522     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
523     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
524     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
525     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
526     default:
527         g_assert_not_reached();
528         break;
529
530     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
531     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
532         if ((self->output_data_socket = do_directtcp_accept(self,
533                                             &self->output_listen_socket)) == -1)
534             break;
535         self->write_fdp = &self->output_data_socket;
536         read_and_write(self);
537         break;
538
539     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
540     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
541         if ((self->input_data_socket = do_directtcp_connect(self,
542                                     elt->upstream->output_listen_addrs)) == -1)
543             break;
544         self->read_fdp = &self->input_data_socket;
545         read_and_write(self);
546         break;
547
548     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
549         if ((self->input_data_socket = do_directtcp_connect(self,
550                                     elt->upstream->output_listen_addrs)) == -1)
551             break;
552         self->read_fdp = &self->input_data_socket;
553         read_and_push(self);
554         break;
555
556     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
557         if ((self->output_data_socket = do_directtcp_accept(self,
558                                             &self->output_listen_socket)) == -1)
559             break;
560         self->write_fdp = &self->output_data_socket;
561         pull_and_write(self);
562         break;
563
564     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
565         /* TODO: use async accept's here to avoid order dependency */
566         if ((self->output_data_socket = do_directtcp_accept(self,
567                                             &self->output_listen_socket)) == -1)
568             break;
569         self->write_fdp = &self->output_data_socket;
570         if ((self->input_data_socket = do_directtcp_accept(self,
571                                             &self->input_listen_socket)) == -1)
572             break;
573         self->read_fdp = &self->input_data_socket;
574         read_and_write(self);
575         break;
576
577     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
578         /* TODO: use async connects and select() to avoid order dependency here */
579         if ((self->input_data_socket = do_directtcp_connect(self,
580                                     elt->upstream->output_listen_addrs)) == -1)
581             break;
582         self->read_fdp = &self->input_data_socket;
583         if ((self->output_data_socket = do_directtcp_connect(self,
584                                     elt->downstream->input_listen_addrs)) == -1)
585             break;
586         self->write_fdp = &self->output_data_socket;
587         read_and_write(self);
588         break;
589     }
590
591     send_xfer_done(self);
592
593     return NULL;
594 }
595
596 /*
597  * Implementation
598  */
599
600 static gboolean
601 setup_impl(
602     XferElement *elt)
603 {
604     XferElementGlue *self = (XferElementGlue *)elt;
605     gboolean need_ring = FALSE;
606     gboolean need_listen_input = FALSE;
607     gboolean need_listen_output = FALSE;
608
609     g_assert(elt->input_mech != XFER_MECH_NONE);
610     g_assert(elt->output_mech != XFER_MECH_NONE);
611     g_assert(elt->input_mech != elt->output_mech);
612
613     self->read_fdp = NULL;
614     self->write_fdp = NULL;
615     self->on_push = PUSH_INVALID;
616     self->on_pull = PULL_INVALID;
617     self->need_thread = FALSE;
618
619     switch (mech_pair(elt->input_mech, elt->output_mech)) {
620     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
621         /* thread will read from one fd and write to the other */
622         self->read_fdp = &neighboring_element_fd;
623         self->write_fdp = &neighboring_element_fd;
624         self->need_thread = TRUE;
625         break;
626
627     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
628         /* thread will read from one fd and call push_buffer downstream */
629         self->read_fdp = &neighboring_element_fd;
630         self->need_thread = TRUE;
631         break;
632
633     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
634         self->read_fdp = &neighboring_element_fd;
635         self->on_pull = PULL_FROM_FD;
636         break;
637
638     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
639         /* thread will connect for output, then read from fd and write to the
640          * socket. */
641         self->read_fdp = &neighboring_element_fd;
642         self->need_thread = TRUE;
643         break;
644
645     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
646         /* thread will accept output conn, then read from upstream and write to socket */
647         self->read_fdp = &neighboring_element_fd;
648         self->need_thread = TRUE;
649         need_listen_output = TRUE;
650         break;
651
652     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
653         make_pipe(self);
654         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
655         self->pipe[1] = -1; /* upstream will close this for us */
656         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
657         self->pipe[0] = -1; /* downstream will close this for us */
658         break;
659
660     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
661         /* thread will read from pipe and call downstream's push_buffer */
662         make_pipe(self);
663         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
664         self->pipe[1] = -1; /* upstream will close this for us */
665         self->read_fdp = &self->pipe[0];
666         self->need_thread = TRUE;
667         break;
668
669     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
670         make_pipe(self);
671         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
672         self->pipe[1] = -1; /* upstream will close this for us */
673         self->on_pull = PULL_FROM_FD;
674         self->read_fdp = &self->pipe[0];
675         break;
676
677     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
678         /* thread will connect for output, then read from pipe and write to socket */
679         make_pipe(self);
680         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
681         self->pipe[1] = -1; /* upstream will close this for us */
682         self->read_fdp = &self->pipe[0];
683         self->need_thread = TRUE;
684         break;
685
686     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
687         /* thread will accept output conn, then read from pipe and write to socket */
688         make_pipe(self);
689         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
690         self->pipe[1] = -1; /* upstream will close this for us */
691         self->read_fdp = &self->pipe[0];
692         self->need_thread = TRUE;
693         need_listen_output = TRUE;
694         break;
695
696     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
697         make_pipe(self);
698         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
699         self->pipe[0] = -1; /* downstream will close this for us */
700         self->on_push = PUSH_TO_FD;
701         self->write_fdp = &self->pipe[1];
702         break;
703
704     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
705         self->on_push = PUSH_TO_FD;
706         self->write_fdp = &neighboring_element_fd;
707         break;
708
709     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
710         self->on_push = PUSH_TO_RING_BUFFER;
711         self->on_pull = PULL_FROM_RING_BUFFER;
712         need_ring = TRUE;
713         break;
714
715     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
716         /* push will connect for output first */
717         self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
718         break;
719
720     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
721         /* push will accept for output first */
722         self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
723         need_listen_output = TRUE;
724         break;
725
726     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
727         /* thread will pull from upstream and write to pipe */
728         make_pipe(self);
729         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
730         self->pipe[0] = -1; /* downstream will close this for us */
731         self->write_fdp = &self->pipe[1];
732         self->need_thread = TRUE;
733         break;
734
735     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
736         /* thread will pull from upstream and write to downstream */
737         self->write_fdp = &neighboring_element_fd;
738         self->need_thread = TRUE;
739         break;
740
741     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
742         /* thread will pull from upstream and push to downstream */
743         self->need_thread = TRUE;
744         break;
745
746     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
747         /* thread will connect for output, then pull from upstream and write to socket */
748         self->need_thread = TRUE;
749         break;
750
751     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
752         /* thread will accept for output, then pull from upstream and write to socket */
753         self->need_thread = TRUE;
754         need_listen_output = TRUE;
755         break;
756
757     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
758         /* thread will accept for input, then read from socket and write to pipe */
759         make_pipe(self);
760         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
761         self->pipe[0] = -1; /* downstream will close this for us */
762         self->write_fdp = &self->pipe[1];
763         self->need_thread = TRUE;
764         need_listen_input = TRUE;
765         break;
766
767     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
768         /* thread will accept for input, then read from socket and write to downstream */
769         self->write_fdp = &neighboring_element_fd;
770         self->need_thread = TRUE;
771         need_listen_input = TRUE;
772         break;
773
774     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
775         /* thread will accept for input, then read from socket and push downstream */
776         self->need_thread = TRUE;
777         need_listen_input = TRUE;
778         break;
779
780     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
781         /* first pull will accept for input, then read from socket */
782         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
783         need_listen_input = TRUE;
784         break;
785
786     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
787         /* thread will accept on both sides, then copy from socket to socket */
788         self->need_thread = TRUE;
789         need_listen_input = TRUE;
790         need_listen_output = TRUE;
791         break;
792
793     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
794         /* thread will connect for input, then read from socket and write to pipe */
795         make_pipe(self);
796         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
797         self->pipe[0] = -1; /* downstream will close this for us */
798         self->write_fdp = &self->pipe[1];
799         self->need_thread = TRUE;
800         break;
801
802     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
803         /* thread will connect for input, then read from socket and write to downstream */
804         self->write_fdp = &neighboring_element_fd;
805         self->need_thread = TRUE;
806         break;
807
808     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
809         /* thread will connect for input, then read from socket and push downstream */
810         self->need_thread = TRUE;
811         break;
812
813     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
814         /* first pull will connect for input, then read from socket */
815         self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
816         break;
817
818     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
819         /* thread will connect on both sides, then copy from socket to socket */
820         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
821         self->need_thread = TRUE;
822         break;
823
824     default:
825         g_assert_not_reached();
826         break;
827     }
828
829     /* set up ring if desired */
830     if (need_ring) {
831         self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
832         self->ring_used_sem = semaphore_new_with_value(0);
833         self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
834     }
835
836     if (need_listen_input) {
837         if (!do_directtcp_listen(elt,
838                     &self->input_listen_socket, &elt->input_listen_addrs))
839             return FALSE;
840     }
841     if (need_listen_output) {
842         if (!do_directtcp_listen(elt,
843                     &self->output_listen_socket, &elt->output_listen_addrs))
844             return FALSE;
845     }
846
847     return TRUE;
848 }
849
850 static gboolean
851 start_impl(
852     XferElement *elt)
853 {
854     XferElementGlue *self = (XferElementGlue *)elt;
855
856     if (self->need_thread)
857         self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
858
859     /* we're active if we have a thread that will eventually die */
860     return self->need_thread;
861 }
862
863 static gpointer
864 pull_buffer_impl(
865     XferElement *elt,
866     size_t *size)
867 {
868     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
869
870     /* accept first, if required */
871     if (self->on_pull & PULL_ACCEPT_FIRST) {
872         /* don't accept the next time around */
873         self->on_pull &= ~PULL_ACCEPT_FIRST;
874
875         if (elt->cancelled) {
876             *size = 0;
877             return NULL;
878         }
879
880         if ((self->input_data_socket = do_directtcp_accept(self,
881                                             &self->input_listen_socket)) == -1) {
882             /* do_directtcp_accept already signalled an error; xfer
883              * is cancelled */
884             *size = 0;
885             return NULL;
886         }
887
888         /* read from this new socket */
889         self->read_fdp = &self->input_data_socket;
890     }
891
892     /* or connect first, if required */
893     if (self->on_pull & PULL_CONNECT_FIRST) {
894         /* don't connect the next time around */
895         self->on_pull &= ~PULL_CONNECT_FIRST;
896
897         if (elt->cancelled) {
898             *size = 0;
899             return NULL;
900         }
901
902         if ((self->input_data_socket = do_directtcp_connect(self,
903                                     elt->upstream->output_listen_addrs)) == -1) {
904             /* do_directtcp_connect already signalled an error; xfer
905              * is cancelled */
906             *size = 0;
907             return NULL;
908         }
909
910         /* read from this new socket */
911         self->read_fdp = &self->input_data_socket;
912     }
913
914     switch (self->on_pull) {
915         case PULL_FROM_RING_BUFFER: {
916             gpointer buf;
917
918             if (elt->cancelled) {
919                 /* the finalize method will empty the ring buffer */
920                 *size = 0;
921                 return NULL;
922             }
923
924             /* make sure there's at least one element available */
925             semaphore_down(self->ring_used_sem);
926
927             /* get it */
928             buf = self->ring[self->ring_tail].buf;
929             *size = self->ring[self->ring_tail].size;
930             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
931
932             /* and mark this element as free to be overwritten */
933             semaphore_up(self->ring_free_sem);
934
935             return buf;
936         }
937
938         case PULL_FROM_FD: {
939             int fd = get_read_fd(self);
940             char *buf = g_malloc(GLUE_BUFFER_SIZE);
941             ssize_t len;
942
943             /* if the fd is already closed, it's possible upstream bailed out
944              * so quickly that we didn't even get a look at the fd */
945             if (elt->cancelled || fd == -1) {
946                 if (fd != -1) {
947                     if (elt->expect_eof)
948                         xfer_element_drain_by_reading(fd);
949
950                     close_read_fd(self);
951                 }
952
953                 *size = 0;
954                 return NULL;
955             }
956
957             /* read from upstream */
958             len = full_read(fd, buf, GLUE_BUFFER_SIZE);
959             if (len < GLUE_BUFFER_SIZE) {
960                 if (errno) {
961                     if (!elt->cancelled) {
962                         xfer_cancel_with_error(elt,
963                             _("Error reading from fd %d: %s"), fd, strerror(errno));
964                         wait_until_xfer_cancelled(elt->xfer);
965                     }
966
967                     /* return an EOF */
968                     amfree(buf);
969                     len = 0;
970
971                     /* and finish off the upstream */
972                     if (elt->expect_eof) {
973                         xfer_element_drain_by_reading(fd);
974                     }
975                     close_read_fd(self);
976                 } else if (len == 0) {
977                     /* EOF */
978                     g_free(buf);
979                     buf = NULL;
980                     *size = 0;
981
982                     /* signal EOF to downstream */
983                     close_read_fd(self);
984                 }
985             }
986
987             *size = (size_t)len;
988
989             return buf;
990         }
991
992         default:
993         case PULL_INVALID:
994             g_assert_not_reached();
995             return NULL;
996     }
997 }
998
999 static void
1000 push_buffer_impl(
1001     XferElement *elt,
1002     gpointer buf,
1003     size_t len)
1004 {
1005     XferElementGlue *self = (XferElementGlue *)elt;
1006
1007     /* accept first, if required */
1008     if (self->on_push & PUSH_ACCEPT_FIRST) {
1009         /* don't accept the next time around */
1010         self->on_push &= ~PUSH_ACCEPT_FIRST;
1011
1012         if (elt->cancelled) {
1013             return;
1014         }
1015
1016         if ((self->output_data_socket = do_directtcp_accept(self,
1017                                             &self->output_listen_socket)) == -1) {
1018             /* do_directtcp_accept already signalled an error; xfer
1019              * is cancelled */
1020             return;
1021         }
1022
1023         /* write to this new socket */
1024         self->write_fdp = &self->output_data_socket;
1025     }
1026
1027     /* or connect first, if required */
1028     if (self->on_push & PUSH_CONNECT_FIRST) {
1029         /* don't accept the next time around */
1030         self->on_push &= ~PUSH_CONNECT_FIRST;
1031
1032         if (elt->cancelled) {
1033             return;
1034         }
1035
1036         if ((self->output_data_socket = do_directtcp_connect(self,
1037                                     elt->downstream->input_listen_addrs)) == -1) {
1038             /* do_directtcp_connect already signalled an error; xfer
1039              * is cancelled */
1040             return;
1041         }
1042
1043         /* read from this new socket */
1044         self->write_fdp = &self->output_data_socket;
1045     }
1046
1047     switch (self->on_push) {
1048         case PUSH_TO_RING_BUFFER:
1049             /* just drop packets if the transfer has been cancelled */
1050             if (elt->cancelled) {
1051                 amfree(buf);
1052                 return;
1053             }
1054
1055             /* make sure there's at least one element free */
1056             semaphore_down(self->ring_free_sem);
1057
1058             /* set it */
1059             self->ring[self->ring_head].buf = buf;
1060             self->ring[self->ring_head].size = len;
1061             self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1062
1063             /* and mark this element as available for reading */
1064             semaphore_up(self->ring_used_sem);
1065
1066             return;
1067
1068         case PUSH_TO_FD: {
1069             int fd = get_write_fd(self);
1070
1071             /* if the fd is already closed, it's possible upstream bailed out
1072              * so quickly that we didn't even get a look at the fd.  In this
1073              * case we can assume the xfer has been cancelled and just discard
1074              * the data. */
1075             if (fd == -1)
1076                 return;
1077
1078             if (elt->cancelled) {
1079                 if (!elt->expect_eof || !buf) {
1080                     close_write_fd(self);
1081
1082                     /* hack to ensure we won't close the fd again, if we get another push */
1083                     elt->expect_eof = TRUE;
1084                 }
1085
1086                 amfree(buf);
1087
1088                 return;
1089             }
1090
1091             /* write the full buffer to the fd, or close on EOF */
1092             if (buf) {
1093                 if (full_write(fd, buf, len) < len) {
1094                     if (!elt->cancelled) {
1095                         xfer_cancel_with_error(elt,
1096                             _("Error writing to fd %d: %s"), fd, strerror(errno));
1097                         wait_until_xfer_cancelled(elt->xfer);
1098                     }
1099                     /* nothing special to do to handle a cancellation */
1100                 }
1101                 amfree(buf);
1102             } else {
1103                 close_write_fd(self);
1104             }
1105
1106             return;
1107         }
1108
1109         default:
1110         case PUSH_INVALID:
1111             g_assert_not_reached();
1112             break;
1113     }
1114 }
1115
1116 static void
1117 instance_init(
1118     XferElementGlue *self)
1119 {
1120     XferElement *elt = (XferElement *)self;
1121     elt->can_generate_eof = TRUE;
1122     self->pipe[0] = self->pipe[1] = -1;
1123     self->input_listen_socket = -1;
1124     self->output_listen_socket = -1;
1125     self->input_data_socket = -1;
1126     self->output_data_socket = -1;
1127     self->read_fd = -1;
1128     self->write_fd = -1;
1129 }
1130
1131 static void
1132 finalize_impl(
1133     GObject * obj_self)
1134 {
1135     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1136
1137     /* first make sure the worker thread has finished up */
1138     if (self->thread)
1139         g_thread_join(self->thread);
1140
1141     /* close our pipes and fd's if they're still open */
1142     if (self->pipe[0] != -1) close(self->pipe[0]);
1143     if (self->pipe[1] != -1) close(self->pipe[1]);
1144     if (self->input_listen_socket != -1) close(self->input_listen_socket);
1145     if (self->output_listen_socket != -1) close(self->output_listen_socket);
1146     if (self->input_data_socket != -1) close(self->input_data_socket);
1147     if (self->output_data_socket != -1) close(self->output_data_socket);
1148     if (self->read_fd != -1) close(self->read_fd);
1149     if (self->write_fd != -1) close(self->write_fd);
1150
1151     if (self->ring) {
1152         /* empty the ring buffer, ignoring syncronization issues */
1153         while (self->ring_used_sem->value) {
1154             if (self->ring[self->ring_tail].buf)
1155                 amfree(self->ring[self->ring_tail].buf);
1156             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1157         }
1158
1159         amfree(self->ring);
1160         semaphore_free(self->ring_used_sem);
1161         semaphore_free(self->ring_free_sem);
1162     }
1163
1164     /* chain up */
1165     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1166 }
1167
1168 static xfer_element_mech_pair_t _pairs[] = {
1169     { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1170     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1171     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1172     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1173     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1174
1175     { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
1176     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
1177     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
1178     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
1179     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
1180
1181     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
1182     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
1183     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
1184     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
1185     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
1186
1187     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
1188     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
1189     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
1190     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
1191     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
1192
1193     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1194     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1195     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1196     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1197     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1198
1199     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1200     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
1201     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1202     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1203     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
1204
1205     /* terminator */
1206     { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1207 };
1208 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1209
1210 static void
1211 class_init(
1212     XferElementGlueClass * selfc)
1213 {
1214     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1215     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1216
1217     klass->setup = setup_impl;
1218     klass->start = start_impl;
1219     klass->push_buffer = push_buffer_impl;
1220     klass->pull_buffer = pull_buffer_impl;
1221
1222     klass->perl_class = "Amanda::Xfer::Element::Glue";
1223     klass->mech_pairs = xfer_element_glue_mech_pairs;
1224
1225     goc->finalize = finalize_impl;
1226
1227     parent_class = g_type_class_peek_parent(selfc);
1228 }
1229
1230 GType
1231 xfer_element_glue_get_type (void)
1232 {
1233     static GType type = 0;
1234
1235     if G_UNLIKELY(type == 0) {
1236         static const GTypeInfo info = {
1237             sizeof (XferElementGlueClass),
1238             (GBaseInitFunc) NULL,
1239             (GBaseFinalizeFunc) NULL,
1240             (GClassInitFunc) class_init,
1241             (GClassFinalizeFunc) NULL,
1242             NULL /* class_data */,
1243             sizeof (XferElementGlue),
1244             0 /* n_preallocs */,
1245             (GInstanceInitFunc) instance_init,
1246             NULL
1247         };
1248
1249         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1250     }
1251
1252     return type;
1253 }
1254
1255 /* create an element of this class; prototype is in xfer-element.h */
1256 XferElement *
1257 xfer_element_glue(void)
1258 {
1259     XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1260     XferElement *elt = XFER_ELEMENT(self);
1261
1262     return elt;
1263 }