29e639ad5aac16ff89bf6421b300a5138c1cd9e4
[debian/amanda] / xfer-src / element-glue.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amxfer.h"
23 #include "element-glue.h"
24 #include "amanda.h"
25 #include "directtcp.h"
26 #include "sockaddr-util.h"
27
28 /*
29  * Instance definition
30  */
31
32 typedef struct XferElementGlue_ {
33     XferElement __parent__;
34
35     /* instructions to push_buffer_impl */
36     enum {
37         PUSH_TO_RING_BUFFER,
38         PUSH_TO_FD, /* write to *write_fdp */
39         PUSH_INVALID,
40
41         PUSH_ACCEPT_FIRST = (1 << 16),
42         PUSH_CONNECT_FIRST = (2 << 16),
43     } on_push;
44
45     /* instructions to pull_buffer_impl */
46     enum {
47         PULL_FROM_RING_BUFFER,
48         PULL_FROM_FD, /* read from *read_fdp */
49         PULL_INVALID,
50
51         PULL_ACCEPT_FIRST = (1 << 16),
52         PULL_CONNECT_FIRST = (2 << 16),
53     } on_pull;
54
55     int *write_fdp;
56     int *read_fdp;
57
58     gboolean need_thread;
59
60     /* the stuff we might use, depending on what flavor of glue we're
61      * providing.. */
62     int pipe[2];
63     int input_listen_socket, output_listen_socket;
64     int input_data_socket, output_data_socket;
65
66     /* a ring buffer of ptr/size pairs with semaphores */
67     struct { gpointer buf; size_t size; } *ring;
68     semaphore_t *ring_used_sem, *ring_free_sem;
69     gint ring_head, ring_tail;
70
71     GThread *thread;
72     GThreadFunc threadfunc;
73 } XferElementGlue;
74
75 /*
76  * Class definition
77  */
78
79 typedef struct XferElementGlueClass_ {
80     XferElementClass __parent__;
81 } XferElementGlueClass;
82
83 static GObjectClass *parent_class = NULL;
84
85 /*
86  * Utility functions, etc.
87  */
88
89 static void
90 make_pipe(
91     XferElementGlue *self)
92 {
93     if (pipe(self->pipe) < 0)
94         g_critical(_("Could not create pipe: %s"), strerror(errno));
95 }
96
97 static void
98 send_xfer_done(
99     XferElementGlue *self)
100 {
101     xfer_queue_message(XFER_ELEMENT(self)->xfer,
102             xmsg_new((XferElement *)self, XMSG_DONE, 0));
103 }
104
105 static gboolean
106 do_directtcp_listen(
107     XferElement *elt,
108     int *sockp,
109     DirectTCPAddr **addrsp)
110 {
111     int sock;
112     sockaddr_union addr;
113     DirectTCPAddr *addrs;
114     socklen_t len;
115
116     sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
117     if (sock < 0) {
118         xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
119         return FALSE;
120     }
121
122     if (listen(sock, 1) < 0) {
123         xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
124         return FALSE;
125     }
126
127     /* TODO: which addresses should this display? all ifaces? localhost? */
128     len = sizeof(addr);
129     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
130         error("getsockname(): %s", strerror(errno));
131     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
132
133     addrs = g_new0(DirectTCPAddr, 2);
134     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
135     addrs[0].port = SU_GET_PORT(&addr);
136     *addrsp = addrs;
137
138     return TRUE;
139 }
140
141 static int
142 do_directtcp_accept(
143     XferElementGlue *self,
144     int *socketp)
145 {
146     int sock;
147     g_assert(*socketp != -1);
148
149     if ((sock = accept(*socketp, NULL, NULL)) == -1) {
150         xfer_cancel_with_error(XFER_ELEMENT(self),
151             _("Error accepting incoming connection: %s"), strerror(errno));
152         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
153         return -1;
154     }
155
156     /* close the listening socket now, for good measure */
157     close(*socketp);
158     *socketp = -1;
159
160     return sock;
161 }
162
163 static int
164 do_directtcp_connect(
165     XferElementGlue *self,
166     DirectTCPAddr *addrs)
167 {
168     XferElement *elt = XFER_ELEMENT(self);
169     sockaddr_union addr;
170     int sock;
171
172     if (!addrs) {
173         g_debug("element-glue got no directtcp addresses to connect to!");
174         if (!elt->cancelled) {
175             xfer_cancel_with_error(elt,
176                 "%s got no directtcp addresses to connect to",
177                 xfer_element_repr(elt));
178         }
179         goto cancel_wait;
180     }
181
182     /* set up the sockaddr -- IPv4 only */
183     SU_INIT(&addr, AF_INET);
184     SU_SET_PORT(&addr, addrs->port);
185     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
186
187     g_debug("making data connection to %s", str_sockaddr(&addr));
188     sock = socket(AF_INET, SOCK_STREAM, 0);
189     if (sock < 0) {
190         xfer_cancel_with_error(elt,
191             "socket(): %s", strerror(errno));
192         goto cancel_wait;
193     }
194     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
195         xfer_cancel_with_error(elt,
196             "connect(): %s", strerror(errno));
197         goto cancel_wait;
198     }
199
200     g_debug("connected to %s", str_sockaddr(&addr));
201
202     return sock;
203
204 cancel_wait:
205     wait_until_xfer_cancelled(elt->xfer);
206     return -1;
207 }
208
209 #define GLUE_BUFFER_SIZE 32768
210 #define GLUE_RING_BUFFER_SIZE 32
211
212 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
213
214 /* if self->read_fdp or self->write_fdp are pointing to this integer, then
215  * they should be redirected to point to the upstream's output_fd or
216  * downstream's input_fd, respectively, at start() */
217 static int neighboring_element_fd = -1;
218
219 /*
220  * Worker thread utility functions
221  */
222
223 static void
224 pull_and_write(XferElementGlue *self)
225 {
226     XferElement *elt = XFER_ELEMENT(self);
227     int fd = *self->write_fdp;
228
229     while (!elt->cancelled) {
230         size_t len;
231         char *buf;
232
233         /* get a buffer from upstream */
234         buf = xfer_element_pull_buffer(elt->upstream, &len);
235         if (!buf)
236             break;
237
238         /* write it */
239         if (full_write(fd, buf, len) < len) {
240             if (!elt->cancelled) {
241                 xfer_cancel_with_error(elt,
242                     _("Error writing to fd %d: %s"), fd, strerror(errno));
243                 wait_until_xfer_cancelled(elt->xfer);
244             }
245             amfree(buf);
246             break;
247         }
248
249         amfree(buf);
250     }
251
252     if (elt->cancelled && elt->expect_eof)
253         xfer_element_drain_by_pulling(elt->upstream);
254
255     /* close the fd we've been writing, as an EOF signal to downstream, and
256      * set it to -1 to avoid accidental re-use */
257     close(fd);
258     *self->write_fdp = -1;
259 }
260
261 static void
262 read_and_write(XferElementGlue *self)
263 {
264     XferElement *elt = XFER_ELEMENT(self);
265     int rfd = *self->read_fdp;
266     int wfd = *self->write_fdp;
267
268     /* dynamically allocate a buffer, in case this thread has
269      * a limited amount of stack allocated */
270     char *buf = g_malloc(GLUE_BUFFER_SIZE);
271
272     while (!elt->cancelled) {
273         size_t len;
274
275         /* read from upstream */
276         len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
277         if (len < GLUE_BUFFER_SIZE) {
278             if (errno) {
279                 if (!elt->cancelled) {
280                     xfer_cancel_with_error(elt,
281                         _("Error reading from fd %d: %s"), rfd, strerror(errno));
282                     wait_until_xfer_cancelled(elt->xfer);
283                 }
284                 break;
285             } else if (len == 0) { /* we only count a zero-length read as EOF */
286                 break;
287             }
288         }
289
290         /* write the buffer fully */
291         if (full_write(wfd, buf, len) < len) {
292             if (!elt->cancelled) {
293                 xfer_cancel_with_error(elt,
294                     _("Could not write to fd %d: %s"), wfd, strerror(errno));
295                 wait_until_xfer_cancelled(elt->xfer);
296             }
297             break;
298         }
299     }
300
301     if (elt->cancelled && elt->expect_eof)
302         xfer_element_drain_by_pulling(elt->upstream);
303
304     /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
305      * re-use */
306     if (!elt->cancelled || elt->expect_eof) {
307         close(rfd);
308         *self->read_fdp = -1;
309     }
310
311     /* close the fd we've been writing, as an EOF signal to downstream, and
312      * set it to -1 to avoid accidental re-use */
313     close(wfd);
314     *self->write_fdp = -1;
315
316     amfree(buf);
317 }
318
319 static void
320 read_and_push(
321     XferElementGlue *self)
322 {
323     XferElement *elt = XFER_ELEMENT(self);
324     int fd = *self->read_fdp;
325
326     while (!elt->cancelled) {
327         char *buf = g_malloc(GLUE_BUFFER_SIZE);
328         size_t len;
329
330         /* read a buffer from upstream */
331         len = full_read(fd, buf, GLUE_BUFFER_SIZE);
332         if (len < GLUE_BUFFER_SIZE) {
333             if (errno) {
334                 if (!elt->cancelled) {
335                     int saved_errno = errno;
336                     xfer_cancel_with_error(elt,
337                         _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
338                     g_debug("element-glue: error reading from fd %d: %s",
339                             fd, strerror(saved_errno));
340                     wait_until_xfer_cancelled(elt->xfer);
341                 }
342                 break;
343             } else if (len == 0) { /* we only count a zero-length read as EOF */
344                 amfree(buf);
345                 break;
346             }
347         }
348
349         xfer_element_push_buffer(elt->downstream, buf, len);
350     }
351
352     if (elt->cancelled && elt->expect_eof)
353         xfer_element_drain_by_reading(fd);
354
355     /* send an EOF indication downstream */
356     xfer_element_push_buffer(elt->downstream, NULL, 0);
357
358     /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
359      * re-use */
360     close(fd);
361     *self->read_fdp = -1;
362 }
363
364 static void
365 pull_and_push(XferElementGlue *self)
366 {
367     XferElement *elt = XFER_ELEMENT(self);
368     gboolean eof_sent = FALSE;
369
370     while (!elt->cancelled) {
371         char *buf;
372         size_t len;
373
374         /* get a buffer from upstream */
375         buf = xfer_element_pull_buffer(elt->upstream, &len);
376
377         /* and push it downstream */
378         xfer_element_push_buffer(elt->downstream, buf, len);
379
380         if (!buf) {
381             eof_sent = TRUE;
382             break;
383         }
384     }
385
386     if (elt->cancelled && elt->expect_eof)
387         xfer_element_drain_by_pulling(elt->upstream);
388
389     if (!eof_sent)
390         xfer_element_push_buffer(elt->downstream, NULL, 0);
391 }
392
393 static gpointer
394 worker_thread(
395     gpointer data)
396 {
397     XferElement *elt = XFER_ELEMENT(data);
398     XferElementGlue *self = XFER_ELEMENT_GLUE(data);
399
400     switch (mech_pair(elt->input_mech, elt->output_mech)) {
401     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
402         read_and_write(self);
403         break;
404
405     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
406     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
407         read_and_push(self);
408         break;
409
410     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
411     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
412         pull_and_write(self);
413         break;
414
415     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
416         pull_and_push(self);
417         break;
418
419     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
420     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
421         if ((self->output_data_socket = do_directtcp_connect(self,
422                                     elt->downstream->input_listen_addrs)) == -1)
423             break;
424         self->write_fdp = &self->output_data_socket;
425         read_and_write(self);
426         break;
427
428     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
429         if ((self->output_data_socket = do_directtcp_connect(self,
430                                     elt->downstream->input_listen_addrs)) == -1)
431             break;
432         self->write_fdp = &self->output_data_socket;
433         pull_and_write(self);
434         break;
435
436     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
437     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
438         if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
439             break;
440         self->read_fdp = &self->input_data_socket;
441         read_and_write(self);
442         break;
443
444     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
445         if ((self->input_data_socket = do_directtcp_accept(self,
446                                             &self->input_listen_socket)) == -1)
447             break;
448         self->read_fdp = &self->input_data_socket;
449         read_and_push(self);
450         break;
451
452     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
453     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
454     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
455     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
456     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
457     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
458     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
459     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
460     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
461     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
462     default:
463         g_assert_not_reached();
464         break;
465
466     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
467     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
468         if ((self->output_data_socket = do_directtcp_accept(self,
469                                             &self->output_listen_socket)) == -1)
470             break;
471         self->write_fdp = &self->output_data_socket;
472         read_and_write(self);
473         break;
474
475     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
476     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
477         if ((self->input_data_socket = do_directtcp_connect(self,
478                                     elt->upstream->output_listen_addrs)) == -1)
479             break;
480         self->read_fdp = &self->input_data_socket;
481         read_and_write(self);
482         break;
483
484     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
485         if ((self->input_data_socket = do_directtcp_connect(self,
486                                     elt->upstream->output_listen_addrs)) == -1)
487             break;
488         self->read_fdp = &self->input_data_socket;
489         read_and_push(self);
490         break;
491
492     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
493         if ((self->output_data_socket = do_directtcp_accept(self,
494                                             &self->output_listen_socket)) == -1)
495             break;
496         self->write_fdp = &self->output_data_socket;
497         pull_and_write(self);
498         break;
499
500     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
501         /* TODO: use async accept's here to avoid order dependency */
502         if ((self->output_data_socket = do_directtcp_accept(self,
503                                             &self->output_listen_socket)) == -1)
504             break;
505         self->write_fdp = &self->output_data_socket;
506         if ((self->input_data_socket = do_directtcp_accept(self,
507                                             &self->input_listen_socket)) == -1)
508             break;
509         self->read_fdp = &self->input_data_socket;
510         read_and_write(self);
511         break;
512
513     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
514         /* TODO: use async connects and select() to avoid order dependency here */
515         if ((self->input_data_socket = do_directtcp_connect(self,
516                                     elt->upstream->output_listen_addrs)) == -1)
517             break;
518         self->read_fdp = &self->input_data_socket;
519         if ((self->output_data_socket = do_directtcp_connect(self,
520                                     elt->downstream->input_listen_addrs)) == -1)
521             break;
522         self->write_fdp = &self->output_data_socket;
523         read_and_write(self);
524         break;
525     }
526
527     send_xfer_done(self);
528
529     return NULL;
530 }
531
532 /*
533  * Implementation
534  */
535
536 static gboolean
537 setup_impl(
538     XferElement *elt)
539 {
540     XferElementGlue *self = (XferElementGlue *)elt;
541     gboolean need_ring = FALSE;
542     gboolean need_listen_input = FALSE;
543     gboolean need_listen_output = FALSE;
544
545     g_assert(elt->input_mech != XFER_MECH_NONE);
546     g_assert(elt->output_mech != XFER_MECH_NONE);
547     g_assert(elt->input_mech != elt->output_mech);
548
549     self->read_fdp = NULL;
550     self->write_fdp = NULL;
551     self->on_push = PUSH_INVALID;
552     self->on_pull = PULL_INVALID;
553     self->need_thread = FALSE;
554
555     switch (mech_pair(elt->input_mech, elt->output_mech)) {
556     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
557         /* thread will read from one fd and write to the other */
558         self->read_fdp = &neighboring_element_fd;
559         self->write_fdp = &neighboring_element_fd;
560         self->need_thread = TRUE;
561         break;
562
563     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
564         /* thread will read from one fd and call push_buffer downstream */
565         self->read_fdp = &neighboring_element_fd;
566         self->need_thread = TRUE;
567         break;
568
569     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
570         self->read_fdp = &neighboring_element_fd;
571         self->on_pull = PULL_FROM_FD;
572         break;
573
574     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
575         /* thread will connect for output, then read from fd and write to the
576          * socket. */
577         self->read_fdp = &neighboring_element_fd;
578         self->need_thread = TRUE;
579         break;
580
581     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
582         /* thread will accept output conn, then read from upstream and write to socket */
583         self->read_fdp = &neighboring_element_fd;
584         self->need_thread = TRUE;
585         need_listen_output = TRUE;
586         break;
587
588     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
589         make_pipe(self);
590         elt->input_fd = self->pipe[1];
591         self->pipe[1] = -1; /* upstream will close this for us */
592         elt->output_fd = self->pipe[0];
593         self->pipe[0] = -1; /* downstream will close this for us */
594         break;
595
596     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
597         /* thread will read from pipe and call downstream's push_buffer */
598         make_pipe(self);
599         elt->input_fd = self->pipe[1];
600         self->pipe[1] = -1; /* upstream will close this for us */
601         self->read_fdp = &self->pipe[0];
602         self->need_thread = TRUE;
603         break;
604
605     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
606         make_pipe(self);
607         elt->input_fd = self->pipe[1];
608         self->pipe[1] = -1; /* upstream will close this for us */
609         self->on_pull = PULL_FROM_FD;
610         self->read_fdp = &self->pipe[0];
611         break;
612
613     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
614         /* thread will connect for output, then read from pipe and write to socket */
615         make_pipe(self);
616         elt->input_fd = self->pipe[1];
617         self->pipe[1] = -1; /* upstream will close this for us */
618         self->read_fdp = &self->pipe[0];
619         self->need_thread = TRUE;
620         break;
621
622     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
623         /* thread will accept output conn, then read from pipe and write to socket */
624         make_pipe(self);
625         elt->input_fd = self->pipe[1];
626         self->pipe[1] = -1; /* upstream will close this for us */
627         self->read_fdp = &self->pipe[0];
628         self->need_thread = TRUE;
629         need_listen_output = TRUE;
630         break;
631
632     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
633         make_pipe(self);
634         elt->output_fd = self->pipe[0];
635         self->pipe[0] = -1; /* downstream will close this for us */
636         self->on_push = PUSH_TO_FD;
637         self->write_fdp = &self->pipe[1];
638         break;
639
640     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
641         self->on_push = PUSH_TO_FD;
642         self->write_fdp = &neighboring_element_fd;
643         break;
644
645     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
646         self->on_push = PUSH_TO_RING_BUFFER;
647         self->on_pull = PULL_FROM_RING_BUFFER;
648         need_ring = TRUE;
649         break;
650
651     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
652         /* push will connect for output first */
653         self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
654         break;
655
656     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
657         /* push will accept for output first */
658         self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
659         need_listen_output = TRUE;
660         break;
661
662     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
663         /* thread will pull from upstream and write to pipe */
664         make_pipe(self);
665         elt->output_fd = self->pipe[0];
666         self->pipe[0] = -1; /* downstream will close this for us */
667         self->write_fdp = &self->pipe[1];
668         self->need_thread = TRUE;
669         break;
670
671     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
672         /* thread will pull from upstream and write to downstream */
673         self->write_fdp = &neighboring_element_fd;
674         self->need_thread = TRUE;
675         break;
676
677     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
678         /* thread will pull from upstream and push to downstream */
679         self->need_thread = TRUE;
680         break;
681
682     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
683         /* thread will connect for output, then pull from upstream and write to socket */
684         self->need_thread = TRUE;
685         break;
686
687     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
688         /* thread will accept for output, then pull from upstream and write to socket */
689         self->need_thread = TRUE;
690         need_listen_output = TRUE;
691         break;
692
693     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
694         /* thread will accept for input, then read from socket and write to pipe */
695         make_pipe(self);
696         elt->output_fd = self->pipe[0];
697         self->pipe[0] = -1; /* downstream will close this for us */
698         self->write_fdp = &self->pipe[1];
699         self->need_thread = TRUE;
700         need_listen_input = TRUE;
701         break;
702
703     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
704         /* thread will accept for input, then read from socket and write to downstream */
705         self->write_fdp = &neighboring_element_fd;
706         self->need_thread = TRUE;
707         need_listen_input = TRUE;
708         break;
709
710     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
711         /* thread will accept for input, then read from socket and push downstream */
712         self->need_thread = TRUE;
713         need_listen_input = TRUE;
714         break;
715
716     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
717         /* first pull will accept for input, then read from socket */
718         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
719         need_listen_input = TRUE;
720         break;
721
722     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
723         /* thread will accept on both sides, then copy from socket to socket */
724         self->need_thread = TRUE;
725         need_listen_input = TRUE;
726         need_listen_output = TRUE;
727         break;
728
729     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
730         /* thread will connect for input, then read from socket and write to pipe */
731         make_pipe(self);
732         elt->output_fd = self->pipe[0];
733         self->pipe[0] = -1; /* downstream will close this for us */
734         self->write_fdp = &self->pipe[1];
735         self->need_thread = TRUE;
736         break;
737
738     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
739         /* thread will connect for input, then read from socket and write to downstream */
740         self->write_fdp = &neighboring_element_fd;
741         self->need_thread = TRUE;
742         break;
743
744     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
745         /* thread will connect for input, then read from socket and push downstream */
746         self->need_thread = TRUE;
747         break;
748
749     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
750         /* first pull will connect for input, then read from socket */
751         self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
752         break;
753
754     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
755         /* thread will connect on both sides, then copy from socket to socket */
756         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
757         self->need_thread = TRUE;
758         break;
759
760     default:
761         g_assert_not_reached();
762         break;
763     }
764
765     /* set up ring if desired */
766     if (need_ring) {
767         self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
768         self->ring_used_sem = semaphore_new_with_value(0);
769         self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
770     }
771
772     if (need_listen_input) {
773         if (!do_directtcp_listen(elt,
774                     &self->input_listen_socket, &elt->input_listen_addrs))
775             return FALSE;
776     }
777     if (need_listen_output) {
778         if (!do_directtcp_listen(elt,
779                     &self->output_listen_socket, &elt->output_listen_addrs))
780             return FALSE;
781     }
782
783     return TRUE;
784 }
785
786 static gboolean
787 start_impl(
788     XferElement *elt)
789 {
790     XferElementGlue *self = (XferElementGlue *)elt;
791
792     /* upstream and downstream are now set, so we can point our fdp's to them */
793     if (self->write_fdp == &neighboring_element_fd)
794         self->write_fdp = &elt->downstream->input_fd;
795     if (self->read_fdp == &neighboring_element_fd)
796         self->read_fdp = &elt->upstream->output_fd;
797
798     if (self->need_thread)
799         self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
800
801     /* we're active if we have a thread that will eventually die */
802     return self->need_thread;
803 }
804
805 static gpointer
806 pull_buffer_impl(
807     XferElement *elt,
808     size_t *size)
809 {
810     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
811
812     /* accept first, if required */
813     if (self->on_pull & PULL_ACCEPT_FIRST) {
814         /* don't accept the next time around */
815         self->on_pull &= ~PULL_ACCEPT_FIRST;
816
817         if (elt->cancelled) {
818             *size = 0;
819             return NULL;
820         }
821
822         if ((self->input_data_socket = do_directtcp_accept(self,
823                                             &self->input_listen_socket)) == -1) {
824             /* do_directtcp_accept already signalled an error; xfer
825              * is cancelled */
826             *size = 0;
827             return NULL;
828         }
829
830         /* read from this new socket */
831         self->read_fdp = &self->input_data_socket;
832     }
833
834     /* or connect first, if required */
835     if (self->on_pull & PULL_CONNECT_FIRST) {
836         /* don't connect the next time around */
837         self->on_pull &= ~PULL_CONNECT_FIRST;
838
839         if (elt->cancelled) {
840             *size = 0;
841             return NULL;
842         }
843
844         if ((self->input_data_socket = do_directtcp_connect(self,
845                                     elt->upstream->output_listen_addrs)) == -1) {
846             /* do_directtcp_connect already signalled an error; xfer
847              * is cancelled */
848             *size = 0;
849             return NULL;
850         }
851
852         /* read from this new socket */
853         self->read_fdp = &self->input_data_socket;
854     }
855
856     switch (self->on_pull) {
857         case PULL_FROM_RING_BUFFER: {
858             gpointer buf;
859
860             if (elt->cancelled) {
861                 /* the finalize method will empty the ring buffer */
862                 *size = 0;
863                 return NULL;
864             }
865
866             /* make sure there's at least one element available */
867             semaphore_down(self->ring_used_sem);
868
869             /* get it */
870             buf = self->ring[self->ring_tail].buf;
871             *size = self->ring[self->ring_tail].size;
872             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
873
874             /* and mark this element as free to be overwritten */
875             semaphore_up(self->ring_free_sem);
876
877             return buf;
878         }
879
880         case PULL_FROM_FD: {
881             int fd = *self->read_fdp;
882             char *buf = g_malloc(GLUE_BUFFER_SIZE);
883             ssize_t len;
884
885             if (elt->cancelled) {
886                 if (elt->expect_eof)
887                     xfer_element_drain_by_reading(fd);
888
889                 close(fd);
890                 *self->read_fdp = -1;
891
892                 *size = 0;
893                 return NULL;
894             }
895
896             /* read from upstream */
897             len = full_read(fd, buf, GLUE_BUFFER_SIZE);
898             if (len < GLUE_BUFFER_SIZE) {
899                 if (errno) {
900                     if (!elt->cancelled) {
901                         xfer_cancel_with_error(elt,
902                             _("Error reading from fd %d: %s"), fd, strerror(errno));
903                         wait_until_xfer_cancelled(elt->xfer);
904                     }
905
906                     /* return an EOF */
907                     amfree(buf);
908                     len = 0;
909
910                     /* and finish off the upstream */
911                     if (elt->expect_eof) {
912                         xfer_element_drain_by_reading(fd);
913                     }
914                     close(fd);
915                     *self->read_fdp = -1;
916                 } else if (len == 0) {
917                     /* EOF */
918                     g_free(buf);
919                     buf = NULL;
920                     *size = 0;
921
922                     /* signal EOF to downstream */
923                     close(fd);
924                     *self->read_fdp = -1;
925                 }
926             }
927
928             *size = (size_t)len;
929
930             return buf;
931         }
932
933         default:
934         case PULL_INVALID:
935             g_assert_not_reached();
936             return NULL;
937     }
938 }
939
940 static void
941 push_buffer_impl(
942     XferElement *elt,
943     gpointer buf,
944     size_t len)
945 {
946     XferElementGlue *self = (XferElementGlue *)elt;
947
948     /* accept first, if required */
949     if (self->on_push & PUSH_ACCEPT_FIRST) {
950         /* don't accept the next time around */
951         self->on_push &= ~PUSH_ACCEPT_FIRST;
952
953         if (elt->cancelled) {
954             return;
955         }
956
957         if ((self->output_data_socket = do_directtcp_accept(self,
958                                             &self->output_listen_socket)) == -1) {
959             /* do_directtcp_accept already signalled an error; xfer
960              * is cancelled */
961             return;
962         }
963
964         /* write to this new socket */
965         self->write_fdp = &self->output_data_socket;
966     }
967
968     /* or connect first, if required */
969     if (self->on_push & PUSH_CONNECT_FIRST) {
970         /* don't accept the next time around */
971         self->on_push &= ~PUSH_CONNECT_FIRST;
972
973         if (elt->cancelled) {
974             return;
975         }
976
977         if ((self->output_data_socket = do_directtcp_connect(self,
978                                     elt->downstream->input_listen_addrs)) == -1) {
979             /* do_directtcp_connect already signalled an error; xfer
980              * is cancelled */
981             return;
982         }
983
984         /* read from this new socket */
985         self->write_fdp = &self->output_data_socket;
986     }
987
988     switch (self->on_push) {
989         case PUSH_TO_RING_BUFFER:
990             /* just drop packets if the transfer has been cancelled */
991             if (elt->cancelled) {
992                 amfree(buf);
993                 return;
994             }
995
996             /* make sure there's at least one element free */
997             semaphore_down(self->ring_free_sem);
998
999             /* set it */
1000             self->ring[self->ring_head].buf = buf;
1001             self->ring[self->ring_head].size = len;
1002             self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1003
1004             /* and mark this element as available for reading */
1005             semaphore_up(self->ring_used_sem);
1006
1007             return;
1008
1009         case PUSH_TO_FD: {
1010             int fd = *self->write_fdp;
1011
1012             if (elt->cancelled) {
1013                 if (!elt->expect_eof || !buf) {
1014                     close(fd);
1015                     *self->write_fdp = -1;
1016
1017                     /* hack to ensure we won't close the fd again, if we get another push */
1018                     elt->expect_eof = TRUE;
1019                 }
1020
1021                 amfree(buf);
1022
1023                 return;
1024             }
1025
1026             /* write the full buffer to the fd, or close on EOF */
1027             if (buf) {
1028                 if (full_write(fd, buf, len) < len) {
1029                     if (!elt->cancelled) {
1030                         xfer_cancel_with_error(elt,
1031                             _("Error writing to fd %d: %s"), fd, strerror(errno));
1032                         wait_until_xfer_cancelled(elt->xfer);
1033                     }
1034                     /* nothing special to do to handle a cancellation */
1035                 }
1036                 amfree(buf);
1037             } else {
1038                 close(fd);
1039                 *self->write_fdp = -1;
1040             }
1041
1042             return;
1043         }
1044
1045         default:
1046         case PUSH_INVALID:
1047             g_assert_not_reached();
1048             break;
1049     }
1050 }
1051
1052 static void
1053 instance_init(
1054     XferElementGlue *self)
1055 {
1056     XferElement *elt = (XferElement *)self;
1057     elt->can_generate_eof = TRUE;
1058     self->pipe[0] = self->pipe[1] = -1;
1059     self->input_listen_socket = -1;
1060     self->output_listen_socket = -1;
1061     self->input_data_socket = -1;
1062     self->output_data_socket = -1;
1063 }
1064
1065 static void
1066 finalize_impl(
1067     GObject * obj_self)
1068 {
1069     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1070
1071     /* close our pipes if they're still open (they shouldn't be!) */
1072     if (self->pipe[0] != -1) close(self->pipe[0]);
1073     if (self->pipe[1] != -1) close(self->pipe[1]);
1074     if (self->input_listen_socket != -1) close(self->input_listen_socket);
1075     if (self->output_listen_socket != -1) close(self->output_listen_socket);
1076     if (self->input_data_socket != -1) close(self->input_data_socket);
1077     if (self->output_data_socket != -1) close(self->output_data_socket);
1078
1079     if (self->ring) {
1080         /* empty the ring buffer, ignoring syncronization issues */
1081         while (self->ring_used_sem->value) {
1082             if (self->ring[self->ring_tail].buf)
1083                 amfree(self->ring[self->ring_tail].buf);
1084             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1085         }
1086
1087         amfree(self->ring);
1088         semaphore_free(self->ring_used_sem);
1089         semaphore_free(self->ring_free_sem);
1090     }
1091
1092     /* chain up */
1093     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1094 }
1095
1096 static xfer_element_mech_pair_t _pairs[] = {
1097     { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1098     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1099     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1100     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
1101     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1102
1103     { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
1104     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
1105     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
1106     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
1107     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
1108
1109     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
1110     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
1111     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
1112     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
1113     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
1114
1115     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
1116     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
1117     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
1118     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
1119     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
1120
1121     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1122     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
1123     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1124     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1125     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
1126
1127     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
1128     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
1129     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
1130     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
1131     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
1132
1133     /* terminator */
1134     { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1135 };
1136 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1137
1138 static void
1139 class_init(
1140     XferElementGlueClass * selfc)
1141 {
1142     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1143     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1144
1145     klass->setup = setup_impl;
1146     klass->start = start_impl;
1147     klass->push_buffer = push_buffer_impl;
1148     klass->pull_buffer = pull_buffer_impl;
1149
1150     klass->perl_class = "Amanda::Xfer::Element::Glue";
1151     klass->mech_pairs = xfer_element_glue_mech_pairs;
1152
1153     goc->finalize = finalize_impl;
1154
1155     parent_class = g_type_class_peek_parent(selfc);
1156 }
1157
1158 GType
1159 xfer_element_glue_get_type (void)
1160 {
1161     static GType type = 0;
1162
1163     if G_UNLIKELY(type == 0) {
1164         static const GTypeInfo info = {
1165             sizeof (XferElementGlueClass),
1166             (GBaseInitFunc) NULL,
1167             (GBaseFinalizeFunc) NULL,
1168             (GClassInitFunc) class_init,
1169             (GClassFinalizeFunc) NULL,
1170             NULL /* class_data */,
1171             sizeof (XferElementGlue),
1172             0 /* n_preallocs */,
1173             (GInstanceInitFunc) instance_init,
1174             NULL
1175         };
1176
1177         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1178     }
1179
1180     return type;
1181 }
1182
1183 /* create an element of this class; prototype is in xfer-element.h */
1184 XferElement *
1185 xfer_element_glue(void)
1186 {
1187     XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1188     XferElement *elt = XFER_ELEMENT(self);
1189
1190     return elt;
1191 }