319b576080658f3639913c52adf03c90e9fc6937
[debian/amanda] / xfer-src / element-glue.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "element-glue.h"
25 #include "directtcp.h"
26 #include "util.h"
27 #include "sockaddr-util.h"
28 #include "debug.h"
29
30 /*
31  * Instance definition
32  */
33
/* Instance data for a glue element, which adapts one transfer mechanism
 * (fd, push/pull buffer, DirectTCP) on its input side to a different one on
 * its output side.  Which of the fields below are used depends on the
 * mechanism pair chosen in setup_impl. */
typedef struct XferElementGlue_ {
    XferElement __parent__;

    /* instructions to push_buffer_impl */
    enum {
        PUSH_TO_RING_BUFFER,
        PUSH_TO_FD, /* write to write_fd */
        PUSH_INVALID,

        /* flag bits OR'd onto one of the values above by setup_impl */
        PUSH_ACCEPT_FIRST = (1 << 16),
        PUSH_CONNECT_FIRST = (2 << 16),
    } on_push;

    /* instructions to pull_buffer_impl */
    enum {
        PULL_FROM_RING_BUFFER,
        PULL_FROM_FD, /* read from read_fd */
        PULL_INVALID,

        /* flag bits OR'd onto one of the values above by setup_impl;
         * pull_buffer_impl clears each flag once it has acted on it */
        PULL_ACCEPT_FIRST = (1 << 16),
        PULL_CONNECT_FIRST = (2 << 16),
    } on_pull;

    /* where get_write_fd/get_read_fd should take their descriptor from on
     * first use; either the address of one of this struct's own fd fields or
     * &neighboring_element_fd.  Reset to NULL once the fd has been claimed. */
    int *write_fdp;
    int *read_fdp;

    /* TRUE if start_impl should launch worker_thread for this element */
    gboolean need_thread;

    /* the stuff we might use, depending on what flavor of glue we're
     * providing.. */
    int pipe[2];
    int input_listen_socket, output_listen_socket;
    int input_data_socket, output_data_socket;
    /* cached descriptors claimed via get_read_fd/get_write_fd; -1 means
     * "not yet claimed" or "already closed" */
    int read_fd, write_fd;

    /* a ring buffer of ptr/size pairs with semaphores */
    struct { gpointer buf; size_t size; } *ring;
    amsemaphore_t *ring_used_sem, *ring_free_sem;
    /* presumably the producer/consumer indices into ring -- their use is not
     * visible in this part of the file; confirm against push/pull impls */
    gint ring_head, ring_tail;

    /* handle of the worker thread created by start_impl, if any */
    GThread *thread;
    /* NOTE(review): not referenced in the visible code; confirm whether this
     * field is still needed */
    GThreadFunc threadfunc;
} XferElementGlue;
77
78 /*
79  * Class definition
80  */
81
/* Class structure for the glue element; it adds nothing to the parent
 * XferElementClass. */
typedef struct XferElementGlueClass_ {
    XferElementClass __parent__;
} XferElementGlueClass;

/* Parent class pointer; NULL until assigned (the assignment is not visible
 * in this part of the file -- presumably done at class initialization). */
static GObjectClass *parent_class = NULL;
87
88 /*
89  * Utility functions, etc.
90  */
91
92 static void
93 make_pipe(
94     XferElementGlue *self)
95 {
96     if (pipe(self->pipe) < 0)
97         g_critical(_("Could not create pipe: %s"), strerror(errno));
98 }
99
100 static void
101 send_xfer_done(
102     XferElementGlue *self)
103 {
104     xfer_queue_message(XFER_ELEMENT(self)->xfer,
105             xmsg_new((XferElement *)self, XMSG_DONE, 0));
106 }
107
108 static gboolean
109 do_directtcp_listen(
110     XferElement *elt,
111     int *sockp,
112     DirectTCPAddr **addrsp)
113 {
114     int sock;
115     sockaddr_union data_addr;
116     DirectTCPAddr *addrs;
117     socklen_t len;
118     struct addrinfo *res;
119     struct addrinfo *res_addr;
120     sockaddr_union *addr = NULL;
121
122     if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
123         xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
124         return FALSE;
125     }
126     for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
127         if (res_addr->ai_family == AF_INET) {
128             addr = (sockaddr_union *)res_addr->ai_addr;
129             break;
130         }
131     }
132     if (!addr) {
133         addr = (sockaddr_union *)res->ai_addr;
134     }
135
136     sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
137     if (sock < 0) {
138         xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
139         return FALSE;
140     }
141
142     len = SS_LEN(addr);
143     if (bind(sock, (struct sockaddr *)addr, len) != 0) {
144         xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
145         freeaddrinfo(res);
146         return FALSE;
147     }
148
149     if (listen(sock, 1) < 0) {
150         xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
151         return FALSE;
152     }
153
154     /* TODO: which addresses should this display? all ifaces? localhost? */
155     len = sizeof(data_addr);
156     if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
157         error("getsockname(): %s", strerror(errno));
158
159     addrs = g_new0(DirectTCPAddr, 2);
160     copy_sockaddr(&addrs[0], &data_addr);
161     *addrsp = addrs;
162
163     return TRUE;
164 }
165
166 static gboolean
167 prolong_accept(
168     gpointer data)
169 {
170     return !XFER_ELEMENT(data)->cancelled;
171 }
172
173 static int
174 do_directtcp_accept(
175     XferElementGlue *self,
176     int *socketp)
177 {
178     int sock;
179     g_assert(*socketp != -1);
180
181     if ((sock = interruptible_accept(*socketp, NULL, NULL,
182                                      prolong_accept, self)) == -1) {
183         /* if the accept was interrupted due to a cancellation, then do not
184          * add a further error message */
185         if (errno == 0 && XFER_ELEMENT(self)->cancelled)
186             return -1;
187
188         xfer_cancel_with_error(XFER_ELEMENT(self),
189             _("Error accepting incoming connection: %s"), strerror(errno));
190         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
191         return -1;
192     }
193
194     /* close the listening socket now, for good measure */
195     close(*socketp);
196     *socketp = -1;
197
198     return sock;
199 }
200
201 static int
202 do_directtcp_connect(
203     XferElementGlue *self,
204     DirectTCPAddr *addrs)
205 {
206     XferElement *elt = XFER_ELEMENT(self);
207     sockaddr_union addr;
208     int sock;
209
210     if (!addrs) {
211         g_debug("element-glue got no directtcp addresses to connect to!");
212         if (!elt->cancelled) {
213             xfer_cancel_with_error(elt,
214                 "%s got no directtcp addresses to connect to",
215                 xfer_element_repr(elt));
216         }
217         goto cancel_wait;
218     }
219
220     /* set up the sockaddr -- IPv4 only */
221     copy_sockaddr(&addr, addrs);
222
223     g_debug("making data connection to %s", str_sockaddr(&addr));
224     sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
225     if (sock < 0) {
226         xfer_cancel_with_error(elt,
227             "socket(): %s", strerror(errno));
228         goto cancel_wait;
229     }
230     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
231         xfer_cancel_with_error(elt,
232             "connect(): %s", strerror(errno));
233         goto cancel_wait;
234     }
235
236     g_debug("connected to %s", str_sockaddr(&addr));
237
238     return sock;
239
240 cancel_wait:
241     wait_until_xfer_cancelled(elt->xfer);
242     return -1;
243 }
244
/* size, in bytes, of the buffers allocated when reading from an fd */
#define GLUE_BUFFER_SIZE 32768
/* number of slots allocated for the push/pull ring buffer */
#define GLUE_RING_BUFFER_SIZE 32

/* fold an (input, output) mechanism pair into a single integer, so that the
 * pair can be used as a switch expression and as case labels */
#define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
249
250 /*
251  * fd handling
252  */
253
/* Sentinel: if self->read_fdp or self->write_fdp points at this integer, the
 * descriptor is not one of our own.  Instead, on the first call to
 * get_read_fd or get_write_fd respectively, it is taken from the upstream
 * element's output fd or the downstream element's input fd.  Only the
 * address of this variable matters; its value is never used. */
static int neighboring_element_fd = -1;
259
260 #define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
261 static int
262 _get_read_fd(XferElementGlue *self)
263 {
264     if (!self->read_fdp)
265         return -1; /* shouldn't happen.. */
266
267     if (self->read_fdp == &neighboring_element_fd) {
268         XferElement *elt = XFER_ELEMENT(self);
269         self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
270     } else {
271         self->read_fd = *self->read_fdp;
272         *self->read_fdp = -1;
273     }
274     self->read_fdp = NULL;
275     return self->read_fd;
276 }
277
278 #define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
279 static int
280 _get_write_fd(XferElementGlue *self)
281 {
282     if (!self->write_fdp)
283         return -1; /* shouldn't happen.. */
284
285     if (self->write_fdp == &neighboring_element_fd) {
286         XferElement *elt = XFER_ELEMENT(self);
287         self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
288     } else {
289         self->write_fd = *self->write_fdp;
290         *self->write_fdp = -1;
291     }
292     self->write_fdp = NULL;
293     return self->write_fd;
294 }
295
296 static int
297 close_read_fd(XferElementGlue *self)
298 {
299     int fd = get_read_fd(self);
300     self->read_fd = -1;
301     return close(fd);
302 }
303
304 static int
305 close_write_fd(XferElementGlue *self)
306 {
307     int fd = get_write_fd(self);
308     self->write_fd = -1;
309     return close(fd);
310 }
311
312 /*
313  * Worker thread utility functions
314  */
315
316 static void
317 pull_and_write(XferElementGlue *self)
318 {
319     XferElement *elt = XFER_ELEMENT(self);
320     int fd = get_write_fd(self);
321     self->write_fdp = NULL;
322
323     while (!elt->cancelled) {
324         size_t len;
325         char *buf;
326
327         /* get a buffer from upstream */
328         buf = xfer_element_pull_buffer(elt->upstream, &len);
329         if (!buf)
330             break;
331
332         /* write it */
333         if (full_write(fd, buf, len) < len) {
334             if (!elt->cancelled) {
335                 xfer_cancel_with_error(elt,
336                     _("Error writing to fd %d: %s"), fd, strerror(errno));
337                 wait_until_xfer_cancelled(elt->xfer);
338             }
339             amfree(buf);
340             break;
341         }
342
343         amfree(buf);
344     }
345
346     if (elt->cancelled && elt->expect_eof)
347         xfer_element_drain_buffers(elt->upstream);
348
349     /* close the fd we've been writing, as an EOF signal to downstream, and
350      * set it to -1 to avoid accidental re-use */
351     close_write_fd(self);
352 }
353
354 static void
355 read_and_write(XferElementGlue *self)
356 {
357     XferElement *elt = XFER_ELEMENT(self);
358     /* dynamically allocate a buffer, in case this thread has
359      * a limited amount of stack allocated */
360     char *buf = g_malloc(GLUE_BUFFER_SIZE);
361     int rfd = get_read_fd(self);
362     int wfd = get_write_fd(self);
363
364     while (!elt->cancelled) {
365         size_t len;
366
367         /* read from upstream */
368         len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
369         if (len < GLUE_BUFFER_SIZE) {
370             if (errno) {
371                 if (!elt->cancelled) {
372                     xfer_cancel_with_error(elt,
373                         _("Error reading from fd %d: %s"), rfd, strerror(errno));
374                     wait_until_xfer_cancelled(elt->xfer);
375                 }
376                 break;
377             } else if (len == 0) { /* we only count a zero-length read as EOF */
378                 break;
379             }
380         }
381
382         /* write the buffer fully */
383         if (full_write(wfd, buf, len) < len) {
384             if (!elt->cancelled) {
385                 xfer_cancel_with_error(elt,
386                     _("Could not write to fd %d: %s"), wfd, strerror(errno));
387                 wait_until_xfer_cancelled(elt->xfer);
388             }
389             break;
390         }
391     }
392
393     if (elt->cancelled && elt->expect_eof)
394         xfer_element_drain_fd(rfd);
395
396     /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
397      * kill it and complete the cancellation */
398     close_read_fd(self);
399
400     /* close the fd we've been writing, as an EOF signal to downstream */
401     close_write_fd(self);
402
403     amfree(buf);
404 }
405
406 static void
407 read_and_push(
408     XferElementGlue *self)
409 {
410     XferElement *elt = XFER_ELEMENT(self);
411     int fd = get_read_fd(self);
412
413     while (!elt->cancelled) {
414         char *buf = g_malloc(GLUE_BUFFER_SIZE);
415         size_t len;
416
417         /* read a buffer from upstream */
418         len = full_read(fd, buf, GLUE_BUFFER_SIZE);
419         if (len < GLUE_BUFFER_SIZE) {
420             if (errno) {
421                 if (!elt->cancelled) {
422                     int saved_errno = errno;
423                     xfer_cancel_with_error(elt,
424                         _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
425                     g_debug("element-glue: error reading from fd %d: %s",
426                             fd, strerror(saved_errno));
427                     wait_until_xfer_cancelled(elt->xfer);
428                 }
429                 amfree(buf);
430                 break;
431             } else if (len == 0) { /* we only count a zero-length read as EOF */
432                 amfree(buf);
433                 break;
434             }
435         }
436
437         xfer_element_push_buffer(elt->downstream, buf, len);
438     }
439
440     if (elt->cancelled && elt->expect_eof)
441         xfer_element_drain_fd(fd);
442
443     /* send an EOF indication downstream */
444     xfer_element_push_buffer(elt->downstream, NULL, 0);
445
446     /* close the read fd, since it's at EOF */
447     close_read_fd(self);
448 }
449
450 static void
451 pull_and_push(XferElementGlue *self)
452 {
453     XferElement *elt = XFER_ELEMENT(self);
454     gboolean eof_sent = FALSE;
455
456     while (!elt->cancelled) {
457         char *buf;
458         size_t len;
459
460         /* get a buffer from upstream */
461         buf = xfer_element_pull_buffer(elt->upstream, &len);
462
463         /* and push it downstream */
464         xfer_element_push_buffer(elt->downstream, buf, len);
465
466         if (!buf) {
467             eof_sent = TRUE;
468             break;
469         }
470     }
471
472     if (elt->cancelled && elt->expect_eof)
473         xfer_element_drain_buffers(elt->upstream);
474
475     if (!eof_sent)
476         xfer_element_push_buffer(elt->downstream, NULL, 0);
477 }
478
/* Body of the glue worker thread (see start_impl).
 *
 * Dispatches on the element's (input, output) mechanism pair.  Where one or
 * both sides use DirectTCP, the data connection(s) are established first, by
 * accepting on a listening socket prepared in setup_impl or by connecting to
 * the neighbor's advertised addresses; the resulting socket is then exposed
 * through read_fdp/write_fdp.  After that the matching copy loop runs.
 *
 * If establishing a connection fails, the case breaks out without copying
 * any data.  In every case an XMSG_DONE message is sent before the thread
 * returns.
 *
 * @param data the XferElementGlue instance
 * @returns always NULL
 */
static gpointer
worker_thread(
    gpointer data)
{
    XferElement *elt = XFER_ELEMENT(data);
    XferElementGlue *self = XFER_ELEMENT_GLUE(data);

    switch (mech_pair(elt->input_mech, elt->output_mech)) {
    /* fd -> fd */
    case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
        read_and_write(self);
        break;

    /* fd -> push */
    case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
        read_and_push(self);
        break;

    /* pull -> fd */
    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
        pull_and_write(self);
        break;

    /* pull -> push */
    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
        pull_and_push(self);
        break;

    /* fd -> socket connected to downstream's listening address */
    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
        if ((self->output_data_socket = do_directtcp_connect(self,
                                    elt->downstream->input_listen_addrs)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        read_and_write(self);
        break;

    /* pull -> socket connected to downstream's listening address */
    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
        if ((self->output_data_socket = do_directtcp_connect(self,
                                    elt->downstream->input_listen_addrs)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        pull_and_write(self);
        break;

    /* socket accepted from upstream -> fd */
    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
        if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        read_and_write(self);
        break;

    /* socket accepted from upstream -> push */
    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
        if ((self->input_data_socket = do_directtcp_accept(self,
                                            &self->input_listen_socket)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        read_and_push(self);
        break;

    /* these pairs are set up by setup_impl without a worker thread, so
     * reaching this function with any of them is a programming error */
    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
    case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
    default:
        g_assert_not_reached();
        break;

    /* fd -> socket accepted from downstream */
    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
        if ((self->output_data_socket = do_directtcp_accept(self,
                                            &self->output_listen_socket)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        read_and_write(self);
        break;

    /* socket connected to upstream's listening address -> fd */
    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
        if ((self->input_data_socket = do_directtcp_connect(self,
                                    elt->upstream->output_listen_addrs)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        read_and_write(self);
        break;

    /* socket connected to upstream's listening address -> push */
    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
        if ((self->input_data_socket = do_directtcp_connect(self,
                                    elt->upstream->output_listen_addrs)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        read_and_push(self);
        break;

    /* pull -> socket accepted from downstream */
    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
        if ((self->output_data_socket = do_directtcp_accept(self,
                                            &self->output_listen_socket)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        pull_and_write(self);
        break;

    /* socket -> socket, accepting on both sides (output side first) */
    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
        /* TODO: use async accept's here to avoid order dependency */
        if ((self->output_data_socket = do_directtcp_accept(self,
                                            &self->output_listen_socket)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        if ((self->input_data_socket = do_directtcp_accept(self,
                                            &self->input_listen_socket)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        read_and_write(self);
        break;

    /* socket -> socket, connecting on both sides (input side first) */
    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
        /* TODO: use async connects and select() to avoid order dependency here */
        if ((self->input_data_socket = do_directtcp_connect(self,
                                    elt->upstream->output_listen_addrs)) == -1)
            break;
        self->read_fdp = &self->input_data_socket;
        if ((self->output_data_socket = do_directtcp_connect(self,
                                    elt->downstream->input_listen_addrs)) == -1)
            break;
        self->write_fdp = &self->output_data_socket;
        read_and_write(self);
        break;
    }

    /* whatever happened above, tell the transfer this element is finished */
    send_xfer_done(self);

    return NULL;
}
617
618 /*
619  * Implementation
620  */
621
622 static gboolean
623 setup_impl(
624     XferElement *elt)
625 {
626     XferElementGlue *self = (XferElementGlue *)elt;
627     gboolean need_ring = FALSE;
628     gboolean need_listen_input = FALSE;
629     gboolean need_listen_output = FALSE;
630
631     g_assert(elt->input_mech != XFER_MECH_NONE);
632     g_assert(elt->output_mech != XFER_MECH_NONE);
633     g_assert(elt->input_mech != elt->output_mech);
634
635     self->read_fdp = NULL;
636     self->write_fdp = NULL;
637     self->on_push = PUSH_INVALID;
638     self->on_pull = PULL_INVALID;
639     self->need_thread = FALSE;
640
641     switch (mech_pair(elt->input_mech, elt->output_mech)) {
642     case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
643         /* thread will read from one fd and write to the other */
644         self->read_fdp = &neighboring_element_fd;
645         self->write_fdp = &neighboring_element_fd;
646         self->need_thread = TRUE;
647         break;
648
649     case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
650         /* thread will read from one fd and call push_buffer downstream */
651         self->read_fdp = &neighboring_element_fd;
652         self->need_thread = TRUE;
653         break;
654
655     case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
656         self->read_fdp = &neighboring_element_fd;
657         self->on_pull = PULL_FROM_FD;
658         break;
659
660     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
661         /* thread will connect for output, then read from fd and write to the
662          * socket. */
663         self->read_fdp = &neighboring_element_fd;
664         self->need_thread = TRUE;
665         break;
666
667     case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
668         /* thread will accept output conn, then read from upstream and write to socket */
669         self->read_fdp = &neighboring_element_fd;
670         self->need_thread = TRUE;
671         need_listen_output = TRUE;
672         break;
673
674     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
675         make_pipe(self);
676         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
677         self->pipe[1] = -1; /* upstream will close this for us */
678         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
679         self->pipe[0] = -1; /* downstream will close this for us */
680         break;
681
682     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
683         /* thread will read from pipe and call downstream's push_buffer */
684         make_pipe(self);
685         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
686         self->pipe[1] = -1; /* upstream will close this for us */
687         self->read_fdp = &self->pipe[0];
688         self->need_thread = TRUE;
689         break;
690
691     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
692         make_pipe(self);
693         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
694         self->pipe[1] = -1; /* upstream will close this for us */
695         self->on_pull = PULL_FROM_FD;
696         self->read_fdp = &self->pipe[0];
697         break;
698
699     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
700         /* thread will connect for output, then read from pipe and write to socket */
701         make_pipe(self);
702         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
703         self->pipe[1] = -1; /* upstream will close this for us */
704         self->read_fdp = &self->pipe[0];
705         self->need_thread = TRUE;
706         break;
707
708     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
709         /* thread will accept output conn, then read from pipe and write to socket */
710         make_pipe(self);
711         g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
712         self->pipe[1] = -1; /* upstream will close this for us */
713         self->read_fdp = &self->pipe[0];
714         self->need_thread = TRUE;
715         need_listen_output = TRUE;
716         break;
717
718     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
719         make_pipe(self);
720         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
721         self->pipe[0] = -1; /* downstream will close this for us */
722         self->on_push = PUSH_TO_FD;
723         self->write_fdp = &self->pipe[1];
724         break;
725
726     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
727         self->on_push = PUSH_TO_FD;
728         self->write_fdp = &neighboring_element_fd;
729         break;
730
731     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
732         self->on_push = PUSH_TO_RING_BUFFER;
733         self->on_pull = PULL_FROM_RING_BUFFER;
734         need_ring = TRUE;
735         break;
736
737     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
738         /* push will connect for output first */
739         self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
740         break;
741
742     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
743         /* push will accept for output first */
744         self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
745         need_listen_output = TRUE;
746         break;
747
748     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
749         /* thread will pull from upstream and write to pipe */
750         make_pipe(self);
751         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
752         self->pipe[0] = -1; /* downstream will close this for us */
753         self->write_fdp = &self->pipe[1];
754         self->need_thread = TRUE;
755         break;
756
757     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
758         /* thread will pull from upstream and write to downstream */
759         self->write_fdp = &neighboring_element_fd;
760         self->need_thread = TRUE;
761         break;
762
763     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
764         /* thread will pull from upstream and push to downstream */
765         self->need_thread = TRUE;
766         break;
767
768     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
769         /* thread will connect for output, then pull from upstream and write to socket */
770         self->need_thread = TRUE;
771         break;
772
773     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
774         /* thread will accept for output, then pull from upstream and write to socket */
775         self->need_thread = TRUE;
776         need_listen_output = TRUE;
777         break;
778
779     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
780         /* thread will accept for input, then read from socket and write to pipe */
781         make_pipe(self);
782         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
783         self->pipe[0] = -1; /* downstream will close this for us */
784         self->write_fdp = &self->pipe[1];
785         self->need_thread = TRUE;
786         need_listen_input = TRUE;
787         break;
788
789     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
790         /* thread will accept for input, then read from socket and write to downstream */
791         self->write_fdp = &neighboring_element_fd;
792         self->need_thread = TRUE;
793         need_listen_input = TRUE;
794         break;
795
796     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
797         /* thread will accept for input, then read from socket and push downstream */
798         self->need_thread = TRUE;
799         need_listen_input = TRUE;
800         break;
801
802     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
803         /* first pull will accept for input, then read from socket */
804         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
805         need_listen_input = TRUE;
806         break;
807
808     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
809         /* thread will accept on both sides, then copy from socket to socket */
810         self->need_thread = TRUE;
811         need_listen_input = TRUE;
812         need_listen_output = TRUE;
813         break;
814
815     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
816         /* thread will connect for input, then read from socket and write to pipe */
817         make_pipe(self);
818         g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
819         self->pipe[0] = -1; /* downstream will close this for us */
820         self->write_fdp = &self->pipe[1];
821         self->need_thread = TRUE;
822         break;
823
824     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
825         /* thread will connect for input, then read from socket and write to downstream */
826         self->write_fdp = &neighboring_element_fd;
827         self->need_thread = TRUE;
828         break;
829
830     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
831         /* thread will connect for input, then read from socket and push downstream */
832         self->need_thread = TRUE;
833         break;
834
835     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
836         /* first pull will connect for input, then read from socket */
837         self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
838         break;
839
840     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
841         /* thread will connect on both sides, then copy from socket to socket */
842         self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
843         self->need_thread = TRUE;
844         break;
845
846     default:
847         g_assert_not_reached();
848         break;
849     }
850
851     /* set up ring if desired */
852     if (need_ring) {
853         self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
854         self->ring_used_sem = amsemaphore_new_with_value(0);
855         self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
856     }
857
858     if (need_listen_input) {
859         if (!do_directtcp_listen(elt,
860                     &self->input_listen_socket, &elt->input_listen_addrs))
861             return FALSE;
862     }
863     if (need_listen_output) {
864         if (!do_directtcp_listen(elt,
865                     &self->output_listen_socket, &elt->output_listen_addrs))
866             return FALSE;
867     }
868
869     return TRUE;
870 }
871
872 static gboolean
873 start_impl(
874     XferElement *elt)
875 {
876     XferElementGlue *self = (XferElementGlue *)elt;
877
878     if (self->need_thread)
879         self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
880
881     /* we're active if we have a thread that will eventually die */
882     return self->need_thread;
883 }
884
885 static gpointer
886 pull_buffer_impl(
887     XferElement *elt,
888     size_t *size)
889 {
890     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
891
892     /* accept first, if required */
893     if (self->on_pull & PULL_ACCEPT_FIRST) {
894         /* don't accept the next time around */
895         self->on_pull &= ~PULL_ACCEPT_FIRST;
896
897         if (elt->cancelled) {
898             *size = 0;
899             return NULL;
900         }
901
902         if ((self->input_data_socket = do_directtcp_accept(self,
903                                             &self->input_listen_socket)) == -1) {
904             /* do_directtcp_accept already signalled an error; xfer
905              * is cancelled */
906             *size = 0;
907             return NULL;
908         }
909
910         /* read from this new socket */
911         self->read_fdp = &self->input_data_socket;
912     }
913
914     /* or connect first, if required */
915     if (self->on_pull & PULL_CONNECT_FIRST) {
916         /* don't connect the next time around */
917         self->on_pull &= ~PULL_CONNECT_FIRST;
918
919         if (elt->cancelled) {
920             *size = 0;
921             return NULL;
922         }
923
924         if ((self->input_data_socket = do_directtcp_connect(self,
925                                     elt->upstream->output_listen_addrs)) == -1) {
926             /* do_directtcp_connect already signalled an error; xfer
927              * is cancelled */
928             *size = 0;
929             return NULL;
930         }
931
932         /* read from this new socket */
933         self->read_fdp = &self->input_data_socket;
934     }
935
936     switch (self->on_pull) {
937         case PULL_FROM_RING_BUFFER: {
938             gpointer buf;
939
940             if (elt->cancelled) {
941                 /* the finalize method will empty the ring buffer */
942                 *size = 0;
943                 return NULL;
944             }
945
946             /* make sure there's at least one element available */
947             amsemaphore_down(self->ring_used_sem);
948
949             /* get it */
950             buf = self->ring[self->ring_tail].buf;
951             *size = self->ring[self->ring_tail].size;
952             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
953
954             /* and mark this element as free to be overwritten */
955             amsemaphore_up(self->ring_free_sem);
956
957             return buf;
958         }
959
960         case PULL_FROM_FD: {
961             int fd = get_read_fd(self);
962             char *buf;
963             ssize_t len;
964
965             /* if the fd is already closed, it's possible upstream bailed out
966              * so quickly that we didn't even get a look at the fd */
967             if (elt->cancelled || fd == -1) {
968                 if (fd != -1) {
969                     if (elt->expect_eof)
970                         xfer_element_drain_fd(fd);
971
972                     close_read_fd(self);
973                 }
974
975                 *size = 0;
976                 return NULL;
977             }
978
979             buf = g_malloc(GLUE_BUFFER_SIZE);
980
981             /* read from upstream */
982             len = full_read(fd, buf, GLUE_BUFFER_SIZE);
983             if (len < GLUE_BUFFER_SIZE) {
984                 if (errno) {
985                     if (!elt->cancelled) {
986                         xfer_cancel_with_error(elt,
987                             _("Error reading from fd %d: %s"), fd, strerror(errno));
988                         wait_until_xfer_cancelled(elt->xfer);
989                     }
990
991                     /* return an EOF */
992                     amfree(buf);
993                     len = 0;
994
995                     /* and finish off the upstream */
996                     if (elt->expect_eof) {
997                         xfer_element_drain_fd(fd);
998                     }
999                     close_read_fd(self);
1000                 } else if (len == 0) {
1001                     /* EOF */
1002                     g_free(buf);
1003                     buf = NULL;
1004                     *size = 0;
1005
1006                     /* signal EOF to downstream */
1007                     close_read_fd(self);
1008                 }
1009             }
1010
1011             *size = (size_t)len;
1012
1013             return buf;
1014         }
1015
1016         default:
1017         case PULL_INVALID:
1018             g_assert_not_reached();
1019             return NULL;
1020     }
1021 }
1022
1023 static void
1024 push_buffer_impl(
1025     XferElement *elt,
1026     gpointer buf,
1027     size_t len)
1028 {
1029     XferElementGlue *self = (XferElementGlue *)elt;
1030
1031     /* accept first, if required */
1032     if (self->on_push & PUSH_ACCEPT_FIRST) {
1033         /* don't accept the next time around */
1034         self->on_push &= ~PUSH_ACCEPT_FIRST;
1035
1036         if (elt->cancelled) {
1037             return;
1038         }
1039
1040         if ((self->output_data_socket = do_directtcp_accept(self,
1041                                             &self->output_listen_socket)) == -1) {
1042             /* do_directtcp_accept already signalled an error; xfer
1043              * is cancelled */
1044             return;
1045         }
1046
1047         /* write to this new socket */
1048         self->write_fdp = &self->output_data_socket;
1049     }
1050
1051     /* or connect first, if required */
1052     if (self->on_push & PUSH_CONNECT_FIRST) {
1053         /* don't accept the next time around */
1054         self->on_push &= ~PUSH_CONNECT_FIRST;
1055
1056         if (elt->cancelled) {
1057             return;
1058         }
1059
1060         if ((self->output_data_socket = do_directtcp_connect(self,
1061                                     elt->downstream->input_listen_addrs)) == -1) {
1062             /* do_directtcp_connect already signalled an error; xfer
1063              * is cancelled */
1064             return;
1065         }
1066
1067         /* read from this new socket */
1068         self->write_fdp = &self->output_data_socket;
1069     }
1070
1071     switch (self->on_push) {
1072         case PUSH_TO_RING_BUFFER:
1073             /* just drop packets if the transfer has been cancelled */
1074             if (elt->cancelled) {
1075                 amfree(buf);
1076                 return;
1077             }
1078
1079             /* make sure there's at least one element free */
1080             amsemaphore_down(self->ring_free_sem);
1081
1082             /* set it */
1083             self->ring[self->ring_head].buf = buf;
1084             self->ring[self->ring_head].size = len;
1085             self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
1086
1087             /* and mark this element as available for reading */
1088             amsemaphore_up(self->ring_used_sem);
1089
1090             return;
1091
1092         case PUSH_TO_FD: {
1093             int fd = get_write_fd(self);
1094
1095             /* if the fd is already closed, it's possible upstream bailed out
1096              * so quickly that we didn't even get a look at the fd.  In this
1097              * case we can assume the xfer has been cancelled and just discard
1098              * the data. */
1099             if (fd == -1)
1100                 return;
1101
1102             if (elt->cancelled) {
1103                 if (!elt->expect_eof || !buf) {
1104                     close_write_fd(self);
1105
1106                     /* hack to ensure we won't close the fd again, if we get another push */
1107                     elt->expect_eof = TRUE;
1108                 }
1109
1110                 amfree(buf);
1111
1112                 return;
1113             }
1114
1115             /* write the full buffer to the fd, or close on EOF */
1116             if (buf) {
1117                 if (full_write(fd, buf, len) < len) {
1118                     if (!elt->cancelled) {
1119                         xfer_cancel_with_error(elt,
1120                             _("Error writing to fd %d: %s"), fd, strerror(errno));
1121                         wait_until_xfer_cancelled(elt->xfer);
1122                     }
1123                     /* nothing special to do to handle a cancellation */
1124                 }
1125                 amfree(buf);
1126             } else {
1127                 close_write_fd(self);
1128             }
1129
1130             return;
1131         }
1132
1133         default:
1134         case PUSH_INVALID:
1135             g_assert_not_reached();
1136             break;
1137     }
1138 }
1139
1140 static void
1141 instance_init(
1142     XferElementGlue *self)
1143 {
1144     XferElement *elt = (XferElement *)self;
1145     elt->can_generate_eof = TRUE;
1146     self->pipe[0] = self->pipe[1] = -1;
1147     self->input_listen_socket = -1;
1148     self->output_listen_socket = -1;
1149     self->input_data_socket = -1;
1150     self->output_data_socket = -1;
1151     self->read_fd = -1;
1152     self->write_fd = -1;
1153 }
1154
1155 static void
1156 finalize_impl(
1157     GObject * obj_self)
1158 {
1159     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
1160
1161     /* first make sure the worker thread has finished up */
1162     if (self->thread)
1163         g_thread_join(self->thread);
1164
1165     /* close our pipes and fd's if they're still open */
1166     if (self->pipe[0] != -1) close(self->pipe[0]);
1167     if (self->pipe[1] != -1) close(self->pipe[1]);
1168     if (self->input_data_socket != -1) close(self->input_data_socket);
1169     if (self->output_data_socket != -1) close(self->output_data_socket);
1170     if (self->input_listen_socket != -1) close(self->input_listen_socket);
1171     if (self->output_listen_socket != -1) close(self->output_listen_socket);
1172     if (self->read_fd != -1) close(self->read_fd);
1173     if (self->write_fd != -1) close(self->write_fd);
1174
1175     if (self->ring) {
1176         /* empty the ring buffer, ignoring syncronization issues */
1177         while (self->ring_used_sem->value) {
1178             if (self->ring[self->ring_tail].buf)
1179                 amfree(self->ring[self->ring_tail].buf);
1180             self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
1181         }
1182
1183         amfree(self->ring);
1184         amsemaphore_free(self->ring_used_sem);
1185         amsemaphore_free(self->ring_free_sem);
1186     }
1187
1188     /* chain up */
1189     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1190 }
1191
1192 static xfer_element_mech_pair_t _pairs[] = {
1193     { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1194     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1195     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1196     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1197     { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1198
1199     { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
1200     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
1201     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
1202     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
1203     { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1204
1205     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
1206     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1207     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
1208     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1209     { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
1210
1211     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
1212     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1213     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
1214     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1215     { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
1216
1217     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1218     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1219     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1220     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1221     { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
1222
1223     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1224     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
1225     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
1226     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
1227     { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
1228
1229     /* terminator */
1230     { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1231 };
1232 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
1233
1234 static void
1235 class_init(
1236     XferElementGlueClass * selfc)
1237 {
1238     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1239     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1240
1241     klass->setup = setup_impl;
1242     klass->start = start_impl;
1243     klass->push_buffer = push_buffer_impl;
1244     klass->pull_buffer = pull_buffer_impl;
1245
1246     klass->perl_class = "Amanda::Xfer::Element::Glue";
1247     klass->mech_pairs = xfer_element_glue_mech_pairs;
1248
1249     goc->finalize = finalize_impl;
1250
1251     parent_class = g_type_class_peek_parent(selfc);
1252 }
1253
1254 GType
1255 xfer_element_glue_get_type (void)
1256 {
1257     static GType type = 0;
1258
1259     if G_UNLIKELY(type == 0) {
1260         static const GTypeInfo info = {
1261             sizeof (XferElementGlueClass),
1262             (GBaseInitFunc) NULL,
1263             (GBaseFinalizeFunc) NULL,
1264             (GClassInitFunc) class_init,
1265             (GClassFinalizeFunc) NULL,
1266             NULL /* class_data */,
1267             sizeof (XferElementGlue),
1268             0 /* n_preallocs */,
1269             (GInstanceInitFunc) instance_init,
1270             NULL
1271         };
1272
1273         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
1274     }
1275
1276     return type;
1277 }
1278
1279 /* create an element of this class; prototype is in xfer-element.h */
1280 XferElement *
1281 xfer_element_glue(void)
1282 {
1283     XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
1284     XferElement *elt = XFER_ELEMENT(self);
1285
1286     return elt;
1287 }