Imported Upstream version 3.3.2
[debian/amanda] / xfer-src / xfer-test.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  * 
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  * 
17  * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  *
20  * Author: Dustin J. Mitchell <dustin@zmanda.com>
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "glib-util.h"
26 #include "testutils.h"
27 #include "event.h"
28 #include "simpleprng.h"
29 #include "sockaddr-util.h"
30
31 /* Having tests repeat exactly is an advantage, so we use a hard-coded
32  * random seed. */
33 #define RANDOM_SEED 0xf00d
34
35 /*
36  * XferElement subclasses
37  *
38  * This file defines a few "private" element classes that each have only one
39  * mechanism pair.  These classes are then used to test all of the possible
40  * combinations of glue.
41  */
42
43 /* constants to determine the total amount of data to be transferred; EXTRA is
44  * to test out partial-block handling; it should be prime. */
45 #define TEST_BLOCK_SIZE 32768
46 #define TEST_BLOCK_COUNT 10
47 #define TEST_BLOCK_EXTRA 97
48 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
49
50 /* READFD */
51
52 static GType xfer_source_readfd_get_type(void);
53 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
54 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
55 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
56 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
57 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
58 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
59
60 typedef struct XferSourceReadfd {
61     XferElement __parent__;
62
63     int write_fd;
64     GThread *thread;
65     simpleprng_state_t prng;
66 } XferSourceReadfd;
67
68 typedef struct {
69     XferElementClass __parent__;
70 } XferSourceReadfdClass;
71
72 static gpointer
73 source_readfd_thread(
74     gpointer data)
75 {
76     XferSourceReadfd *self = XFER_SOURCE_READFD(data);
77     char buf[TEST_XFER_SIZE];
78     int fd = self->write_fd;
79
80     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
81
82     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
83         error("error in full_write(): %s", strerror(errno));
84     }
85
86     close(fd);
87
88     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
89
90     return NULL;
91 }
92
93 static gboolean
94 source_readfd_setup_impl(
95     XferElement *elt)
96 {
97     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
98     int p[2];
99
100     simpleprng_seed(&self->prng, RANDOM_SEED);
101
102     if (pipe(p) < 0)
103         g_critical("Error from pipe(): %s", strerror(errno));
104
105     self->write_fd = p[1];
106     g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
107
108     return TRUE;
109 }
110
111 static gboolean
112 source_readfd_start_impl(
113     XferElement *elt)
114 {
115     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
116     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
117
118     return TRUE;
119 }
120
121 static void
122 source_readfd_class_init(
123     XferSourceReadfdClass * klass)
124 {
125     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
126     static xfer_element_mech_pair_t mech_pairs[] = {
127         { XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) },
128         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
129     };
130
131     xec->setup = source_readfd_setup_impl;
132     xec->start = source_readfd_start_impl;
133     xec->mech_pairs = mech_pairs;
134 }
135
136 GType
137 xfer_source_readfd_get_type (void)
138 {
139     static GType type = 0;
140
141     if G_UNLIKELY(type == 0) {
142         static const GTypeInfo info = {
143             sizeof (XferSourceReadfdClass),
144             (GBaseInitFunc) NULL,
145             (GBaseFinalizeFunc) NULL,
146             (GClassInitFunc) source_readfd_class_init,
147             (GClassFinalizeFunc) NULL,
148             NULL /* class_data */,
149             sizeof (XferSourceReadfd),
150             0 /* n_preallocs */,
151             (GInstanceInitFunc) NULL,
152             NULL
153         };
154
155         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
156     }
157
158     return type;
159 }
160
161 /* WRITEFD */
162
163 static GType xfer_source_writefd_get_type(void);
164 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
165 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
166 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
167 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
168 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
169 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
170
171 typedef struct XferSourceWritefd {
172     XferElement __parent__;
173
174     GThread *thread;
175     simpleprng_state_t prng;
176 } XferSourceWritefd;
177
178 typedef struct {
179     XferElementClass __parent__;
180 } XferSourceWritefdClass;
181
182 static gpointer
183 source_writefd_thread(
184     gpointer data)
185 {
186     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
187     XferElement *elt = XFER_ELEMENT(data);
188     char buf[TEST_XFER_SIZE];
189     int fd = xfer_element_swap_input_fd(elt->downstream, -1);
190
191     /* this shouldn't happen, although non-test elements handle it gracefully */
192     g_assert(fd != -1);
193
194     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
195
196     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
197         error("error in full_write(): %s", strerror(errno));
198     }
199
200     close(fd);
201
202     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
203
204     return NULL;
205 }
206
207 static gboolean
208 source_writefd_start_impl(
209     XferElement *elt)
210 {
211     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
212
213     simpleprng_seed(&self->prng, RANDOM_SEED);
214
215     self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
216
217     return TRUE;
218 }
219
220 static void
221 source_writefd_class_init(
222     XferSourceWritefdClass * klass)
223 {
224     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
225     static xfer_element_mech_pair_t mech_pairs[] = {
226         { XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) },
227         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
228     };
229
230     xec->start = source_writefd_start_impl;
231     xec->mech_pairs = mech_pairs;
232 }
233
234 GType
235 xfer_source_writefd_get_type (void)
236 {
237     static GType type = 0;
238
239     if G_UNLIKELY(type == 0) {
240         static const GTypeInfo info = {
241             sizeof (XferSourceWritefdClass),
242             (GBaseInitFunc) NULL,
243             (GBaseFinalizeFunc) NULL,
244             (GClassInitFunc) source_writefd_class_init,
245             (GClassFinalizeFunc) NULL,
246             NULL /* class_data */,
247             sizeof (XferSourceWritefd),
248             0 /* n_preallocs */,
249             (GInstanceInitFunc) NULL,
250             NULL
251         };
252
253         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
254     }
255
256     return type;
257 }
258
259 /* PUSH_BUFFER */
260
261 static GType xfer_source_push_get_type(void);
262 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
263 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
264 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
265 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
266 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
267 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
268
269 typedef struct XferSourcePush {
270     XferElement __parent__;
271
272     GThread *thread;
273     simpleprng_state_t prng;
274 } XferSourcePush;
275
276 typedef struct {
277     XferElementClass __parent__;
278 } XferSourcePushClass;
279
280 static gpointer
281 source_push_thread(
282     gpointer data)
283 {
284     XferSourcePush *self = XFER_SOURCE_PUSH(data);
285     char *buf;
286     int i;
287
288     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
289         buf = g_malloc(TEST_BLOCK_SIZE);
290         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
291         xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
292         buf = NULL;
293     }
294
295     /* send a smaller block */
296     buf = g_malloc(TEST_BLOCK_EXTRA);
297     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
298     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
299     buf = NULL;
300
301     /* send EOF */
302     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
303
304     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
305
306     return NULL;
307 }
308
309 static gboolean
310 source_push_start_impl(
311     XferElement *elt)
312 {
313     XferSourcePush *self = XFER_SOURCE_PUSH(elt);
314
315     simpleprng_seed(&self->prng, RANDOM_SEED);
316
317     self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
318
319     return TRUE;
320 }
321
322 static void
323 source_push_class_init(
324     XferSourcePushClass * klass)
325 {
326     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
327     static xfer_element_mech_pair_t mech_pairs[] = {
328         { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) },
329         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
330     };
331
332     xec->start = source_push_start_impl;
333     xec->mech_pairs = mech_pairs;
334 }
335
336 GType
337 xfer_source_push_get_type (void)
338 {
339     static GType type = 0;
340
341     if G_UNLIKELY(type == 0) {
342         static const GTypeInfo info = {
343             sizeof (XferSourcePushClass),
344             (GBaseInitFunc) NULL,
345             (GBaseFinalizeFunc) NULL,
346             (GClassInitFunc) source_push_class_init,
347             (GClassFinalizeFunc) NULL,
348             NULL /* class_data */,
349             sizeof (XferSourcePush),
350             0 /* n_preallocs */,
351             (GInstanceInitFunc) NULL,
352             NULL
353         };
354
355         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
356     }
357
358     return type;
359 }
360
361 /* PULL_BUFFER */
362
363 static GType xfer_source_pull_get_type(void);
364 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
365 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
366 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
367 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
368 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
369 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
370
371 typedef struct XferSourcePull {
372     XferElement __parent__;
373
374     gint nbuffers;
375     GThread *thread;
376     simpleprng_state_t prng;
377 } XferSourcePull;
378
379 typedef struct {
380     XferElementClass __parent__;
381 } XferSourcePullClass;
382
383 static gpointer
384 source_pull_pull_buffer_impl(
385     XferElement *elt,
386     size_t *size)
387 {
388     XferSourcePull *self = XFER_SOURCE_PULL(elt);
389     char *buf;
390     size_t bufsiz;
391
392     if (self->nbuffers > TEST_BLOCK_COUNT) {
393         *size = 0;
394         return NULL;
395     }
396     bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
397
398     self->nbuffers++;
399
400     buf = g_malloc(bufsiz);
401     simpleprng_fill_buffer(&self->prng, buf, bufsiz);
402     *size = bufsiz;
403     return buf;
404 }
405
406 static gboolean
407 source_pull_setup_impl(
408     XferElement *elt)
409 {
410     XferSourcePull *self = XFER_SOURCE_PULL(elt);
411
412     simpleprng_seed(&self->prng, RANDOM_SEED);
413
414     return TRUE;
415 }
416
417 static void
418 source_pull_class_init(
419     XferSourcePullClass * klass)
420 {
421     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
422     static xfer_element_mech_pair_t mech_pairs[] = {
423         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) },
424         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
425     };
426
427     xec->pull_buffer = source_pull_pull_buffer_impl;
428     xec->setup = source_pull_setup_impl;
429     xec->mech_pairs = mech_pairs;
430 }
431
432 GType
433 xfer_source_pull_get_type (void)
434 {
435     static GType type = 0;
436
437     if G_UNLIKELY(type == 0) {
438         static const GTypeInfo info = {
439             sizeof (XferSourcePullClass),
440             (GBaseInitFunc) NULL,
441             (GBaseFinalizeFunc) NULL,
442             (GClassInitFunc) source_pull_class_init,
443             (GClassFinalizeFunc) NULL,
444             NULL /* class_data */,
445             sizeof (XferSourcePull),
446             0 /* n_preallocs */,
447             (GInstanceInitFunc) NULL,
448             NULL
449         };
450
451         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
452     }
453
454     return type;
455 }
456
457 /* LISTEN */
458
459 static GType xfer_source_listen_get_type(void);
460 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
461 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
462 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
463 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
464 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
465 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
466
467 typedef struct XferSourceListen {
468     XferElement __parent__;
469
470     GThread *thread;
471     simpleprng_state_t prng;
472 } XferSourceListen;
473
474 typedef struct {
475     XferElementClass __parent__;
476 } XferSourceListenClass;
477
478 static gpointer
479 source_listen_thread(
480     gpointer data)
481 {
482     XferSourceListen *self = XFER_SOURCE_LISTEN(data);
483     XferElement *elt = XFER_ELEMENT(self);
484     DirectTCPAddr *addrs;
485     int sock;
486     char *buf;
487     int i;
488
489     /* set up the sockaddr -- IPv4 only */
490     addrs = elt->downstream->input_listen_addrs;
491     g_assert(addrs != NULL);
492
493     tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
494     sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
495     if (sock < 0) {
496         error("socket(): %s", strerror(errno));
497     }
498     if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
499         error("connect(): %s", strerror(errno));
500     }
501
502     tu_dbg("connected to %s\n", str_sockaddr(addrs));
503
504     buf = g_malloc(TEST_BLOCK_SIZE);
505     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
506         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
507         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
508             error("error in full_write(): %s", strerror(errno));
509         }
510     }
511
512     /* send a smaller block */
513     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
514     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
515         error("error in full_write(): %s", strerror(errno));
516     }
517     g_free(buf);
518
519     /* send EOF by closing the socket */
520     close(sock);
521
522     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
523
524     return NULL;
525 }
526
527 static gboolean
528 source_listen_start_impl(
529     XferElement *elt)
530 {
531     XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
532
533     simpleprng_seed(&self->prng, RANDOM_SEED);
534
535     self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
536
537     return TRUE;
538 }
539
540 static void
541 source_listen_class_init(
542     XferSourceListenClass * klass)
543 {
544     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
545     static xfer_element_mech_pair_t mech_pairs[] = {
546         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) },
547         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
548     };
549
550     xec->start = source_listen_start_impl;
551     xec->mech_pairs = mech_pairs;
552 }
553
554 GType
555 xfer_source_listen_get_type (void)
556 {
557     static GType type = 0;
558
559     if G_UNLIKELY(type == 0) {
560         static const GTypeInfo info = {
561             sizeof (XferSourceListenClass),
562             (GBaseInitFunc) NULL,
563             (GBaseFinalizeFunc) NULL,
564             (GClassInitFunc) source_listen_class_init,
565             (GClassFinalizeFunc) NULL,
566             NULL /* class_data */,
567             sizeof (XferSourceListen),
568             0 /* n_preallocs */,
569             (GInstanceInitFunc) NULL,
570             NULL
571         };
572
573         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
574     }
575
576     return type;
577 }
578
579 /* CONNECT */
580
581 static GType xfer_source_connect_get_type(void);
582 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
583 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
584 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
585 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
586 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
587 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
588
589 typedef struct XferSourceConnect {
590     XferElement __parent__;
591
592     int listen_socket;
593
594     GThread *thread;
595     simpleprng_state_t prng;
596 } XferSourceConnect;
597
598 typedef struct {
599     XferElementClass __parent__;
600 } XferSourceConnectClass;
601
602 static gpointer
603 source_connect_thread(
604     gpointer data)
605 {
606     XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
607     int sock;
608     char *buf;
609     int i;
610
611     g_assert(self->listen_socket != -1);
612
613     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
614         xfer_cancel_with_error(XFER_ELEMENT(self),
615             _("Error accepting incoming connection: %s"), strerror(errno));
616         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
617         return NULL;
618     }
619
620     /* close the listening socket now, for good measure */
621     close(self->listen_socket);
622     self->listen_socket = -1;
623
624     tu_dbg("connection accepted\n");
625
626     buf = g_malloc(TEST_BLOCK_SIZE);
627     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
628         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
629         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
630             error("error in full_write(): %s", strerror(errno));
631         }
632     }
633
634     /* send a smaller block */
635     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
636     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
637         error("error in full_write(): %s", strerror(errno));
638     }
639     g_free(buf);
640
641     /* send EOF by closing the socket */
642     close(sock);
643
644     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
645
646     return NULL;
647 }
648
649 static gboolean
650 source_connect_setup_impl(
651     XferElement *elt)
652 {
653     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
654     sockaddr_union addr;
655     DirectTCPAddr *addrs;
656     socklen_t len;
657     int sock;
658
659     /* set up self->listen_socket and set elt->output_listen_addrs */
660     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
661     if (sock < 0)
662         error("socket(): %s", strerror(errno));
663
664     if (listen(sock, 1) < 0)
665         error("listen(): %s", strerror(errno));
666
667     len = sizeof(addr);
668     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
669         error("getsockname(): %s", strerror(errno));
670     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
671
672     addrs = g_new0(DirectTCPAddr, 2);
673     copy_sockaddr(&addrs[0], &addr);
674     elt->output_listen_addrs = addrs;
675
676     return TRUE;
677 }
678
679 static gboolean
680 source_connect_start_impl(
681     XferElement *elt)
682 {
683     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
684
685     simpleprng_seed(&self->prng, RANDOM_SEED);
686
687     self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
688
689     return TRUE;
690 }
691
692 static void
693 source_connect_class_init(
694     XferSourceConnectClass * klass)
695 {
696     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
697     static xfer_element_mech_pair_t mech_pairs[] = {
698         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) },
699         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
700     };
701
702     xec->setup = source_connect_setup_impl;
703     xec->start = source_connect_start_impl;
704     xec->mech_pairs = mech_pairs;
705 }
706
707 GType
708 xfer_source_connect_get_type (void)
709 {
710     static GType type = 0;
711
712     if G_UNLIKELY(type == 0) {
713         static const GTypeInfo info = {
714             sizeof (XferSourceConnectClass),
715             (GBaseInitFunc) NULL,
716             (GBaseFinalizeFunc) NULL,
717             (GClassInitFunc) source_connect_class_init,
718             (GClassFinalizeFunc) NULL,
719             NULL /* class_data */,
720             sizeof (XferSourceConnect),
721             0 /* n_preallocs */,
722             (GInstanceInitFunc) NULL,
723             NULL
724         };
725
726         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
727     }
728
729     return type;
730 }
731
732 /* READFD */
733
734 static GType xfer_dest_readfd_get_type(void);
735 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
736 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
737 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
738 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
739 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
740 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
741
742 typedef struct XferDestReadfd {
743     XferElement __parent__;
744
745     GThread *thread;
746     simpleprng_state_t prng;
747 } XferDestReadfd;
748
749 typedef struct {
750     XferElementClass __parent__;
751 } XferDestReadfdClass;
752
753 static gpointer
754 dest_readfd_thread(
755     gpointer data)
756 {
757     XferDestReadfd *self = XFER_DEST_READFD(data);
758     XferElement *elt = XFER_ELEMENT(data);
759     char buf[TEST_XFER_SIZE];
760     size_t remaining;
761     int fd = xfer_element_swap_output_fd(elt->upstream, -1);
762
763     /* this shouldn't happen, although non-test elements handle it gracefully */
764     g_assert(fd != -1);
765
766     remaining = sizeof(buf);
767     while (remaining) {
768         ssize_t nread;
769         if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
770             error("error in read(): %s", strerror(errno));
771         }
772         remaining -= nread;
773     }
774
775     /* we should be at EOF here */
776     if (read(fd, buf, 10) != 0)
777         g_critical("too much data entering XferDestReadfd");
778
779     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
780         g_critical("data entering XferDestReadfd does not match");
781
782     close(fd);
783
784     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
785
786     return NULL;
787 }
788
789 static gboolean
790 dest_readfd_start_impl(
791     XferElement *elt)
792 {
793     XferDestReadfd *self = XFER_DEST_READFD(elt);
794
795     simpleprng_seed(&self->prng, RANDOM_SEED);
796
797     self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
798
799     return TRUE;
800 }
801
802 static void
803 dest_readfd_class_init(
804     XferDestReadfdClass * klass)
805 {
806     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
807     static xfer_element_mech_pair_t mech_pairs[] = {
808         { XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
809         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
810     };
811
812     xec->start = dest_readfd_start_impl;
813     xec->mech_pairs = mech_pairs;
814 }
815
816 GType
817 xfer_dest_readfd_get_type (void)
818 {
819     static GType type = 0;
820
821     if G_UNLIKELY(type == 0) {
822         static const GTypeInfo info = {
823             sizeof (XferDestReadfdClass),
824             (GBaseInitFunc) NULL,
825             (GBaseFinalizeFunc) NULL,
826             (GClassInitFunc) dest_readfd_class_init,
827             (GClassFinalizeFunc) NULL,
828             NULL /* class_data */,
829             sizeof (XferDestReadfd),
830             0 /* n_preallocs */,
831             (GInstanceInitFunc) NULL,
832             NULL
833         };
834
835         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
836     }
837
838     return type;
839 }
840
841 /* WRITEFD */
842
843 static GType xfer_dest_writefd_get_type(void);
844 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
845 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
846 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
847 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
848 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
849 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
850
851 typedef struct XferDestWritefd {
852     XferElement __parent__;
853
854     int read_fd;
855     GThread *thread;
856     simpleprng_state_t prng;
857 } XferDestWritefd;
858
859 typedef struct {
860     XferElementClass __parent__;
861 } XferDestWritefdClass;
862
863 static gpointer
864 dest_writefd_thread(
865     gpointer data)
866 {
867     XferDestWritefd *self = XFER_DEST_WRITEFD(data);
868     char buf[TEST_XFER_SIZE];
869     size_t remaining;
870     int fd = self->read_fd;
871
872     remaining = sizeof(buf);
873     while (remaining) {
874         ssize_t nwrite;
875         if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
876             error("error in read(): %s", strerror(errno));
877         }
878         remaining -= nwrite;
879     }
880
881     /* we should be at EOF here */
882     if (read(fd, buf, 10) != 0)
883         g_critical("too much data entering XferDestWritefd");
884
885     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
886         g_critical("data entering XferDestWritefd does not match");
887
888     close(fd);
889
890     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
891
892     return NULL;
893 }
894
895 static gboolean
896 dest_writefd_setup_impl(
897     XferElement *elt)
898 {
899     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
900     int p[2];
901
902     simpleprng_seed(&self->prng, RANDOM_SEED);
903
904     if (pipe(p) < 0)
905         g_critical("Error from pipe(): %s", strerror(errno));
906
907     self->read_fd = p[0];
908     g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
909
910     return TRUE;
911 }
912
913 static gboolean
914 dest_writefd_start_impl(
915     XferElement *elt)
916 {
917     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
918     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
919
920     return TRUE;
921 }
922
923 static void
924 dest_writefd_class_init(
925     XferDestWritefdClass * klass)
926 {
927     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
928     static xfer_element_mech_pair_t mech_pairs[] = {
929         { XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
930         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
931     };
932
933     xec->setup = dest_writefd_setup_impl;
934     xec->start = dest_writefd_start_impl;
935     xec->mech_pairs = mech_pairs;
936 }
937
938 GType
939 xfer_dest_writefd_get_type (void)
940 {
941     static GType type = 0;
942
943     if G_UNLIKELY(type == 0) {
944         static const GTypeInfo info = {
945             sizeof (XferDestWritefdClass),
946             (GBaseInitFunc) NULL,
947             (GBaseFinalizeFunc) NULL,
948             (GClassInitFunc) dest_writefd_class_init,
949             (GClassFinalizeFunc) NULL,
950             NULL /* class_data */,
951             sizeof (XferDestWritefd),
952             0 /* n_preallocs */,
953             (GInstanceInitFunc) NULL,
954             NULL
955         };
956
957         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
958     }
959
960     return type;
961 }
962
963 /* PUSH_BUFFER */
964
965 static GType xfer_dest_push_get_type(void);
966 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
967 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
968 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
969 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
970 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
971 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
972
973 typedef struct XferDestPush {
974     XferElement __parent__;
975
976     char *buf;
977     size_t bufpos;
978
979     GThread *thread;
980     simpleprng_state_t prng;
981 } XferDestPush;
982
983 typedef struct {
984     XferElementClass __parent__;
985 } XferDestPushClass;
986
987 static void
988 dest_push_push_buffer_impl(
989     XferElement *elt,
990     gpointer buf,
991     size_t size)
992 {
993     XferDestPush *self = XFER_DEST_PUSH(elt);
994
995     if (buf == NULL) {
996         /* if we're at EOF, verify we got the right bytes */
997         g_assert(self->bufpos == TEST_XFER_SIZE);
998         if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
999             g_critical("data entering XferDestPush does not match");
1000         g_free(self->buf);
1001         return;
1002     }
1003
1004     g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1005     memcpy(self->buf + self->bufpos, buf, size);
1006     self->bufpos += size;
1007 }
1008
1009 static gboolean
1010 dest_push_setup_impl(
1011     XferElement *elt)
1012 {
1013     XferDestPush *self = XFER_DEST_PUSH(elt);
1014
1015     self->buf = g_malloc(TEST_XFER_SIZE);
1016     simpleprng_seed(&self->prng, RANDOM_SEED);
1017
1018     return TRUE;
1019 }
1020
1021 static void
1022 dest_push_class_init(
1023     XferDestPushClass * klass)
1024 {
1025     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1026     static xfer_element_mech_pair_t mech_pairs[] = {
1027         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0) },
1028         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1029     };
1030
1031     xec->push_buffer = dest_push_push_buffer_impl;
1032     xec->setup = dest_push_setup_impl;
1033     xec->mech_pairs = mech_pairs;
1034 }
1035
1036 GType
1037 xfer_dest_push_get_type (void)
1038 {
1039     static GType type = 0;
1040
1041     if G_UNLIKELY(type == 0) {
1042         static const GTypeInfo info = {
1043             sizeof (XferDestPushClass),
1044             (GBaseInitFunc) NULL,
1045             (GBaseFinalizeFunc) NULL,
1046             (GClassInitFunc) dest_push_class_init,
1047             (GClassFinalizeFunc) NULL,
1048             NULL /* class_data */,
1049             sizeof (XferDestPush),
1050             0 /* n_preallocs */,
1051             (GInstanceInitFunc) NULL,
1052             NULL
1053         };
1054
1055         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
1056     }
1057
1058     return type;
1059 }
1060
1061 /* PULL_BUFFER */
1062
1063 static GType xfer_dest_pull_get_type(void);
1064 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1065 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1066 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1067 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1068 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1069 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
1070
1071 typedef struct XferDestPull {
1072     XferElement __parent__;
1073
1074     GThread *thread;
1075     simpleprng_state_t prng;
1076 } XferDestPull;
1077
1078 typedef struct {
1079     XferElementClass __parent__;
1080 } XferDestPullClass;
1081
1082 static gpointer
1083 dest_pull_thread(
1084     gpointer data)
1085 {
1086     XferDestPull *self = XFER_DEST_PULL(data);
1087     char fullbuf[TEST_XFER_SIZE];
1088     char *buf;
1089     size_t bufpos = 0;
1090     size_t size;
1091
1092     while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
1093         g_assert(bufpos + size <= TEST_XFER_SIZE);
1094         memcpy(fullbuf + bufpos, buf, size);
1095         bufpos += size;
1096     }
1097
1098     /* we're at EOF, so verify we got the right bytes */
1099     g_assert(bufpos == TEST_XFER_SIZE);
1100     if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1101         g_critical("data entering XferDestPull does not match");
1102
1103     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1104
1105     return NULL;
1106 }
1107
1108 static gboolean
1109 dest_pull_start_impl(
1110     XferElement *elt)
1111 {
1112     XferDestPull *self = XFER_DEST_PULL(elt);
1113
1114     simpleprng_seed(&self->prng, RANDOM_SEED);
1115
1116     self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
1117
1118     return TRUE;
1119 }
1120
1121 static void
1122 dest_pull_class_init(
1123     XferDestPullClass * klass)
1124 {
1125     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1126     static xfer_element_mech_pair_t mech_pairs[] = {
1127         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1128         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1129     };
1130
1131     xec->start = dest_pull_start_impl;
1132     xec->mech_pairs = mech_pairs;
1133 }
1134
1135 GType
1136 xfer_dest_pull_get_type (void)
1137 {
1138     static GType type = 0;
1139
1140     if G_UNLIKELY(type == 0) {
1141         static const GTypeInfo info = {
1142             sizeof (XferDestPullClass),
1143             (GBaseInitFunc) NULL,
1144             (GBaseFinalizeFunc) NULL,
1145             (GClassInitFunc) dest_pull_class_init,
1146             (GClassFinalizeFunc) NULL,
1147             NULL /* class_data */,
1148             sizeof (XferDestPull),
1149             0 /* n_preallocs */,
1150             (GInstanceInitFunc) NULL,
1151             NULL
1152         };
1153
1154         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
1155     }
1156
1157     return type;
1158 }
1159
1160 /* LISTEN */
1161
1162 static GType xfer_dest_listen_get_type(void);
1163 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1164 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1165 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1166 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1167 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1168 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
1169
1170 typedef struct XferDestListen {
1171     XferElement __parent__;
1172
1173     int listen_socket;
1174
1175     GThread *thread;
1176     simpleprng_state_t prng;
1177 } XferDestListen;
1178
1179 typedef struct {
1180     XferElementClass __parent__;
1181 } XferDestListenClass;
1182
1183 static gpointer
1184 dest_listen_thread(
1185     gpointer data)
1186 {
1187     XferDestListen *self = XFER_DEST_LISTEN(data);
1188     char *buf;
1189     size_t bytes = 0;
1190     int sock;
1191
1192     g_assert(self->listen_socket != -1);
1193
1194     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1195         xfer_cancel_with_error(XFER_ELEMENT(self),
1196             _("Error accepting incoming connection: %s"), strerror(errno));
1197         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1198         return NULL;
1199     }
1200
1201     /* close the listening socket now, for good measure */
1202     close(self->listen_socket);
1203     self->listen_socket = -1;
1204
1205     /* read from the socket until EOF or all of the data is read.  We try to
1206      * read one extra byte - if we get it, then upstream sent too much data */
1207     buf = g_malloc(TEST_XFER_SIZE+1);
1208     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1209     g_assert(bytes == TEST_XFER_SIZE);
1210     close(sock);
1211
1212     /* we're at EOF, so verify we got the right bytes */
1213     g_assert(bytes == TEST_XFER_SIZE);
1214     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1215         g_critical("data entering XferDestListen does not match");
1216
1217     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1218
1219     return NULL;
1220 }
1221
1222 static gboolean
1223 dest_listen_setup_impl(
1224     XferElement *elt)
1225 {
1226     XferDestListen *self = XFER_DEST_LISTEN(elt);
1227     sockaddr_union addr;
1228     DirectTCPAddr *addrs;
1229     socklen_t len;
1230     int sock;
1231
1232     /* set up self->listen_socket and set elt->input_listen_addrs */
1233     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1234     if (sock < 0)
1235         error("socket(): %s", strerror(errno));
1236
1237     if (listen(sock, 1) < 0)
1238         error("listen(): %s", strerror(errno));
1239
1240     len = sizeof(addr);
1241     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1242         error("getsockname(): %s", strerror(errno));
1243     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
1244
1245     addrs = g_new0(DirectTCPAddr, 2);
1246     copy_sockaddr(&addrs[0], &addr);
1247     elt->input_listen_addrs = addrs;
1248
1249     return TRUE;
1250 }
1251
1252 static gboolean
1253 dest_listen_start_impl(
1254     XferElement *elt)
1255 {
1256     XferDestListen *self = XFER_DEST_LISTEN(elt);
1257
1258     simpleprng_seed(&self->prng, RANDOM_SEED);
1259
1260     self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
1261
1262     return TRUE;
1263 }
1264
1265 static void
1266 dest_listen_class_init(
1267     XferDestListenClass * klass)
1268 {
1269     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1270     static xfer_element_mech_pair_t mech_pairs[] = {
1271         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1272         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1273     };
1274
1275     xec->setup = dest_listen_setup_impl;
1276     xec->start = dest_listen_start_impl;
1277     xec->mech_pairs = mech_pairs;
1278 }
1279
1280 GType
1281 xfer_dest_listen_get_type (void)
1282 {
1283     static GType type = 0;
1284
1285     if G_UNLIKELY(type == 0) {
1286         static const GTypeInfo info = {
1287             sizeof (XferDestListenClass),
1288             (GBaseInitFunc) NULL,
1289             (GBaseFinalizeFunc) NULL,
1290             (GClassInitFunc) dest_listen_class_init,
1291             (GClassFinalizeFunc) NULL,
1292             NULL /* class_data */,
1293             sizeof (XferDestListen),
1294             0 /* n_preallocs */,
1295             (GInstanceInitFunc) NULL,
1296             NULL
1297         };
1298
1299         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
1300     }
1301
1302     return type;
1303 }
1304
/* CONNECT */
1306
1307 static GType xfer_dest_connect_get_type(void);
1308 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1309 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1310 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1311 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1312 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1313 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
1314
1315 typedef struct XferDestConnect {
1316     XferElement __parent__;
1317
1318     int connect_socket;
1319
1320     GThread *thread;
1321     simpleprng_state_t prng;
1322 } XferDestConnect;
1323
1324 typedef struct {
1325     XferElementClass __parent__;
1326 } XferDestConnectClass;
1327
1328 static gpointer
1329 dest_connect_thread(
1330     gpointer data)
1331 {
1332     XferDestConnect *self = XFER_DEST_CONNECT(data);
1333     XferElement *elt = XFER_ELEMENT(self);
1334     DirectTCPAddr *addrs;
1335     sockaddr_union addr;
1336     char *buf;
1337     size_t bytes = 0;
1338     int sock;
1339
1340     /* set up the sockaddr -- IPv4 only */
1341     SU_INIT(&addr, AF_INET);
1342     addrs = elt->upstream->output_listen_addrs;
1343     g_assert(addrs != NULL);
1344     copy_sockaddr(&addr, addrs);
1345
1346     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1347     sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
1348     if (sock < 0) {
1349         error("socket(): %s", strerror(errno));
1350     }
1351     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1352         error("connect(): %s", strerror(errno));
1353     }
1354
1355     tu_dbg("connected to %s\n", str_sockaddr(&addr));
1356
1357     /* read from the socket until EOF or all of the data is read.  We try to
1358      * read one extra byte - if we get it, then upstream sent too much data */
1359     buf = g_malloc(TEST_XFER_SIZE+1);
1360     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1361     g_assert(bytes == TEST_XFER_SIZE);
1362     close(sock);
1363
1364     /* we're at EOF, so verify we got the right bytes */
1365     g_assert(bytes == TEST_XFER_SIZE);
1366     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1367         g_critical("data entering XferDestConnect does not match");
1368
1369     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1370
1371     return NULL;
1372 }
1373
1374 static gboolean
1375 dest_connect_start_impl(
1376     XferElement *elt)
1377 {
1378     XferDestConnect *self = XFER_DEST_CONNECT(elt);
1379
1380     simpleprng_seed(&self->prng, RANDOM_SEED);
1381
1382     self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
1383
1384     return TRUE;
1385 }
1386
1387 static void
1388 dest_connect_class_init(
1389     XferDestConnectClass * klass)
1390 {
1391     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1392     static xfer_element_mech_pair_t mech_pairs[] = {
1393         { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1394         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1395     };
1396
1397     xec->start = dest_connect_start_impl;
1398     xec->mech_pairs = mech_pairs;
1399 }
1400
1401 GType
1402 xfer_dest_connect_get_type (void)
1403 {
1404     static GType type = 0;
1405
1406     if G_UNLIKELY(type == 0) {
1407         static const GTypeInfo info = {
1408             sizeof (XferDestConnectClass),
1409             (GBaseInitFunc) NULL,
1410             (GBaseFinalizeFunc) NULL,
1411             (GClassInitFunc) dest_connect_class_init,
1412             (GClassFinalizeFunc) NULL,
1413             NULL /* class_data */,
1414             sizeof (XferDestConnect),
1415             0 /* n_preallocs */,
1416             (GInstanceInitFunc) NULL,
1417             NULL
1418         };
1419
1420         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
1421     }
1422
1423     return type;
1424 }
1425
1426
1427 /*
1428  * Tests
1429  */
1430
1431 static void
1432 test_xfer_generic_callback(
1433     gpointer data G_GNUC_UNUSED,
1434     XMsg *msg,
1435     Xfer *xfer)
1436 {
1437     tu_dbg("Received message %s\n", xmsg_repr(msg));
1438
1439     switch (msg->type) {
1440         case XMSG_DONE:
1441             /* are we done? */
1442             if (xfer->status == XFER_DONE) {
1443                 tu_dbg("all elements are done!\n");
1444                 g_main_loop_quit(default_main_loop());
1445             }
1446             break;
1447
1448         default:
1449             break;
1450     }
1451 }
1452
1453 /****
1454  * Run a simple transfer with some xor filters
1455  */
1456
1457 static int
1458 test_xfer_simple(void)
1459 {
1460     unsigned int i;
1461     GSource *src;
1462     XferElement *elements[] = {
1463         xfer_source_random(100*1024, RANDOM_SEED),
1464         xfer_filter_xor('d'),
1465         xfer_filter_xor('d'),
1466         xfer_dest_null(RANDOM_SEED),
1467     };
1468
1469     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1470     src = xfer_get_source(xfer);
1471     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1472     g_source_attach(src, NULL);
1473     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1474
1475     /* unreference the elements */
1476     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1477         g_object_unref(elements[i]);
1478         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1479         elements[i] = NULL;
1480     }
1481
1482     xfer_start(xfer, 0, 0);
1483
1484     g_main_loop_run(default_main_loop());
1485     g_assert(xfer->status == XFER_DONE);
1486
1487     xfer_unref(xfer);
1488
1489     return 1;
1490 }
1491
1492 /****
1493  * Run a transfer between two files, with or without filters
1494  */
1495
1496 static int
1497 test_xfer_files(gboolean add_filters)
1498 {
1499     unsigned int i;
1500     unsigned int elts;
1501     GSource *src;
1502     char *in_filename = __FILE__;
1503     char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
1504     int rfd, wfd;
1505     Xfer *xfer;
1506     XferElement *elements[4];
1507
1508     rfd = open(in_filename, O_RDONLY, 0);
1509     if (rfd < 0)
1510         g_critical("Could not open '%s': %s", in_filename, strerror(errno));
1511
1512     wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1513     if (wfd < 0)
1514         g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1515
1516     elts = 0;
1517     elements[elts++] = xfer_source_fd(rfd);
1518     if (add_filters) {
1519         elements[elts++] = xfer_filter_xor(0xab);
1520         elements[elts++] = xfer_filter_xor(0xab);
1521     }
1522     elements[elts++] = xfer_dest_fd(wfd);
1523
1524     xfer = xfer_new(elements, elts);
1525     src = xfer_get_source(xfer);
1526     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1527     g_source_attach(src, NULL);
1528     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1529
1530     /* unreference the elements */
1531     for (i = 0; i < elts; i++) {
1532         g_object_unref(elements[i]);
1533         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1534         elements[i] = NULL;
1535     }
1536
1537     xfer_start(xfer, 0, 0);
1538
1539     g_main_loop_run(default_main_loop());
1540     g_assert(xfer->status == XFER_DONE);
1541
1542     xfer_unref(xfer);
1543
1544     unlink(out_filename); /* ignore any errors */
1545
1546     return 1;
1547 }
1548
1549 static int
1550 test_xfer_files_simple(void)
1551 {
1552     return test_xfer_files(FALSE);
1553 }
1554
1555 static int
1556 test_xfer_files_filter(void)
1557 {
1558     return test_xfer_files(TRUE);
1559 }
1560
/*****
 * test each possible combination of source and destination mechanism
 */
1564
1565 static int
1566 test_glue_combo(
1567     XferElement *source,
1568     XferElement *dest)
1569 {
1570     unsigned int i;
1571     GSource *src;
1572     XferElement *elements[] = { source, dest };
1573
1574     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1575     src = xfer_get_source(xfer);
1576     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1577     g_source_attach(src, NULL);
1578
1579     /* unreference the elements */
1580     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1581         g_object_unref(elements[i]);
1582         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1583         elements[i] = NULL;
1584     }
1585
1586     xfer_start(xfer, 0, 0);
1587
1588     g_main_loop_run(default_main_loop());
1589     g_assert(xfer->status == XFER_DONE);
1590
1591     xfer_unref(xfer);
1592
1593     return 1;
1594 }
1595
1596 #define make_test_glue(n, s, d) static int n(void) \
1597 {\
1598     return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1599                            (XferElement *)g_object_new(d, NULL)); \
1600 }
1601 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1602 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1603 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1604 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1605 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1606 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1607 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1608 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1609 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1610 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1611 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1612 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1613 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1614 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1615 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1616 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1617 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1618 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1619 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1620 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1621 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1622 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1623 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1624 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1625 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1626 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1627 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1628 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1629 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1630 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1631 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1632 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1633 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1634 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1635 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1636 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
1637
1638 /*
1639  * Main driver
1640  */
1641
1642 int
1643 main(int argc, char **argv)
1644 {
1645     static TestUtilsTest tests[] = {
1646         TU_TEST(test_xfer_simple, 90),
1647         TU_TEST(test_xfer_files_simple, 90),
1648         TU_TEST(test_xfer_files_filter, 90),
1649         TU_TEST(test_glue_READFD_READFD, 90),
1650         TU_TEST(test_glue_READFD_WRITEFD, 90),
1651         TU_TEST(test_glue_READFD_PUSH, 90),
1652         TU_TEST(test_glue_READFD_PULL, 90),
1653         TU_TEST(test_glue_READFD_LISTEN, 90),
1654         TU_TEST(test_glue_READFD_CONNECT, 90),
1655         TU_TEST(test_glue_WRITEFD_READFD, 90),
1656         TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1657         TU_TEST(test_glue_WRITEFD_PUSH, 90),
1658         TU_TEST(test_glue_WRITEFD_PULL, 90),
1659         TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1660         TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1661         TU_TEST(test_glue_PUSH_READFD, 90),
1662         TU_TEST(test_glue_PUSH_WRITEFD, 90),
1663         TU_TEST(test_glue_PUSH_PUSH, 90),
1664         TU_TEST(test_glue_PUSH_PULL, 90),
1665         TU_TEST(test_glue_PUSH_LISTEN, 90),
1666         TU_TEST(test_glue_PUSH_CONNECT, 90),
1667         TU_TEST(test_glue_PULL_READFD, 90),
1668         TU_TEST(test_glue_PULL_WRITEFD, 90),
1669         TU_TEST(test_glue_PULL_PUSH, 90),
1670         TU_TEST(test_glue_PULL_PULL, 90),
1671         TU_TEST(test_glue_PULL_LISTEN, 90),
1672         TU_TEST(test_glue_PULL_CONNECT, 90),
1673         TU_TEST(test_glue_LISTEN_READFD, 90),
1674         TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1675         TU_TEST(test_glue_LISTEN_PUSH, 90),
1676         TU_TEST(test_glue_LISTEN_PULL, 90),
1677         TU_TEST(test_glue_LISTEN_LISTEN, 90),
1678         TU_TEST(test_glue_LISTEN_CONNECT, 90),
1679         TU_TEST(test_glue_CONNECT_READFD, 90),
1680         TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1681         TU_TEST(test_glue_CONNECT_PUSH, 90),
1682         TU_TEST(test_glue_CONNECT_PULL, 90),
1683         TU_TEST(test_glue_CONNECT_LISTEN, 90),
1684         TU_TEST(test_glue_CONNECT_CONNECT, 90),
1685         TU_END()
1686     };
1687
1688     glib_init();
1689
1690     return testutils_run_tests(argc, argv, tests);
1691 }