Imported Upstream version 3.3.3
[debian/amanda] / xfer-src / element-glue.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "element-glue.h"
26 #include "directtcp.h"
27 #include "util.h"
28 #include "sockaddr-util.h"
29 #include "debug.h"
30
31 /*
32  * Instance definition
33  */
34
35 typedef struct XferElementGlue_ {
36     XferElement __parent__;
37
38     /* instructions to push_buffer_impl */
39     enum {
40         PUSH_TO_RING_BUFFER,
41         PUSH_TO_FD, /* write to write_fd */
42         PUSH_INVALID,
43
44         PUSH_ACCEPT_FIRST = (1 << 16),
45         PUSH_CONNECT_FIRST = (2 << 16),
46     } on_push;
47
48     /* instructions to pull_buffer_impl */
49     enum {
50         PULL_FROM_RING_BUFFER,
51         PULL_FROM_FD, /* read from read_fd */
52         PULL_INVALID,
53
54         PULL_ACCEPT_FIRST = (1 << 16),
55         PULL_CONNECT_FIRST = (2 << 16),
56     } on_pull;
57
58     int *write_fdp;
59     int *read_fdp;
60
61     gboolean need_thread;
62
63     /* the stuff we might use, depending on what flavor of glue we're
64      * providing.. */
65     int pipe[2];
66     int input_listen_socket, output_listen_socket;
67     int input_data_socket, output_data_socket;
68     int read_fd, write_fd;
69
70     /* a ring buffer of ptr/size pairs with semaphores */
71     struct { gpointer buf; size_t size; } *ring;
72     amsemaphore_t *ring_used_sem, *ring_free_sem;
73     gint ring_head, ring_tail;
74
75     GThread *thread;
76     GThreadFunc threadfunc;
77 } XferElementGlue;
78
79 /*
80  * Class definition
81  */
82
83 typedef struct XferElementGlueClass_ {
84     XferElementClass __parent__;
85 } XferElementGlueClass;
86
87 static GObjectClass *parent_class = NULL;
88
89 /*
90  * Utility functions, etc.
91  */
92
93 static void
94 make_pipe(
95     XferElementGlue *self)
96 {
97     if (pipe(self->pipe) < 0)
98         g_critical(_("Could not create pipe: %s"), strerror(errno));
99 }
100
101 static void
102 send_xfer_done(
103     XferElementGlue *self)
104 {
105     xfer_queue_message(XFER_ELEMENT(self)->xfer,
106             xmsg_new((XferElement *)self, XMSG_DONE, 0));
107 }
108
109 static gboolean
110 do_directtcp_listen(
111     XferElement *elt,
112     int *sockp,
113     DirectTCPAddr **addrsp)
114 {
115     int sock;
116     sockaddr_union data_addr;
117     DirectTCPAddr *addrs;
118     socklen_t len;
119     struct addrinfo *res;
120     struct addrinfo *res_addr;
121     sockaddr_union *addr = NULL;
122
123     if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
124         xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
125         return FALSE;
126     }
127     for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
128         if (res_addr->ai_family == AF_INET) {
129             addr = (sockaddr_union *)res_addr->ai_addr;
130             break;
131         }
132     }
133     if (!addr) {
134         addr = (sockaddr_union *)res->ai_addr;
135     }
136
137     sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
138     if (sock < 0) {
139         xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
140         return FALSE;
141     }
142
143     len = SS_LEN(addr);
144     if (bind(sock, (struct sockaddr *)addr, len) != 0) {
145         xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
146         freeaddrinfo(res);
147         return FALSE;
148     }
149
150     if (listen(sock, 1) < 0) {
151         xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
152         return FALSE;
153     }
154
155     /* TODO: which addresses should this display? all ifaces? localhost? */
156     len = sizeof(data_addr);
157     if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
158         error("getsockname(): %s", strerror(errno));
159
160     addrs = g_new0(DirectTCPAddr, 2);
161     copy_sockaddr(&addrs[0], &data_addr);
162     *addrsp = addrs;
163
164     return TRUE;
165 }
166
167 static gboolean
168 prolong_accept(
169     gpointer data)
170 {
171     return !XFER_ELEMENT(data)->cancelled;
172 }
173
174 static int
175 do_directtcp_accept(
176     XferElementGlue *self,
177     int *socketp)
178 {
179     int sock;
180     g_assert(*socketp != -1);
181
182     if ((sock = interruptible_accept(*socketp, NULL, NULL,
183                                      prolong_accept, self)) == -1) {
184         /* if the accept was interrupted due to a cancellation, then do not
185          * add a further error message */
186         if (errno == 0 && XFER_ELEMENT(self)->cancelled)
187             return -1;
188
189         xfer_cancel_with_error(XFER_ELEMENT(self),
190             _("Error accepting incoming connection: %s"), strerror(errno));
191         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
192         return -1;
193     }
194
195     /* close the listening socket now, for good measure */
196     close(*socketp);
197     *socketp = -1;
198
199     return sock;
200 }
201
202 static int
203 do_directtcp_connect(
204     XferElementGlue *self,
205     DirectTCPAddr *addrs)
206 {
207     XferElement *elt = XFER_ELEMENT(self);
208     sockaddr_union addr;
209     int sock;
210
211     if (!addrs) {
212         g_debug("element-glue got no directtcp addresses to connect to!");
213         if (!elt->cancelled) {
214             xfer_cancel_with_error(elt,
215                 "%s got no directtcp addresses to connect to",
216                 xfer_element_repr(elt));
217         }
218         goto cancel_wait;
219     }
220
221     /* set up the sockaddr -- IPv4 only */
222     copy_sockaddr(&addr, addrs);
223
224     g_debug("do_directtcp_connect making data connection to %s", str_sockaddr(&addr));
225     sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
226     if (sock < 0) {
227         xfer_cancel_with_error(elt,
228             "socket(): %s", strerror(errno));
229         goto cancel_wait;
230     }
231     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
232         xfer_cancel_with_error(elt,
233             "connect(): %s", strerror(errno));
234         goto cancel_wait;
235     }
236
237     g_debug("connected to %s", str_sockaddr(&addr));
238
239     return sock;
240
241 cancel_wait:
242     wait_until_xfer_cancelled(elt->xfer);
243     return -1;
244 }
245
246 #define GLUE_BUFFER_SIZE 32768
247 #define GLUE_RING_BUFFER_SIZE 32
248
249 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
250
251 /*
252  * fd handling
253  */
254
255 /* if self->read_fdp or self->write_fdp are pointing to this integer, then they
256  * should be redirected to point to the upstream's output_fd or downstream's
257  * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
258  * respectively. */
259 static int neighboring_element_fd = -1;
260
261 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
262 static int
263 _get_read_fd(XferElementGlue *self)
264 {
265     if (!self->read_fdp)
266         return -1; /* shouldn't happen.. */
267
268     if (self->read_fdp == &neighboring_element_fd) {
269         XferElement *elt = XFER_ELEMENT(self);
270         self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
271     } else {
272         self->read_fd = *self->read_fdp;
273         *self->read_fdp = -1;
274     }
275     self->read_fdp = NULL;
276     return self->read_fd;
277 }
278
279 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
280 static int
281 _get_write_fd(XferElementGlue *self)
282 {
283     if (!self->write_fdp)
284         return -1; /* shouldn't happen.. */
285
286     if (self->write_fdp == &neighboring_element_fd) {
287         XferElement *elt = XFER_ELEMENT(self);
288         self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
289     } else {
290         self->write_fd = *self->write_fdp;
291         *self->write_fdp = -1;
292     }
293     self->write_fdp = NULL;
294     return self->write_fd;
295 }
296
297 static int
298 close_read_fd(XferElementGlue *self)
299 {
300     int fd = get_read_fd(self);
301     self->read_fd = -1;
302     return close(fd);
303 }
304
305 static int
306 close_write_fd(XferElementGlue *self)
307 {
308     int fd = get_write_fd(self);
309     self->write_fd = -1;
310     return close(fd);
311 }
312
313 /*
314  * Worker thread utility functions
315  */
316
317 static void
318 pull_and_write(XferElementGlue *self)
319 {
320     XferElement *elt = XFER_ELEMENT(self);
321     int fd = get_write_fd(self);
322     self->write_fdp = NULL;
323
324     while (!elt->cancelled) {
325         size_t len;
326         char *buf;
327
328         /* get a buffer from upstream */
329         buf = xfer_element_pull_buffer(elt->upstream, &len);
330         if (!buf)
331             break;
332
333         /* write it */
334         if (full_write(fd, buf, len) < len) {
335             if (!elt->cancelled) {
336                 xfer_cancel_with_error(elt,
337                     _("Error writing to fd %d: %s"), fd, strerror(errno));
338                 wait_until_xfer_cancelled(elt->xfer);
339             }
340             amfree(buf);
341             break;
342         }
343
344         amfree(buf);
345     }
346
347     if (elt->cancelled && elt->expect_eof)
348         xfer_element_drain_buffers(elt->upstream);
349
350     /* close the fd we've been writing, as an EOF signal to downstream, and
351      * set it to -1 to avoid accidental re-use */
352     close_write_fd(self);
353 }
354
355 static void
356 read_and_write(XferElementGlue *self)
357 {
358     XferElement *elt = XFER_ELEMENT(self);
359     /* dynamically allocate a buffer, in case this thread has
360      * a limited amount of stack allocated */
361     char *buf = g_malloc(GLUE_BUFFER_SIZE);
362     int rfd = get_read_fd(self);
363     int wfd = get_write_fd(self);
364
365     while (!elt->cancelled) {
366         size_t len;
367
368         /* read from upstream */
369         len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
370         if (len < GLUE_BUFFER_SIZE) {
371             if (errno) {
372                 if (!elt->cancelled) {
373                     xfer_cancel_with_error(elt,
374                         _("Error reading from fd %d: %s"), rfd, strerror(errno));
375                     wait_until_xfer_cancelled(elt->xfer);
376                 }
377                 break;
378             } else if (len == 0) { /* we only count a zero-length read as EOF */
379                 break;
380             }
381         }
382
383         /* write the buffer fully */
384         if (full_write(wfd, buf, len) < len) {
385             if (!elt->cancelled) {
386                 xfer_cancel_with_error(elt,
387                     _("Could not write to fd %d: %s"), wfd, strerror(errno));
388                 wait_until_xfer_cancelled(elt->xfer);
389             }
390             break;
391         }
392     }
393
394     if (elt->cancelled && elt->expect_eof)
395         xfer_element_drain_fd(rfd);
396
397     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
398      * kill it and complete the cancellation */
399     close_read_fd(self);
400
401     /* close the fd we've been writing, as an EOF signal to downstream */
402     close_write_fd(self);
403
404     amfree(buf);
405 }
406
407 static void
408 read_and_push(
409     XferElementGlue *self)
410 {
411     XferElement *elt = XFER_ELEMENT(self);
412     int fd = get_read_fd(self);
413
414     while (!elt->cancelled) {
415         char *buf = g_malloc(GLUE_BUFFER_SIZE);
416         size_t len;
417
418         /* read a buffer from upstream */
419         len = full_read(fd, buf, GLUE_BUFFER_SIZE);
420         if (len < GLUE_BUFFER_SIZE) {
421             if (errno) {
422                 if (!elt->cancelled) {
423                     int saved_errno = errno;
424                     xfer_cancel_with_error(elt,
425                         _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
426                     g_debug("element-glue: error reading from fd %d: %s",
427                             fd, strerror(saved_errno));
428                     wait_until_xfer_cancelled(elt->xfer);
429                 }
430                 amfree(buf);
431                 break;
432             } else if (len == 0) { /* we only count a zero-length read as EOF */
433                 amfree(buf);
434                 break;
435             }
436         }
437
438         xfer_element_push_buffer(elt->downstream, buf, len);
439     }
440
441     if (elt->cancelled && elt->expect_eof)
442         xfer_element_drain_fd(fd);
443
444     /* send an EOF indication downstream */
445     xfer_element_push_buffer(elt->downstream, NULL, 0);
446
447     /* close the read fd, since it's at EOF */
448     close_read_fd(self);
449 }
450
451 static void
452 pull_and_push(XferElementGlue *self)
453 {
454     XferElement *elt = XFER_ELEMENT(self);
455     gboolean eof_sent = FALSE;
456
457     while (!elt->cancelled) {
458         char *buf;
459         size_t len;
460
461         /* get a buffer from upstream */
462         buf = xfer_element_pull_buffer(elt->upstream, &len);
463
464         /* and push it downstream */
465         xfer_element_push_buffer(elt->downstream, buf, len);
466
467         if (!buf) {
468             eof_sent = TRUE;
469             break;
470         }
471     }
472
473     if (elt->cancelled && elt->expect_eof)
474         xfer_element_drain_buffers(elt->upstream);
475
476     if (!eof_sent)
477         xfer_element_push_buffer(elt->downstream, NULL, 0);
478 }
479
480 static gpointer
481 worker_thread(
482     gpointer data)
483 {
484     XferElement *elt = XFER_ELEMENT(data);
485     XferElementGlue *self = XFER_ELEMENT_GLUE(data);
486
487     switch (mech_pair(elt->input_mech, elt->output_mech)) {
488     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
489         read_and_write(self);
490         break;
491
492     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
493     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
494         read_and_push(self);
495         break;
496
497     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
498     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
499         pull_and_write(self);
500         break;
501
502     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
503         pull_and_push(self);
504         break;
505
506     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
507     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
508         if ((self->output_data_socket = do_directtcp_connect(self,
509                                     elt->downstream->input_listen_addrs)) == -1)
510             break;
511         self->write_fdp = &self->output_data_socket;
512         read_and_write(self);
513         break;
514
515     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
516         if ((self->output_data_socket = do_directtcp_connect(self,
517                                     elt->downstream->input_listen_addrs)) == -1)
518             break;
519         self->write_fdp = &self->output_data_socket;
520         pull_and_write(self);
521         break;
522
523     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
524     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
525         if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
526             break;
527         self->read_fdp = &self->input_data_socket;
528         read_and_write(self);
529         break;
530
531     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
532         if ((self->input_data_socket = do_directtcp_accept(self,
533                                             &self->input_listen_socket)) == -1)
534             break;
535         self->read_fdp = &self->input_data_socket;
536         read_and_push(self);
537         break;
538
539     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
540     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
541     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
542     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
543     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
544     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
545     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
546     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
547     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
548     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
549     default:
550         g_assert_not_reached();
551         break;
552
553     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
554     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
555         if ((self->output_data_socket = do_directtcp_accept(self,
556                                             &self->output_listen_socket)) == -1)
557             break;
558         self->write_fdp = &self->output_data_socket;
559         read_and_write(self);
560         break;
561
562     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
563     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
564         if ((self->input_data_socket = do_directtcp_connect(self,
565                                     elt->upstream->output_listen_addrs)) == -1)
566             break;
567         self->read_fdp = &self->input_data_socket;
568         read_and_write(self);
569         break;
570
571     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
572         if ((self->input_data_socket = do_directtcp_connect(self,
573                                     elt->upstream->output_listen_addrs)) == -1)
574             break;
575         self->read_fdp = &self->input_data_socket;
576         read_and_push(self);
577         break;
578
579     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
580         if ((self->output_data_socket = do_directtcp_accept(self,
581                                             &self->output_listen_socket)) == -1)
582             break;
583         self->write_fdp = &self->output_data_socket;
584         pull_and_write(self);
585         break;
586
587     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
588         /* TODO: use async accept's here to avoid order dependency */
589         if ((self->output_data_socket = do_directtcp_accept(self,
590                                             &self->output_listen_socket)) == -1)
591             break;
592         self->write_fdp = &self->output_data_socket;
593         if ((self->input_data_socket = do_directtcp_accept(self,
594                                             &self->input_listen_socket)) == -1)
595             break;
596         self->read_fdp = &self->input_data_socket;
597         read_and_write(self);
598         break;
599
600     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
601         /* TODO: use async connects and select() to avoid order dependency here */
602         if ((self->input_data_socket = do_directtcp_connect(self,
603                                     elt->upstream->output_listen_addrs)) == -1)
604             break;
605         self->read_fdp = &self->input_data_socket;
606         if ((self->output_data_socket = do_directtcp_connect(self,
607                                     elt->downstream->input_listen_addrs)) == -1)
608             break;
609         self->write_fdp = &self->output_data_socket;
610         read_and_write(self);
611         break;
612     }
613
614     send_xfer_done(self);
615
616     return NULL;
617 }
618
619 /*
620  * Implementation
621  */
622
623 static gboolean
624 setup_impl(
625     XferElement *elt)
626 {
627     XferElementGlue *self = (XferElementGlue *)elt;
628     gboolean need_ring = FALSE;
629     gboolean need_listen_input = FALSE;
630     gboolean need_listen_output = FALSE;
631
632     g_assert(elt->input_mech != XFER_MECH_NONE);
633     g_assert(elt->output_mech != XFER_MECH_NONE);
634     g_assert(elt->input_mech != elt->output_mech);
635
636     self->read_fdp = NULL;
637     self->write_fdp = NULL;
638     self->on_push = PUSH_INVALID;
639     self->on_pull = PULL_INVALID;
640     self->need_thread = FALSE;
641
642     switch (mech_pair(elt->input_mech, elt->output_mech)) {
643     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
644         /* thread will read from one fd and write to the other */
645         self->read_fdp = &neighboring_element_fd;
646         self->write_fdp = &neighboring_element_fd;
647         self->need_thread = TRUE;
648         break;
649
650     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
651         /* thread will read from one fd and call push_buffer downstream */
652         self->read_fdp = &neighboring_element_fd;
653         self->need_thread = TRUE;
654         break;
655
656     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
657         self->read_fdp = &neighboring_element_fd;
658         self->on_pull = PULL_FROM_FD;
659         break;
660
661     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
662         /* thread will connect for output, then read from fd and write to the
663          * socket. */
664         self->read_fdp = &neighboring_element_fd;
665         self->need_thread = TRUE;
666         break;
667
668     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
669         /* thread will accept output conn, then read from upstream and write to socket */
670         self->read_fdp = &neighboring_element_fd;
671         self->need_thread = TRUE;
672         need_listen_output = TRUE;
673         break;
674
675     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
676         make_pipe(self);
677         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
678         self->pipe[1] = -1; /* upstream will close this for us */
679         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
680         self->pipe[0] = -1; /* downstream will close this for us */
681         break;
682
683     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
684         /* thread will read from pipe and call downstream's push_buffer */
685         make_pipe(self);
686         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
687         self->pipe[1] = -1; /* upstream will close this for us */
688         self->read_fdp = &self->pipe[0];
689         self->need_thread = TRUE;
690         break;
691
692     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
693         make_pipe(self);
694         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
695         self->pipe[1] = -1; /* upstream will close this for us */
696         self->on_pull = PULL_FROM_FD;
697         self->read_fdp = &self->pipe[0];
698         break;
699
700     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
701         /* thread will connect for output, then read from pipe and write to socket */
702         make_pipe(self);
703         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
704         self->pipe[1] = -1; /* upstream will close this for us */
705         self->read_fdp = &self->pipe[0];
706         self->need_thread = TRUE;
707         break;
708
709     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
710         /* thread will accept output conn, then read from pipe and write to socket */
711         make_pipe(self);
712         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
713         self->pipe[1] = -1; /* upstream will close this for us */
714         self->read_fdp = &self->pipe[0];
715         self->need_thread = TRUE;
716         need_listen_output = TRUE;
717         break;
718
719     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
720         make_pipe(self);
721         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
722         self->pipe[0] = -1; /* downstream will close this for us */
723         self->on_push = PUSH_TO_FD;
724         self->write_fdp = &self->pipe[1];
725         break;
726
727     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
728         self->on_push = PUSH_TO_FD;
729         self->write_fdp = &neighboring_element_fd;
730         break;
731
732     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
733         self->on_push = PUSH_TO_RING_BUFFER;
734         self->on_pull = PULL_FROM_RING_BUFFER;
735         need_ring = TRUE;
736         break;
737
738     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
739         /* push will connect for output first */
740         self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
741         break;
742
743     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
744         /* push will accept for output first */
745         self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
746         need_listen_output = TRUE;
747         break;
748
749     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
750         /* thread will pull from upstream and write to pipe */
751         make_pipe(self);
752         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
753         self->pipe[0] = -1; /* downstream will close this for us */
754         self->write_fdp = &self->pipe[1];
755         self->need_thread = TRUE;
756         break;
757
758     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
759         /* thread will pull from upstream and write to downstream */
760         self->write_fdp = &neighboring_element_fd;
761         self->need_thread = TRUE;
762         break;
763
764     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
765         /* thread will pull from upstream and push to downstream */
766         self->need_thread = TRUE;
767         break;
768
769     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
770         /* thread will connect for output, then pull from upstream and write to socket */
771         self->need_thread = TRUE;
772         break;
773
774     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
775         /* thread will accept for output, then pull from upstream and write to socket */
776         self->need_thread = TRUE;
777         need_listen_output = TRUE;
778         break;
779
780     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
781         /* thread will accept for input, then read from socket and write to pipe */
782         make_pipe(self);
783         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
784         self->pipe[0] = -1; /* downstream will close this for us */
785         self->write_fdp = &self->pipe[1];
786         self->need_thread = TRUE;
787         need_listen_input = TRUE;
788         break;
789
790     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
791         /* thread will accept for input, then read from socket and write to downstream */
792         self->write_fdp = &neighboring_element_fd;
793         self->need_thread = TRUE;
794         need_listen_input = TRUE;
795         break;
796
797     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
798         /* thread will accept for input, then read from socket and push downstream */
799         self->need_thread = TRUE;
800         need_listen_input = TRUE;
801         break;
802
803     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
804         /* first pull will accept for input, then read from socket */
805         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
806         need_listen_input = TRUE;
807         break;
808
809     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
810         /* thread will accept on both sides, then copy from socket to socket */
811         self->need_thread = TRUE;
812         need_listen_input = TRUE;
813         need_listen_output = TRUE;
814         break;
815
816     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
817         /* thread will connect for input, then read from socket and write to pipe */
818         make_pipe(self);
819         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
820         self->pipe[0] = -1; /* downstream will close this for us */
821         self->write_fdp = &self->pipe[1];
822         self->need_thread = TRUE;
823         break;
824
825     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
826         /* thread will connect for input, then read from socket and write to downstream */
827         self->write_fdp = &neighboring_element_fd;
828         self->need_thread = TRUE;
829         break;
830
831     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
832         /* thread will connect for input, then read from socket and push downstream */
833         self->need_thread = TRUE;
834         break;
835
836     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
837         /* first pull will connect for input, then read from socket */
838         self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
839         break;
840
841     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
842         /* thread will connect on both sides, then copy from socket to socket */
843         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
844         self->need_thread = TRUE;
845         break;
846
847     default:
848         g_assert_not_reached();
849         break;
850     }
851
852     /* set up ring if desired */
853     if (need_ring) {
854         self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
855         self->ring_used_sem = amsemaphore_new_with_value(0);
856         self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
857     }
858
859     if (need_listen_input) {
860         if (!do_directtcp_listen(elt,
861                     &self->input_listen_socket, &elt->input_listen_addrs))
862             return FALSE;
863     }
864     if (need_listen_output) {
865         if (!do_directtcp_listen(elt,
866                     &self->output_listen_socket, &elt->output_listen_addrs))
867             return FALSE;
868     }
869
870     return TRUE;
871 }
872
873 static gboolean
874 start_impl(
875     XferElement *elt)
876 {
877     XferElementGlue *self = (XferElementGlue *)elt;
878
879     if (self->need_thread)
880         self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
881
882     /* we're active if we have a thread that will eventually die */
883     return self->need_thread;
884 }
885
886 static gpointer
887 pull_buffer_impl(
888     XferElement *elt,
889     size_t *size)
890 {
891     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
892
893     /* accept first, if required */
894     if (self->on_pull & PULL_ACCEPT_FIRST) {
895         /* don't accept the next time around */
896         self->on_pull &= ~PULL_ACCEPT_FIRST;
897
898         if (elt->cancelled) {
899             *size = 0;
900             return NULL;
901         }
902
903         if ((self->input_data_socket = do_directtcp_accept(self,
904                                             &self->input_listen_socket)) == -1) {
905             /* do_directtcp_accept already signalled an error; xfer
906              * is cancelled */
907             *size = 0;
908             return NULL;
909         }
910
911         /* read from this new socket */
912         self->read_fdp = &self->input_data_socket;
913     }
914
915     /* or connect first, if required */
916     if (self->on_pull & PULL_CONNECT_FIRST) {
917         /* don't connect the next time around */
918         self->on_pull &= ~PULL_CONNECT_FIRST;
919
920         if (elt->cancelled) {
921             *size = 0;
922             return NULL;
923         }
924
925         if ((self->input_data_socket = do_directtcp_connect(self,
926                                     elt->upstream->output_listen_addrs)) == -1) {
927             /* do_directtcp_connect already signalled an error; xfer
928              * is cancelled */
929             *size = 0;
930             return NULL;
931         }
932
933         /* read from this new socket */
934         self->read_fdp = &self->input_data_socket;
935     }
936
937     switch (self->on_pull) {
938         case PULL_FROM_RING_BUFFER: {
939             gpointer buf;
940
941             if (elt->cancelled) {
942                 /* the finalize method will empty the ring buffer */
943                 *size = 0;
944                 return NULL;
945             }
946
947             /* make sure there's at least one element available */
948             amsemaphore_down(self->ring_used_sem);
949
950             /* get it */
951             buf = self->ring[self->ring_tail].buf;
952             *size = self->ring[self->ring_tail].size;
953             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
954
955             /* and mark this element as free to be overwritten */
956             amsemaphore_up(self->ring_free_sem);
957
958             return buf;
959         }
960
961         case PULL_FROM_FD: {
962             int fd = get_read_fd(self);
963             char *buf;
964             ssize_t len;
965
966             /* if the fd is already closed, it's possible upstream bailed out
967              * so quickly that we didn't even get a look at the fd */
968             if (elt->cancelled || fd == -1) {
969                 if (fd != -1) {
970                     if (elt->expect_eof)
971                         xfer_element_drain_fd(fd);
972
973                     close_read_fd(self);
974                 }
975
976                 *size = 0;
977                 return NULL;
978             }
979
980             buf = g_malloc(GLUE_BUFFER_SIZE);
981
982             /* read from upstream */
983             len = full_read(fd, buf, GLUE_BUFFER_SIZE);
984             if (len < GLUE_BUFFER_SIZE) {
985                 if (errno) {
986                     if (!elt->cancelled) {
987                         xfer_cancel_with_error(elt,
988                             _("Error reading from fd %d: %s"), fd, strerror(errno));
989                         wait_until_xfer_cancelled(elt->xfer);
990                     }
991
992                     /* return an EOF */
993                     amfree(buf);
994                     len = 0;
995
996                     /* and finish off the upstream */
997                     if (elt->expect_eof) {
998                         xfer_element_drain_fd(fd);
999                     }
1000                     close_read_fd(self);
1001                 } else if (len == 0) {
1002                     /* EOF */
1003                     g_free(buf);
1004                     buf = NULL;
1005                     *size = 0;
1006
1007                     /* signal EOF to downstream */
1008                     close_read_fd(self);
1009                 }
1010             }
1011
1012             *size = (size_t)len;
1013
1014             return buf;
1015         }
1016
1017         default:
1018         case PULL_INVALID:
1019             g_assert_not_reached();
1020             return NULL;
1021     }
1022 }
1023
1024 static void
1025 push_buffer_impl(
1026     XferElement *elt,
1027     gpointer buf,
1028     size_t len)
1029 {
1030     XferElementGlue *self = (XferElementGlue *)elt;
1031
1032     /* accept first, if required */
1033     if (self->on_push & PUSH_ACCEPT_FIRST) {
1034         /* don't accept the next time around */
1035         self->on_push &= ~PUSH_ACCEPT_FIRST;
1036
1037         if (elt->cancelled) {
1038             return;
1039         }
1040
1041         if ((self->output_data_socket = do_directtcp_accept(self,
1042                                             &self->output_listen_socket)) == -1) {
1043             /* do_directtcp_accept already signalled an error; xfer
1044              * is cancelled */
1045             return;
1046         }
1047
1048         /* write to this new socket */
1049         self->write_fdp = &self->output_data_socket;
1050     }
1051
1052     /* or connect first, if required */
1053     if (self->on_push & PUSH_CONNECT_FIRST) {
1054         /* don't accept the next time around */
1055         self->on_push &= ~PUSH_CONNECT_FIRST;
1056
1057         if (elt->cancelled) {
1058             return;
1059         }
1060
1061         if ((self->output_data_socket = do_directtcp_connect(self,
1062                                     elt->downstream->input_listen_addrs)) == -1) {
1063             /* do_directtcp_connect already signalled an error; xfer
1064              * is cancelled */
1065             return;
1066         }
1067
1068         /* read from this new socket */
1069         self->write_fdp = &self->output_data_socket;
1070     }
1071
1072     switch (self->on_push) {
1073         case PUSH_TO_RING_BUFFER:
1074             /* just drop packets if the transfer has been cancelled */
1075             if (elt->cancelled) {
1076                 amfree(buf);
1077                 return;
1078             }
1079
1080             /* make sure there's at least one element free */
1081             amsemaphore_down(self->ring_free_sem);
1082
1083             /* set it */
1084             self->ring[self->ring_head].buf = buf;
1085             self->ring[self->ring_head].size = len;
1086             self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1087
1088             /* and mark this element as available for reading */
1089             amsemaphore_up(self->ring_used_sem);
1090
1091             return;
1092
1093         case PUSH_TO_FD: {
1094             int fd = get_write_fd(self);
1095
1096             /* if the fd is already closed, it's possible upstream bailed out
1097              * so quickly that we didn't even get a look at the fd.  In this
1098              * case we can assume the xfer has been cancelled and just discard
1099              * the data. */
1100             if (fd == -1)
1101                 return;
1102
1103             if (elt->cancelled) {
1104                 if (!elt->expect_eof || !buf) {
1105                     close_write_fd(self);
1106
1107                     /* hack to ensure we won't close the fd again, if we get another push */
1108                     elt->expect_eof = TRUE;
1109                 }
1110
1111                 amfree(buf);
1112
1113                 return;
1114             }
1115
1116             /* write the full buffer to the fd, or close on EOF */
1117             if (buf) {
1118                 if (full_write(fd, buf, len) < len) {
1119                     if (!elt->cancelled) {
1120                         xfer_cancel_with_error(elt,
1121                             _("Error writing to fd %d: %s"), fd, strerror(errno));
1122                         wait_until_xfer_cancelled(elt->xfer);
1123                     }
1124                     /* nothing special to do to handle a cancellation */
1125                 }
1126                 amfree(buf);
1127             } else {
1128                 close_write_fd(self);
1129             }
1130
1131             return;
1132         }
1133
1134         default:
1135         case PUSH_INVALID:
1136             g_assert_not_reached();
1137             break;
1138     }
1139 }
1140
1141 static void
1142 instance_init(
1143     XferElementGlue *self)
1144 {
1145     XferElement *elt = (XferElement *)self;
1146     elt->can_generate_eof = TRUE;
1147     self->pipe[0] = self->pipe[1] = -1;
1148     self->input_listen_socket = -1;
1149     self->output_listen_socket = -1;
1150     self->input_data_socket = -1;
1151     self->output_data_socket = -1;
1152     self->read_fd = -1;
1153     self->write_fd = -1;
1154 }
1155
1156 static void
1157 finalize_impl(
1158     GObject * obj_self)
1159 {
1160     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1161
1162     /* first make sure the worker thread has finished up */
1163     if (self->thread)
1164         g_thread_join(self->thread);
1165
1166     /* close our pipes and fd's if they're still open */
1167     if (self->pipe[0] != -1) close(self->pipe[0]);
1168     if (self->pipe[1] != -1) close(self->pipe[1]);
1169     if (self->input_data_socket != -1) close(self->input_data_socket);
1170     if (self->output_data_socket != -1) close(self->output_data_socket);
1171     if (self->input_listen_socket != -1) close(self->input_listen_socket);
1172     if (self->output_listen_socket != -1) close(self->output_listen_socket);
1173     if (self->read_fd != -1) close(self->read_fd);
1174     if (self->write_fd != -1) close(self->write_fd);
1175
1176     if (self->ring) {
1177         /* empty the ring buffer, ignoring syncronization issues */
1178         while (self->ring_used_sem->value) {
1179             if (self->ring[self->ring_tail].buf)
1180                 amfree(self->ring[self->ring_tail].buf);
1181             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1182         }
1183
1184         amfree(self->ring);
1185         amsemaphore_free(self->ring_used_sem);
1186         amsemaphore_free(self->ring_free_sem);
1187     }
1188
1189     /* chain up */
1190     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1191 }
1192
1193 static xfer_element_mech_pair_t _pairs[] = {
1194     { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1195     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1196     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1197     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1198     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1199
1200     { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1201     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1202     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1203     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1204     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1205
1206     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1207     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1208     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1209     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1210     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1211
1212     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1213     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1214     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1215     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1216     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1217
1218     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1219     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1220     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1221     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1222     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1223
1224     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1225     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1226     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1227     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1228     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
1229
1230     /* terminator */
1231     { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1232 };
1233 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1234
1235 static void
1236 class_init(
1237     XferElementGlueClass * selfc)
1238 {
1239     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1240     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1241
1242     klass->setup = setup_impl;
1243     klass->start = start_impl;
1244     klass->push_buffer = push_buffer_impl;
1245     klass->pull_buffer = pull_buffer_impl;
1246
1247     klass->perl_class = "Amanda::Xfer::Element::Glue";
1248     klass->mech_pairs = xfer_element_glue_mech_pairs;
1249
1250     goc->finalize = finalize_impl;
1251
1252     parent_class = g_type_class_peek_parent(selfc);
1253 }
1254
1255 GType
1256 xfer_element_glue_get_type (void)
1257 {
1258     static GType type = 0;
1259
1260     if G_UNLIKELY(type == 0) {
1261         static const GTypeInfo info = {
1262             sizeof (XferElementGlueClass),
1263             (GBaseInitFunc) NULL,
1264             (GBaseFinalizeFunc) NULL,
1265             (GClassInitFunc) class_init,
1266             (GClassFinalizeFunc) NULL,
1267             NULL /* class_data */,
1268             sizeof (XferElementGlue),
1269             0 /* n_preallocs */,
1270             (GInstanceInitFunc) instance_init,
1271             NULL
1272         };
1273
1274         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1275     }
1276
1277     return type;
1278 }
1279
1280 /* create an element of this class; prototype is in xfer-element.h */
1281 XferElement *
1282 xfer_element_glue(void)
1283 {
1284     XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1285     XferElement *elt = XFER_ELEMENT(self);
1286
1287     return elt;
1288 }