Imported Upstream version 3.2.0
[debian/amanda] / xfer-src / xfer-test.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  * 
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  * 
17  * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  *
20  * Author: Dustin J. Mitchell <dustin@zmanda.com>
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "glib-util.h"
26 #include "testutils.h"
27 #include "event.h"
28 #include "simpleprng.h"
29 #include "sockaddr-util.h"
30
/* A hard-coded seed keeps the pseudo-random test data identical from one
 * run to the next, so that tests repeat exactly. */
#define RANDOM_SEED 0xf00d

/*
 * XferElement subclasses
 *
 * The "private" element classes defined in this file each support only one
 * mechanism pair.  They are combined to test all of the possible
 * combinations of glue.
 */

/* Sizing of the test transfer: TEST_BLOCK_COUNT blocks of TEST_BLOCK_SIZE
 * bytes each, followed by TEST_BLOCK_EXTRA further bytes.  The extra bytes
 * exercise partial-block handling; EXTRA should be prime. */
#define TEST_BLOCK_SIZE 32768
#define TEST_BLOCK_COUNT 10
#define TEST_BLOCK_EXTRA 97
#define TEST_XFER_SIZE (TEST_BLOCK_SIZE * TEST_BLOCK_COUNT + TEST_BLOCK_EXTRA)
49
50 /* READFD */
51
52 static GType xfer_source_readfd_get_type(void);
53 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
54 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
55 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
56 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
57 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
58 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
59
60 typedef struct XferSourceReadfd {
61     XferElement __parent__;
62
63     int write_fd;
64     GThread *thread;
65     simpleprng_state_t prng;
66 } XferSourceReadfd;
67
68 typedef struct {
69     XferElementClass __parent__;
70 } XferSourceReadfdClass;
71
72 static gpointer
73 source_readfd_thread(
74     gpointer data)
75 {
76     XferSourceReadfd *self = XFER_SOURCE_READFD(data);
77     char buf[TEST_XFER_SIZE];
78     int fd = self->write_fd;
79
80     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
81
82     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
83         error("error in full_write(): %s", strerror(errno));
84     }
85
86     close(fd);
87
88     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
89
90     return NULL;
91 }
92
93 static gboolean
94 source_readfd_setup_impl(
95     XferElement *elt)
96 {
97     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
98     int p[2];
99
100     simpleprng_seed(&self->prng, RANDOM_SEED);
101
102     if (pipe(p) < 0)
103         g_critical("Error from pipe(): %s", strerror(errno));
104
105     self->write_fd = p[1];
106     g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
107
108     return TRUE;
109 }
110
111 static gboolean
112 source_readfd_start_impl(
113     XferElement *elt)
114 {
115     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
116     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
117
118     return TRUE;
119 }
120
121 static void
122 source_readfd_class_init(
123     XferSourceReadfdClass * klass)
124 {
125     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
126     static xfer_element_mech_pair_t mech_pairs[] = {
127         { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
128         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
129     };
130
131     xec->setup = source_readfd_setup_impl;
132     xec->start = source_readfd_start_impl;
133     xec->mech_pairs = mech_pairs;
134 }
135
136 GType
137 xfer_source_readfd_get_type (void)
138 {
139     static GType type = 0;
140
141     if G_UNLIKELY(type == 0) {
142         static const GTypeInfo info = {
143             sizeof (XferSourceReadfdClass),
144             (GBaseInitFunc) NULL,
145             (GBaseFinalizeFunc) NULL,
146             (GClassInitFunc) source_readfd_class_init,
147             (GClassFinalizeFunc) NULL,
148             NULL /* class_data */,
149             sizeof (XferSourceReadfd),
150             0 /* n_preallocs */,
151             (GInstanceInitFunc) NULL,
152             NULL
153         };
154
155         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
156     }
157
158     return type;
159 }
160
161 /* WRITEFD */
162
163 static GType xfer_source_writefd_get_type(void);
164 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
165 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
166 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
167 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
168 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
169 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
170
171 typedef struct XferSourceWritefd {
172     XferElement __parent__;
173
174     GThread *thread;
175     simpleprng_state_t prng;
176 } XferSourceWritefd;
177
178 typedef struct {
179     XferElementClass __parent__;
180 } XferSourceWritefdClass;
181
182 static gpointer
183 source_writefd_thread(
184     gpointer data)
185 {
186     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
187     XferElement *elt = XFER_ELEMENT(data);
188     char buf[TEST_XFER_SIZE];
189     int fd = xfer_element_swap_input_fd(elt->downstream, -1);
190
191     /* this shouldn't happen, although non-test elements handle it gracefully */
192     g_assert(fd != -1);
193
194     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
195
196     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
197         error("error in full_write(): %s", strerror(errno));
198     }
199
200     close(fd);
201
202     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
203
204     return NULL;
205 }
206
207 static gboolean
208 source_writefd_start_impl(
209     XferElement *elt)
210 {
211     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
212
213     simpleprng_seed(&self->prng, RANDOM_SEED);
214
215     self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
216
217     return TRUE;
218 }
219
220 static void
221 source_writefd_class_init(
222     XferSourceWritefdClass * klass)
223 {
224     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
225     static xfer_element_mech_pair_t mech_pairs[] = {
226         { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
227         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
228     };
229
230     xec->start = source_writefd_start_impl;
231     xec->mech_pairs = mech_pairs;
232 }
233
234 GType
235 xfer_source_writefd_get_type (void)
236 {
237     static GType type = 0;
238
239     if G_UNLIKELY(type == 0) {
240         static const GTypeInfo info = {
241             sizeof (XferSourceWritefdClass),
242             (GBaseInitFunc) NULL,
243             (GBaseFinalizeFunc) NULL,
244             (GClassInitFunc) source_writefd_class_init,
245             (GClassFinalizeFunc) NULL,
246             NULL /* class_data */,
247             sizeof (XferSourceWritefd),
248             0 /* n_preallocs */,
249             (GInstanceInitFunc) NULL,
250             NULL
251         };
252
253         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
254     }
255
256     return type;
257 }
258
259 /* PUSH_BUFFER */
260
261 static GType xfer_source_push_get_type(void);
262 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
263 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
264 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
265 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
266 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
267 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
268
269 typedef struct XferSourcePush {
270     XferElement __parent__;
271
272     GThread *thread;
273     simpleprng_state_t prng;
274 } XferSourcePush;
275
276 typedef struct {
277     XferElementClass __parent__;
278 } XferSourcePushClass;
279
280 static gpointer
281 source_push_thread(
282     gpointer data)
283 {
284     XferSourcePush *self = XFER_SOURCE_PUSH(data);
285     char *buf;
286     int i;
287
288     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
289         buf = g_malloc(TEST_BLOCK_SIZE);
290         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
291         xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
292         buf = NULL;
293     }
294
295     /* send a smaller block */
296     buf = g_malloc(TEST_BLOCK_EXTRA);
297     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
298     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
299     buf = NULL;
300
301     /* send EOF */
302     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
303
304     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
305
306     return NULL;
307 }
308
309 static gboolean
310 source_push_start_impl(
311     XferElement *elt)
312 {
313     XferSourcePush *self = XFER_SOURCE_PUSH(elt);
314
315     simpleprng_seed(&self->prng, RANDOM_SEED);
316
317     self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
318
319     return TRUE;
320 }
321
322 static void
323 source_push_class_init(
324     XferSourcePushClass * klass)
325 {
326     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
327     static xfer_element_mech_pair_t mech_pairs[] = {
328         { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
329         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
330     };
331
332     xec->start = source_push_start_impl;
333     xec->mech_pairs = mech_pairs;
334 }
335
336 GType
337 xfer_source_push_get_type (void)
338 {
339     static GType type = 0;
340
341     if G_UNLIKELY(type == 0) {
342         static const GTypeInfo info = {
343             sizeof (XferSourcePushClass),
344             (GBaseInitFunc) NULL,
345             (GBaseFinalizeFunc) NULL,
346             (GClassInitFunc) source_push_class_init,
347             (GClassFinalizeFunc) NULL,
348             NULL /* class_data */,
349             sizeof (XferSourcePush),
350             0 /* n_preallocs */,
351             (GInstanceInitFunc) NULL,
352             NULL
353         };
354
355         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
356     }
357
358     return type;
359 }
360
361 /* PULL_BUFFER */
362
363 static GType xfer_source_pull_get_type(void);
364 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
365 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
366 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
367 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
368 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
369 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
370
371 typedef struct XferSourcePull {
372     XferElement __parent__;
373
374     gint nbuffers;
375     GThread *thread;
376     simpleprng_state_t prng;
377 } XferSourcePull;
378
379 typedef struct {
380     XferElementClass __parent__;
381 } XferSourcePullClass;
382
383 static gpointer
384 source_pull_pull_buffer_impl(
385     XferElement *elt,
386     size_t *size)
387 {
388     XferSourcePull *self = XFER_SOURCE_PULL(elt);
389     char *buf;
390     size_t bufsiz;
391
392     if (self->nbuffers > TEST_BLOCK_COUNT) {
393         *size = 0;
394         return NULL;
395     }
396     bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
397
398     self->nbuffers++;
399
400     buf = g_malloc(bufsiz);
401     simpleprng_fill_buffer(&self->prng, buf, bufsiz);
402     *size = bufsiz;
403     return buf;
404 }
405
406 static gboolean
407 source_pull_setup_impl(
408     XferElement *elt)
409 {
410     XferSourcePull *self = XFER_SOURCE_PULL(elt);
411
412     simpleprng_seed(&self->prng, RANDOM_SEED);
413
414     return TRUE;
415 }
416
417 static void
418 source_pull_class_init(
419     XferSourcePullClass * klass)
420 {
421     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
422     static xfer_element_mech_pair_t mech_pairs[] = {
423         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
424         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
425     };
426
427     xec->pull_buffer = source_pull_pull_buffer_impl;
428     xec->setup = source_pull_setup_impl;
429     xec->mech_pairs = mech_pairs;
430 }
431
432 GType
433 xfer_source_pull_get_type (void)
434 {
435     static GType type = 0;
436
437     if G_UNLIKELY(type == 0) {
438         static const GTypeInfo info = {
439             sizeof (XferSourcePullClass),
440             (GBaseInitFunc) NULL,
441             (GBaseFinalizeFunc) NULL,
442             (GClassInitFunc) source_pull_class_init,
443             (GClassFinalizeFunc) NULL,
444             NULL /* class_data */,
445             sizeof (XferSourcePull),
446             0 /* n_preallocs */,
447             (GInstanceInitFunc) NULL,
448             NULL
449         };
450
451         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
452     }
453
454     return type;
455 }
456
457 /* LISTEN */
458
459 static GType xfer_source_listen_get_type(void);
460 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
461 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
462 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
463 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
464 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
465 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
466
467 typedef struct XferSourceListen {
468     XferElement __parent__;
469
470     GThread *thread;
471     simpleprng_state_t prng;
472 } XferSourceListen;
473
474 typedef struct {
475     XferElementClass __parent__;
476 } XferSourceListenClass;
477
478 static gpointer
479 source_listen_thread(
480     gpointer data)
481 {
482     XferSourceListen *self = XFER_SOURCE_LISTEN(data);
483     XferElement *elt = XFER_ELEMENT(self);
484     DirectTCPAddr *addrs;
485     sockaddr_union addr;
486     int sock;
487     char *buf;
488     int i;
489
490     /* set up the sockaddr -- IPv4 only */
491     SU_INIT(&addr, AF_INET);
492     addrs = elt->downstream->input_listen_addrs;
493     g_assert(addrs != NULL);
494     SU_SET_PORT(&addr, addrs->port);
495     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
496
497     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
498     sock = socket(AF_INET, SOCK_STREAM, 0);
499     if (sock < 0) {
500         error("socket(): %s", strerror(errno));
501     }
502     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
503         error("connect(): %s", strerror(errno));
504     }
505
506     tu_dbg("connected to %s\n", str_sockaddr(&addr));
507
508     buf = g_malloc(TEST_BLOCK_SIZE);
509     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
510         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
511         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
512             error("error in full_write(): %s", strerror(errno));
513         }
514     }
515
516     /* send a smaller block */
517     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
518     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
519         error("error in full_write(): %s", strerror(errno));
520     }
521     g_free(buf);
522
523     /* send EOF by closing the socket */
524     close(sock);
525
526     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
527
528     return NULL;
529 }
530
531 static gboolean
532 source_listen_start_impl(
533     XferElement *elt)
534 {
535     XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
536
537     simpleprng_seed(&self->prng, RANDOM_SEED);
538
539     self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
540
541     return TRUE;
542 }
543
544 static void
545 source_listen_class_init(
546     XferSourceListenClass * klass)
547 {
548     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
549     static xfer_element_mech_pair_t mech_pairs[] = {
550         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
551         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
552     };
553
554     xec->start = source_listen_start_impl;
555     xec->mech_pairs = mech_pairs;
556 }
557
558 GType
559 xfer_source_listen_get_type (void)
560 {
561     static GType type = 0;
562
563     if G_UNLIKELY(type == 0) {
564         static const GTypeInfo info = {
565             sizeof (XferSourceListenClass),
566             (GBaseInitFunc) NULL,
567             (GBaseFinalizeFunc) NULL,
568             (GClassInitFunc) source_listen_class_init,
569             (GClassFinalizeFunc) NULL,
570             NULL /* class_data */,
571             sizeof (XferSourceListen),
572             0 /* n_preallocs */,
573             (GInstanceInitFunc) NULL,
574             NULL
575         };
576
577         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
578     }
579
580     return type;
581 }
582
583 /* CONNECT */
584
585 static GType xfer_source_connect_get_type(void);
586 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
587 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
588 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
589 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
590 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
591 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
592
593 typedef struct XferSourceConnect {
594     XferElement __parent__;
595
596     int listen_socket;
597
598     GThread *thread;
599     simpleprng_state_t prng;
600 } XferSourceConnect;
601
602 typedef struct {
603     XferElementClass __parent__;
604 } XferSourceConnectClass;
605
606 static gpointer
607 source_connect_thread(
608     gpointer data)
609 {
610     XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
611     int sock;
612     char *buf;
613     int i;
614
615     g_assert(self->listen_socket != -1);
616
617     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
618         xfer_cancel_with_error(XFER_ELEMENT(self),
619             _("Error accepting incoming connection: %s"), strerror(errno));
620         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
621         return NULL;
622     }
623
624     /* close the listening socket now, for good measure */
625     close(self->listen_socket);
626     self->listen_socket = -1;
627
628     tu_dbg("connection accepted\n");
629
630     buf = g_malloc(TEST_BLOCK_SIZE);
631     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
632         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
633         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
634             error("error in full_write(): %s", strerror(errno));
635         }
636     }
637
638     /* send a smaller block */
639     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
640     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
641         error("error in full_write(): %s", strerror(errno));
642     }
643     g_free(buf);
644
645     /* send EOF by closing the socket */
646     close(sock);
647
648     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
649
650     return NULL;
651 }
652
653 static gboolean
654 source_connect_setup_impl(
655     XferElement *elt)
656 {
657     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
658     sockaddr_union addr;
659     DirectTCPAddr *addrs;
660     socklen_t len;
661     int sock;
662
663     /* set up self->listen_socket and set elt->output_listen_addrs */
664     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
665     if (sock < 0)
666         error("socket(): %s", strerror(errno));
667
668     if (listen(sock, 1) < 0)
669         error("listen(): %s", strerror(errno));
670
671     len = sizeof(addr);
672     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
673         error("getsockname(): %s", strerror(errno));
674     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
675
676     addrs = g_new0(DirectTCPAddr, 2);
677     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
678     addrs[0].port = SU_GET_PORT(&addr);
679     elt->output_listen_addrs = addrs;
680
681     return TRUE;
682 }
683
684 static gboolean
685 source_connect_start_impl(
686     XferElement *elt)
687 {
688     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
689
690     simpleprng_seed(&self->prng, RANDOM_SEED);
691
692     self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
693
694     return TRUE;
695 }
696
697 static void
698 source_connect_class_init(
699     XferSourceConnectClass * klass)
700 {
701     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
702     static xfer_element_mech_pair_t mech_pairs[] = {
703         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
704         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
705     };
706
707     xec->setup = source_connect_setup_impl;
708     xec->start = source_connect_start_impl;
709     xec->mech_pairs = mech_pairs;
710 }
711
712 GType
713 xfer_source_connect_get_type (void)
714 {
715     static GType type = 0;
716
717     if G_UNLIKELY(type == 0) {
718         static const GTypeInfo info = {
719             sizeof (XferSourceConnectClass),
720             (GBaseInitFunc) NULL,
721             (GBaseFinalizeFunc) NULL,
722             (GClassInitFunc) source_connect_class_init,
723             (GClassFinalizeFunc) NULL,
724             NULL /* class_data */,
725             sizeof (XferSourceConnect),
726             0 /* n_preallocs */,
727             (GInstanceInitFunc) NULL,
728             NULL
729         };
730
731         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
732     }
733
734     return type;
735 }
736
737 /* READFD */
738
739 static GType xfer_dest_readfd_get_type(void);
740 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
741 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
742 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
743 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
744 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
745 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
746
747 typedef struct XferDestReadfd {
748     XferElement __parent__;
749
750     GThread *thread;
751     simpleprng_state_t prng;
752 } XferDestReadfd;
753
754 typedef struct {
755     XferElementClass __parent__;
756 } XferDestReadfdClass;
757
758 static gpointer
759 dest_readfd_thread(
760     gpointer data)
761 {
762     XferDestReadfd *self = XFER_DEST_READFD(data);
763     XferElement *elt = XFER_ELEMENT(data);
764     char buf[TEST_XFER_SIZE];
765     size_t remaining;
766     int fd = xfer_element_swap_output_fd(elt->upstream, -1);
767
768     /* this shouldn't happen, although non-test elements handle it gracefully */
769     g_assert(fd != -1);
770
771     remaining = sizeof(buf);
772     while (remaining) {
773         ssize_t nread;
774         if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
775             error("error in read(): %s", strerror(errno));
776         }
777         remaining -= nread;
778     }
779
780     /* we should be at EOF here */
781     if (read(fd, buf, 10) != 0)
782         g_critical("too much data entering XferDestReadfd");
783
784     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
785         g_critical("data entering XferDestReadfd does not match");
786
787     close(fd);
788
789     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
790
791     return NULL;
792 }
793
794 static gboolean
795 dest_readfd_start_impl(
796     XferElement *elt)
797 {
798     XferDestReadfd *self = XFER_DEST_READFD(elt);
799
800     simpleprng_seed(&self->prng, RANDOM_SEED);
801
802     self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
803
804     return TRUE;
805 }
806
807 static void
808 dest_readfd_class_init(
809     XferDestReadfdClass * klass)
810 {
811     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
812     static xfer_element_mech_pair_t mech_pairs[] = {
813         { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
814         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
815     };
816
817     xec->start = dest_readfd_start_impl;
818     xec->mech_pairs = mech_pairs;
819 }
820
821 GType
822 xfer_dest_readfd_get_type (void)
823 {
824     static GType type = 0;
825
826     if G_UNLIKELY(type == 0) {
827         static const GTypeInfo info = {
828             sizeof (XferDestReadfdClass),
829             (GBaseInitFunc) NULL,
830             (GBaseFinalizeFunc) NULL,
831             (GClassInitFunc) dest_readfd_class_init,
832             (GClassFinalizeFunc) NULL,
833             NULL /* class_data */,
834             sizeof (XferDestReadfd),
835             0 /* n_preallocs */,
836             (GInstanceInitFunc) NULL,
837             NULL
838         };
839
840         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
841     }
842
843     return type;
844 }
845
846 /* WRITEFD */
847
848 static GType xfer_dest_writefd_get_type(void);
849 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
850 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
851 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
852 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
853 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
854 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
855
856 typedef struct XferDestWritefd {
857     XferElement __parent__;
858
859     int read_fd;
860     GThread *thread;
861     simpleprng_state_t prng;
862 } XferDestWritefd;
863
864 typedef struct {
865     XferElementClass __parent__;
866 } XferDestWritefdClass;
867
868 static gpointer
869 dest_writefd_thread(
870     gpointer data)
871 {
872     XferDestWritefd *self = XFER_DEST_WRITEFD(data);
873     char buf[TEST_XFER_SIZE];
874     size_t remaining;
875     int fd = self->read_fd;
876
877     remaining = sizeof(buf);
878     while (remaining) {
879         ssize_t nwrite;
880         if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
881             error("error in read(): %s", strerror(errno));
882         }
883         remaining -= nwrite;
884     }
885
886     /* we should be at EOF here */
887     if (read(fd, buf, 10) != 0)
888         g_critical("too much data entering XferDestWritefd");
889
890     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
891         g_critical("data entering XferDestWritefd does not match");
892
893     close(fd);
894
895     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
896
897     return NULL;
898 }
899
900 static gboolean
901 dest_writefd_setup_impl(
902     XferElement *elt)
903 {
904     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
905     int p[2];
906
907     simpleprng_seed(&self->prng, RANDOM_SEED);
908
909     if (pipe(p) < 0)
910         g_critical("Error from pipe(): %s", strerror(errno));
911
912     self->read_fd = p[0];
913     g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
914
915     return TRUE;
916 }
917
918 static gboolean
919 dest_writefd_start_impl(
920     XferElement *elt)
921 {
922     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
923     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
924
925     return TRUE;
926 }
927
928 static void
929 dest_writefd_class_init(
930     XferDestWritefdClass * klass)
931 {
932     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
933     static xfer_element_mech_pair_t mech_pairs[] = {
934         { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
935         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
936     };
937
938     xec->setup = dest_writefd_setup_impl;
939     xec->start = dest_writefd_start_impl;
940     xec->mech_pairs = mech_pairs;
941 }
942
943 GType
944 xfer_dest_writefd_get_type (void)
945 {
946     static GType type = 0;
947
948     if G_UNLIKELY(type == 0) {
949         static const GTypeInfo info = {
950             sizeof (XferDestWritefdClass),
951             (GBaseInitFunc) NULL,
952             (GBaseFinalizeFunc) NULL,
953             (GClassInitFunc) dest_writefd_class_init,
954             (GClassFinalizeFunc) NULL,
955             NULL /* class_data */,
956             sizeof (XferDestWritefd),
957             0 /* n_preallocs */,
958             (GInstanceInitFunc) NULL,
959             NULL
960         };
961
962         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
963     }
964
965     return type;
966 }
967
968 /* PUSH_BUFFER */
969
/* GObject boilerplate for the XferDestPush test element: the type accessor
 * plus the usual cast / type-check convenience macros built on top of it. */
static GType xfer_dest_push_get_type(void);
#define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
#define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
#define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
#define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
#define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
#define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)

/* Instance structure: a destination element that receives its data through
 * push_buffer calls, accumulates it and verifies it at EOF. */
typedef struct XferDestPush {
    XferElement __parent__;

    /* accumulation buffer (TEST_XFER_SIZE bytes, allocated by the setup
     * method) and the number of bytes stored in it so far */
    char *buf;
    size_t bufpos;

    /* not referenced by the XferDestPush methods in this file */
    GThread *thread;
    /* PRNG state used to verify the received bytes */
    simpleprng_state_t prng;
} XferDestPush;

/* Class structure; adds nothing to XferElementClass. */
typedef struct {
    XferElementClass __parent__;
} XferDestPushClass;
991
992 static void
993 dest_push_push_buffer_impl(
994     XferElement *elt,
995     gpointer buf,
996     size_t size)
997 {
998     XferDestPush *self = XFER_DEST_PUSH(elt);
999
1000     if (buf == NULL) {
1001         /* if we're at EOF, verify we got the right bytes */
1002         g_assert(self->bufpos == TEST_XFER_SIZE);
1003         if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
1004             g_critical("data entering XferDestPush does not match");
1005         g_free(self->buf);
1006         return;
1007     }
1008
1009     g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1010     memcpy(self->buf + self->bufpos, buf, size);
1011     self->bufpos += size;
1012 }
1013
1014 static gboolean
1015 dest_push_setup_impl(
1016     XferElement *elt)
1017 {
1018     XferDestPush *self = XFER_DEST_PUSH(elt);
1019
1020     self->buf = g_malloc(TEST_XFER_SIZE);
1021     simpleprng_seed(&self->prng, RANDOM_SEED);
1022
1023     return TRUE;
1024 }
1025
1026 static void
1027 dest_push_class_init(
1028     XferDestPushClass * klass)
1029 {
1030     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1031     static xfer_element_mech_pair_t mech_pairs[] = {
1032         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
1033         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1034     };
1035
1036     xec->push_buffer = dest_push_push_buffer_impl;
1037     xec->setup = dest_push_setup_impl;
1038     xec->mech_pairs = mech_pairs;
1039 }
1040
1041 GType
1042 xfer_dest_push_get_type (void)
1043 {
1044     static GType type = 0;
1045
1046     if G_UNLIKELY(type == 0) {
1047         static const GTypeInfo info = {
1048             sizeof (XferDestPushClass),
1049             (GBaseInitFunc) NULL,
1050             (GBaseFinalizeFunc) NULL,
1051             (GClassInitFunc) dest_push_class_init,
1052             (GClassFinalizeFunc) NULL,
1053             NULL /* class_data */,
1054             sizeof (XferDestPush),
1055             0 /* n_preallocs */,
1056             (GInstanceInitFunc) NULL,
1057             NULL
1058         };
1059
1060         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
1061     }
1062
1063     return type;
1064 }
1065
1066 /* PULL_BUFFER */
1067
/* GObject boilerplate for the XferDestPull test element: the type accessor
 * plus the usual cast / type-check convenience macros built on top of it. */
static GType xfer_dest_pull_get_type(void);
#define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
#define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
#define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
#define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
#define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
#define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)

/* Instance structure: a destination element that pulls its data from the
 * upstream element in a worker thread and verifies it at EOF. */
typedef struct XferDestPull {
    XferElement __parent__;

    /* worker thread created by the start method */
    GThread *thread;
    /* PRNG state used to verify the received bytes */
    simpleprng_state_t prng;
} XferDestPull;

/* Class structure; adds nothing to XferElementClass. */
typedef struct {
    XferElementClass __parent__;
} XferDestPullClass;
1086
1087 static gpointer
1088 dest_pull_thread(
1089     gpointer data)
1090 {
1091     XferDestPull *self = XFER_DEST_PULL(data);
1092     char fullbuf[TEST_XFER_SIZE];
1093     char *buf;
1094     size_t bufpos = 0;
1095     size_t size;
1096
1097     while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
1098         g_assert(bufpos + size <= TEST_XFER_SIZE);
1099         memcpy(fullbuf + bufpos, buf, size);
1100         bufpos += size;
1101     }
1102
1103     /* we're at EOF, so verify we got the right bytes */
1104     g_assert(bufpos == TEST_XFER_SIZE);
1105     if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1106         g_critical("data entering XferDestPull does not match");
1107
1108     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1109
1110     return NULL;
1111 }
1112
1113 static gboolean
1114 dest_pull_start_impl(
1115     XferElement *elt)
1116 {
1117     XferDestPull *self = XFER_DEST_PULL(elt);
1118
1119     simpleprng_seed(&self->prng, RANDOM_SEED);
1120
1121     self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
1122
1123     return TRUE;
1124 }
1125
1126 static void
1127 dest_pull_class_init(
1128     XferDestPullClass * klass)
1129 {
1130     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1131     static xfer_element_mech_pair_t mech_pairs[] = {
1132         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
1133         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1134     };
1135
1136     xec->start = dest_pull_start_impl;
1137     xec->mech_pairs = mech_pairs;
1138 }
1139
1140 GType
1141 xfer_dest_pull_get_type (void)
1142 {
1143     static GType type = 0;
1144
1145     if G_UNLIKELY(type == 0) {
1146         static const GTypeInfo info = {
1147             sizeof (XferDestPullClass),
1148             (GBaseInitFunc) NULL,
1149             (GBaseFinalizeFunc) NULL,
1150             (GClassInitFunc) dest_pull_class_init,
1151             (GClassFinalizeFunc) NULL,
1152             NULL /* class_data */,
1153             sizeof (XferDestPull),
1154             0 /* n_preallocs */,
1155             (GInstanceInitFunc) NULL,
1156             NULL
1157         };
1158
1159         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
1160     }
1161
1162     return type;
1163 }
1164
1165 /* LISTEN */
1166
/* GObject boilerplate for the XferDestListen test element: the type accessor
 * plus the usual cast / type-check convenience macros built on top of it. */
static GType xfer_dest_listen_get_type(void);
#define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
#define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
#define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
#define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
#define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
#define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)

/* Instance structure: a destination element that listens on a TCP socket,
 * accepts one connection and verifies the data read from it. */
typedef struct XferDestListen {
    XferElement __parent__;

    /* listening socket created by the setup method; the worker thread sets
     * it to -1 after closing it */
    int listen_socket;

    /* worker thread created by the start method */
    GThread *thread;
    /* PRNG state used to verify the received bytes */
    simpleprng_state_t prng;
} XferDestListen;

/* Class structure; adds nothing to XferElementClass. */
typedef struct {
    XferElementClass __parent__;
} XferDestListenClass;
1187
1188 static gpointer
1189 dest_listen_thread(
1190     gpointer data)
1191 {
1192     XferDestListen *self = XFER_DEST_LISTEN(data);
1193     char *buf;
1194     size_t bytes = 0;
1195     int sock;
1196
1197     g_assert(self->listen_socket != -1);
1198
1199     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1200         xfer_cancel_with_error(XFER_ELEMENT(self),
1201             _("Error accepting incoming connection: %s"), strerror(errno));
1202         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1203         return NULL;
1204     }
1205
1206     /* close the listening socket now, for good measure */
1207     close(self->listen_socket);
1208     self->listen_socket = -1;
1209
1210     /* read from the socket until EOF or all of the data is read.  We try to
1211      * read one extra byte - if we get it, then upstream sent too much data */
1212     buf = g_malloc(TEST_XFER_SIZE+1);
1213     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1214     g_assert(bytes == TEST_XFER_SIZE);
1215     close(sock);
1216
1217     /* we're at EOF, so verify we got the right bytes */
1218     g_assert(bytes == TEST_XFER_SIZE);
1219     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1220         g_critical("data entering XferDestListen does not match");
1221
1222     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1223
1224     return NULL;
1225 }
1226
1227 static gboolean
1228 dest_listen_setup_impl(
1229     XferElement *elt)
1230 {
1231     XferDestListen *self = XFER_DEST_LISTEN(elt);
1232     sockaddr_union addr;
1233     DirectTCPAddr *addrs;
1234     socklen_t len;
1235     int sock;
1236
1237     /* set up self->listen_socket and set elt->input_listen_addrs */
1238     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1239     if (sock < 0)
1240         error("socket(): %s", strerror(errno));
1241
1242     if (listen(sock, 1) < 0)
1243         error("listen(): %s", strerror(errno));
1244
1245     len = sizeof(addr);
1246     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1247         error("getsockname(): %s", strerror(errno));
1248     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
1249
1250     addrs = g_new0(DirectTCPAddr, 2);
1251     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
1252     addrs[0].port = SU_GET_PORT(&addr);
1253     elt->input_listen_addrs = addrs;
1254
1255     return TRUE;
1256 }
1257
1258 static gboolean
1259 dest_listen_start_impl(
1260     XferElement *elt)
1261 {
1262     XferDestListen *self = XFER_DEST_LISTEN(elt);
1263
1264     simpleprng_seed(&self->prng, RANDOM_SEED);
1265
1266     self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
1267
1268     return TRUE;
1269 }
1270
1271 static void
1272 dest_listen_class_init(
1273     XferDestListenClass * klass)
1274 {
1275     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1276     static xfer_element_mech_pair_t mech_pairs[] = {
1277         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
1278         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1279     };
1280
1281     xec->setup = dest_listen_setup_impl;
1282     xec->start = dest_listen_start_impl;
1283     xec->mech_pairs = mech_pairs;
1284 }
1285
1286 GType
1287 xfer_dest_listen_get_type (void)
1288 {
1289     static GType type = 0;
1290
1291     if G_UNLIKELY(type == 0) {
1292         static const GTypeInfo info = {
1293             sizeof (XferDestListenClass),
1294             (GBaseInitFunc) NULL,
1295             (GBaseFinalizeFunc) NULL,
1296             (GClassInitFunc) dest_listen_class_init,
1297             (GClassFinalizeFunc) NULL,
1298             NULL /* class_data */,
1299             sizeof (XferDestListen),
1300             0 /* n_preallocs */,
1301             (GInstanceInitFunc) NULL,
1302             NULL
1303         };
1304
1305         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
1306     }
1307
1308     return type;
1309 }
1310
/* CONNECT */
1312
/* GObject boilerplate for the XferDestConnect test element: the type accessor
 * plus the usual cast / type-check convenience macros built on top of it. */
static GType xfer_dest_connect_get_type(void);
#define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
#define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
#define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
#define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
#define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
#define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)

/* Instance structure: a destination element that connects to the address
 * advertised by the upstream element and verifies the data read from it. */
typedef struct XferDestConnect {
    XferElement __parent__;

    /* not referenced by the XferDestConnect methods in this file; the worker
     * thread keeps its socket in a local variable */
    int connect_socket;

    /* worker thread created by the start method */
    GThread *thread;
    /* PRNG state used to verify the received bytes */
    simpleprng_state_t prng;
} XferDestConnect;

/* Class structure; adds nothing to XferElementClass. */
typedef struct {
    XferElementClass __parent__;
} XferDestConnectClass;
1333
1334 static gpointer
1335 dest_connect_thread(
1336     gpointer data)
1337 {
1338     XferDestConnect *self = XFER_DEST_CONNECT(data);
1339     XferElement *elt = XFER_ELEMENT(self);
1340     DirectTCPAddr *addrs;
1341     sockaddr_union addr;
1342     char *buf;
1343     size_t bytes = 0;
1344     int sock;
1345
1346     /* set up the sockaddr -- IPv4 only */
1347     SU_INIT(&addr, AF_INET);
1348     addrs = elt->upstream->output_listen_addrs;
1349     g_assert(addrs != NULL);
1350     SU_SET_PORT(&addr, addrs->port);
1351     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
1352
1353     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1354     sock = socket(AF_INET, SOCK_STREAM, 0);
1355     if (sock < 0) {
1356         error("socket(): %s", strerror(errno));
1357     }
1358     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1359         error("connect(): %s", strerror(errno));
1360     }
1361
1362     tu_dbg("connected to %s\n", str_sockaddr(&addr));
1363
1364     /* read from the socket until EOF or all of the data is read.  We try to
1365      * read one extra byte - if we get it, then upstream sent too much data */
1366     buf = g_malloc(TEST_XFER_SIZE+1);
1367     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1368     g_assert(bytes == TEST_XFER_SIZE);
1369     close(sock);
1370
1371     /* we're at EOF, so verify we got the right bytes */
1372     g_assert(bytes == TEST_XFER_SIZE);
1373     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1374         g_critical("data entering XferDestConnect does not match");
1375
1376     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1377
1378     return NULL;
1379 }
1380
1381 static gboolean
1382 dest_connect_start_impl(
1383     XferElement *elt)
1384 {
1385     XferDestConnect *self = XFER_DEST_CONNECT(elt);
1386
1387     simpleprng_seed(&self->prng, RANDOM_SEED);
1388
1389     self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
1390
1391     return TRUE;
1392 }
1393
1394 static void
1395 dest_connect_class_init(
1396     XferDestConnectClass * klass)
1397 {
1398     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1399     static xfer_element_mech_pair_t mech_pairs[] = {
1400         { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
1401         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1402     };
1403
1404     xec->start = dest_connect_start_impl;
1405     xec->mech_pairs = mech_pairs;
1406 }
1407
1408 GType
1409 xfer_dest_connect_get_type (void)
1410 {
1411     static GType type = 0;
1412
1413     if G_UNLIKELY(type == 0) {
1414         static const GTypeInfo info = {
1415             sizeof (XferDestConnectClass),
1416             (GBaseInitFunc) NULL,
1417             (GBaseFinalizeFunc) NULL,
1418             (GClassInitFunc) dest_connect_class_init,
1419             (GClassFinalizeFunc) NULL,
1420             NULL /* class_data */,
1421             sizeof (XferDestConnect),
1422             0 /* n_preallocs */,
1423             (GInstanceInitFunc) NULL,
1424             NULL
1425         };
1426
1427         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
1428     }
1429
1430     return type;
1431 }
1432
1433
1434 /*
1435  * Tests
1436  */
1437
1438 static void
1439 test_xfer_generic_callback(
1440     gpointer data G_GNUC_UNUSED,
1441     XMsg *msg,
1442     Xfer *xfer)
1443 {
1444     tu_dbg("Received message %s\n", xmsg_repr(msg));
1445
1446     switch (msg->type) {
1447         case XMSG_DONE:
1448             /* are we done? */
1449             if (xfer->status == XFER_DONE) {
1450                 tu_dbg("all elements are done!\n");
1451                 g_main_loop_quit(default_main_loop());
1452             }
1453             break;
1454
1455         default:
1456             break;
1457     }
1458 }
1459
1460 /****
1461  * Run a simple transfer with some xor filters
1462  */
1463
1464 static int
1465 test_xfer_simple(void)
1466 {
1467     unsigned int i;
1468     GSource *src;
1469     XferElement *elements[] = {
1470         xfer_source_random(100*1024, RANDOM_SEED),
1471         xfer_filter_xor('d'),
1472         xfer_filter_xor('d'),
1473         xfer_dest_null(RANDOM_SEED),
1474     };
1475
1476     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1477     src = xfer_get_source(xfer);
1478     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1479     g_source_attach(src, NULL);
1480     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1481
1482     /* unreference the elements */
1483     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1484         g_object_unref(elements[i]);
1485         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1486         elements[i] = NULL;
1487     }
1488
1489     xfer_start(xfer);
1490
1491     g_main_loop_run(default_main_loop());
1492     g_assert(xfer->status == XFER_DONE);
1493
1494     xfer_unref(xfer);
1495
1496     return 1;
1497 }
1498
1499 /****
1500  * Run a transfer between two files, with or without filters
1501  */
1502
1503 static int
1504 test_xfer_files(gboolean add_filters)
1505 {
1506     unsigned int i;
1507     unsigned int elts;
1508     GSource *src;
1509     char *in_filename = __FILE__;
1510     char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
1511     int rfd, wfd;
1512     Xfer *xfer;
1513     XferElement *elements[4];
1514
1515     rfd = open(in_filename, O_RDONLY, 0);
1516     if (rfd < 0)
1517         g_critical("Could not open '%s': %s", in_filename, strerror(errno));
1518
1519     wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1520     if (wfd < 0)
1521         g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1522
1523     elts = 0;
1524     elements[elts++] = xfer_source_fd(rfd);
1525     if (add_filters) {
1526         elements[elts++] = xfer_filter_xor(0xab);
1527         elements[elts++] = xfer_filter_xor(0xab);
1528     }
1529     elements[elts++] = xfer_dest_fd(wfd);
1530
1531     xfer = xfer_new(elements, elts);
1532     src = xfer_get_source(xfer);
1533     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1534     g_source_attach(src, NULL);
1535     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1536
1537     /* unreference the elements */
1538     for (i = 0; i < elts; i++) {
1539         g_object_unref(elements[i]);
1540         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1541         elements[i] = NULL;
1542     }
1543
1544     xfer_start(xfer);
1545
1546     g_main_loop_run(default_main_loop());
1547     g_assert(xfer->status == XFER_DONE);
1548
1549     xfer_unref(xfer);
1550
1551     unlink(out_filename); /* ignore any errors */
1552
1553     return 1;
1554 }
1555
1556 static int
1557 test_xfer_files_simple(void)
1558 {
1559     return test_xfer_files(FALSE);
1560 }
1561
1562 static int
1563 test_xfer_files_filter(void)
1564 {
1565     return test_xfer_files(TRUE);
1566 }
1567
1568 /*****
 * test each possible combination of source and destination mechanism
1570  */
1571
1572 static int
1573 test_glue_combo(
1574     XferElement *source,
1575     XferElement *dest)
1576 {
1577     unsigned int i;
1578     GSource *src;
1579     XferElement *elements[] = { source, dest };
1580
1581     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1582     src = xfer_get_source(xfer);
1583     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1584     g_source_attach(src, NULL);
1585
1586     /* unreference the elements */
1587     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1588         g_object_unref(elements[i]);
1589         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1590         elements[i] = NULL;
1591     }
1592
1593     xfer_start(xfer);
1594
1595     g_main_loop_run(default_main_loop());
1596     g_assert(xfer->status == XFER_DONE);
1597
1598     xfer_unref(xfer);
1599
1600     return 1;
1601 }
1602
/* Define a no-argument test function named `n` that instantiates one element
 * of GType `s` (the source) and one of GType `d` (the destination) with
 * g_object_new() and runs the pair through test_glue_combo(). */
#define make_test_glue(n, s, d) static int n(void) \
{\
    return test_glue_combo((XferElement *)g_object_new(s, NULL), \
                           (XferElement *)g_object_new(d, NULL)); \
}
/* one test per source/destination pairing: 6 source types x 6 destination
 * types = 36 functions, named test_glue_<SOURCE>_<DEST> */
make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
1644
1645 /*
1646  * Main driver
1647  */
1648
1649 int
1650 main(int argc, char **argv)
1651 {
1652     static TestUtilsTest tests[] = {
1653         TU_TEST(test_xfer_simple, 90),
1654         TU_TEST(test_xfer_files_simple, 90),
1655         TU_TEST(test_xfer_files_filter, 90),
1656         TU_TEST(test_glue_READFD_READFD, 90),
1657         TU_TEST(test_glue_READFD_WRITEFD, 90),
1658         TU_TEST(test_glue_READFD_PUSH, 90),
1659         TU_TEST(test_glue_READFD_PULL, 90),
1660         TU_TEST(test_glue_READFD_LISTEN, 90),
1661         TU_TEST(test_glue_READFD_CONNECT, 90),
1662         TU_TEST(test_glue_WRITEFD_READFD, 90),
1663         TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1664         TU_TEST(test_glue_WRITEFD_PUSH, 90),
1665         TU_TEST(test_glue_WRITEFD_PULL, 90),
1666         TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1667         TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1668         TU_TEST(test_glue_PUSH_READFD, 90),
1669         TU_TEST(test_glue_PUSH_WRITEFD, 90),
1670         TU_TEST(test_glue_PUSH_PUSH, 90),
1671         TU_TEST(test_glue_PUSH_PULL, 90),
1672         TU_TEST(test_glue_PUSH_LISTEN, 90),
1673         TU_TEST(test_glue_PUSH_CONNECT, 90),
1674         TU_TEST(test_glue_PULL_READFD, 90),
1675         TU_TEST(test_glue_PULL_WRITEFD, 90),
1676         TU_TEST(test_glue_PULL_PUSH, 90),
1677         TU_TEST(test_glue_PULL_PULL, 90),
1678         TU_TEST(test_glue_PULL_LISTEN, 90),
1679         TU_TEST(test_glue_PULL_CONNECT, 90),
1680         TU_TEST(test_glue_LISTEN_READFD, 90),
1681         TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1682         TU_TEST(test_glue_LISTEN_PUSH, 90),
1683         TU_TEST(test_glue_LISTEN_PULL, 90),
1684         TU_TEST(test_glue_LISTEN_LISTEN, 90),
1685         TU_TEST(test_glue_LISTEN_CONNECT, 90),
1686         TU_TEST(test_glue_CONNECT_READFD, 90),
1687         TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1688         TU_TEST(test_glue_CONNECT_PUSH, 90),
1689         TU_TEST(test_glue_CONNECT_PULL, 90),
1690         TU_TEST(test_glue_CONNECT_LISTEN, 90),
1691         TU_TEST(test_glue_CONNECT_CONNECT, 90),
1692         TU_END()
1693     };
1694
1695     glib_init();
1696
1697     return testutils_run_tests(argc, argv, tests);
1698 }