Imported Upstream version 3.1.0
[debian/amanda] / xfer-src / xfer-test.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  * 
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  * 
17  * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  *
20  * Author: Dustin J. Mitchell <dustin@zmanda.com>
21  */
22
23 #include "amxfer.h"
24 #include "glib-util.h"
25 #include "testutils.h"
26 #include "amanda.h"
27 #include "event.h"
28 #include "simpleprng.h"
29 #include "sockaddr-util.h"
30
31 /* Having tests repeat exactly is an advantage, so we use a hard-coded
32  * random seed. */
33 #define RANDOM_SEED 0xf00d
34
35 /*
36  * XferElement subclasses
37  *
38  * This file defines a few "private" element classes that each have only one
39  * mechanism pair.  These classes are then used to test all of the possible
40  * combinations of glue.
41  */
42
43 /* constants to determine the total amount of data to be transfered; EXTRA is
44  * to test out partial-block handling; it should be prime. */
45 #define TEST_BLOCK_SIZE 32768
46 #define TEST_BLOCK_COUNT 10
47 #define TEST_BLOCK_EXTRA 97
48 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
49
50 /* READFD */
51
52 static GType xfer_source_readfd_get_type(void);
53 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
54 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
55 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
56 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
57 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
58 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
59
60 typedef struct XferSourceReadfd {
61     XferElement __parent__;
62
63     int write_fd;
64     GThread *thread;
65     simpleprng_state_t prng;
66 } XferSourceReadfd;
67
68 typedef struct {
69     XferElementClass __parent__;
70 } XferSourceReadfdClass;
71
72 static gpointer
73 source_readfd_thread(
74     gpointer data)
75 {
76     XferSourceReadfd *self = XFER_SOURCE_READFD(data);
77     char buf[TEST_XFER_SIZE];
78     int fd = self->write_fd;
79
80     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
81
82     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
83         error("error in full_write(): %s", strerror(errno));
84     }
85
86     close(fd);
87
88     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
89
90     return NULL;
91 }
92
93 static gboolean
94 source_readfd_setup_impl(
95     XferElement *elt)
96 {
97     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
98     int p[2];
99
100     simpleprng_seed(&self->prng, RANDOM_SEED);
101
102     if (pipe(p) < 0)
103         g_critical("Error from pipe(): %s", strerror(errno));
104
105     self->write_fd = p[1];
106     XFER_ELEMENT(self)->output_fd = p[0];
107
108     return TRUE;
109 }
110
111 static gboolean
112 source_readfd_start_impl(
113     XferElement *elt)
114 {
115     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
116     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
117
118     return TRUE;
119 }
120
121 static void
122 source_readfd_class_init(
123     XferSourceReadfdClass * klass)
124 {
125     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
126     static xfer_element_mech_pair_t mech_pairs[] = {
127         { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
128         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
129     };
130
131     xec->setup = source_readfd_setup_impl;
132     xec->start = source_readfd_start_impl;
133     xec->mech_pairs = mech_pairs;
134 }
135
136 GType
137 xfer_source_readfd_get_type (void)
138 {
139     static GType type = 0;
140
141     if G_UNLIKELY(type == 0) {
142         static const GTypeInfo info = {
143             sizeof (XferSourceReadfdClass),
144             (GBaseInitFunc) NULL,
145             (GBaseFinalizeFunc) NULL,
146             (GClassInitFunc) source_readfd_class_init,
147             (GClassFinalizeFunc) NULL,
148             NULL /* class_data */,
149             sizeof (XferSourceReadfd),
150             0 /* n_preallocs */,
151             (GInstanceInitFunc) NULL,
152             NULL
153         };
154
155         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
156     }
157
158     return type;
159 }
160
161 /* WRITEFD */
162
163 static GType xfer_source_writefd_get_type(void);
164 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
165 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
166 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
167 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
168 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
169 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
170
171 typedef struct XferSourceWritefd {
172     XferElement __parent__;
173
174     GThread *thread;
175     simpleprng_state_t prng;
176 } XferSourceWritefd;
177
178 typedef struct {
179     XferElementClass __parent__;
180 } XferSourceWritefdClass;
181
182 static gpointer
183 source_writefd_thread(
184     gpointer data)
185 {
186     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
187     char buf[TEST_XFER_SIZE];
188     int fd = XFER_ELEMENT(self)->downstream->input_fd;
189
190     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
191
192     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
193         error("error in full_write(): %s", strerror(errno));
194     }
195
196     close(fd);
197     XFER_ELEMENT(self)->downstream->input_fd = -1;
198
199     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
200
201     return NULL;
202 }
203
204 static gboolean
205 source_writefd_start_impl(
206     XferElement *elt)
207 {
208     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
209
210     simpleprng_seed(&self->prng, RANDOM_SEED);
211
212     self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
213
214     return TRUE;
215 }
216
217 static void
218 source_writefd_class_init(
219     XferSourceWritefdClass * klass)
220 {
221     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
222     static xfer_element_mech_pair_t mech_pairs[] = {
223         { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
224         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
225     };
226
227     xec->start = source_writefd_start_impl;
228     xec->mech_pairs = mech_pairs;
229 }
230
231 GType
232 xfer_source_writefd_get_type (void)
233 {
234     static GType type = 0;
235
236     if G_UNLIKELY(type == 0) {
237         static const GTypeInfo info = {
238             sizeof (XferSourceWritefdClass),
239             (GBaseInitFunc) NULL,
240             (GBaseFinalizeFunc) NULL,
241             (GClassInitFunc) source_writefd_class_init,
242             (GClassFinalizeFunc) NULL,
243             NULL /* class_data */,
244             sizeof (XferSourceWritefd),
245             0 /* n_preallocs */,
246             (GInstanceInitFunc) NULL,
247             NULL
248         };
249
250         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
251     }
252
253     return type;
254 }
255
256 /* PUSH_BUFFER */
257
258 static GType xfer_source_push_get_type(void);
259 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
260 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
261 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
262 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
263 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
264 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
265
266 typedef struct XferSourcePush {
267     XferElement __parent__;
268
269     GThread *thread;
270     simpleprng_state_t prng;
271 } XferSourcePush;
272
273 typedef struct {
274     XferElementClass __parent__;
275 } XferSourcePushClass;
276
277 static gpointer
278 source_push_thread(
279     gpointer data)
280 {
281     XferSourcePush *self = XFER_SOURCE_PUSH(data);
282     char *buf;
283     int i;
284
285     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
286         buf = g_malloc(TEST_BLOCK_SIZE);
287         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
288         xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
289         buf = NULL;
290     }
291
292     /* send a smaller block */
293     buf = g_malloc(TEST_BLOCK_EXTRA);
294     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
295     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
296     buf = NULL;
297
298     /* send EOF */
299     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
300
301     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
302
303     return NULL;
304 }
305
306 static gboolean
307 source_push_start_impl(
308     XferElement *elt)
309 {
310     XferSourcePush *self = XFER_SOURCE_PUSH(elt);
311
312     simpleprng_seed(&self->prng, RANDOM_SEED);
313
314     self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
315
316     return TRUE;
317 }
318
319 static void
320 source_push_class_init(
321     XferSourcePushClass * klass)
322 {
323     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
324     static xfer_element_mech_pair_t mech_pairs[] = {
325         { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
326         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
327     };
328
329     xec->start = source_push_start_impl;
330     xec->mech_pairs = mech_pairs;
331 }
332
333 GType
334 xfer_source_push_get_type (void)
335 {
336     static GType type = 0;
337
338     if G_UNLIKELY(type == 0) {
339         static const GTypeInfo info = {
340             sizeof (XferSourcePushClass),
341             (GBaseInitFunc) NULL,
342             (GBaseFinalizeFunc) NULL,
343             (GClassInitFunc) source_push_class_init,
344             (GClassFinalizeFunc) NULL,
345             NULL /* class_data */,
346             sizeof (XferSourcePush),
347             0 /* n_preallocs */,
348             (GInstanceInitFunc) NULL,
349             NULL
350         };
351
352         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
353     }
354
355     return type;
356 }
357
358 /* PULL_BUFFER */
359
360 static GType xfer_source_pull_get_type(void);
361 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
362 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
363 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
364 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
365 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
366 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
367
368 typedef struct XferSourcePull {
369     XferElement __parent__;
370
371     gint nbuffers;
372     GThread *thread;
373     simpleprng_state_t prng;
374 } XferSourcePull;
375
376 typedef struct {
377     XferElementClass __parent__;
378 } XferSourcePullClass;
379
380 static gpointer
381 source_pull_pull_buffer_impl(
382     XferElement *elt,
383     size_t *size)
384 {
385     XferSourcePull *self = XFER_SOURCE_PULL(elt);
386     char *buf;
387     size_t bufsiz;
388
389     if (self->nbuffers > TEST_BLOCK_COUNT) {
390         *size = 0;
391         return NULL;
392     }
393     bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
394
395     self->nbuffers++;
396
397     buf = g_malloc(bufsiz);
398     simpleprng_fill_buffer(&self->prng, buf, bufsiz);
399     *size = bufsiz;
400     return buf;
401 }
402
403 static gboolean
404 source_pull_setup_impl(
405     XferElement *elt)
406 {
407     XferSourcePull *self = XFER_SOURCE_PULL(elt);
408
409     simpleprng_seed(&self->prng, RANDOM_SEED);
410
411     return TRUE;
412 }
413
414 static void
415 source_pull_class_init(
416     XferSourcePullClass * klass)
417 {
418     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
419     static xfer_element_mech_pair_t mech_pairs[] = {
420         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
421         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
422     };
423
424     xec->pull_buffer = source_pull_pull_buffer_impl;
425     xec->setup = source_pull_setup_impl;
426     xec->mech_pairs = mech_pairs;
427 }
428
429 GType
430 xfer_source_pull_get_type (void)
431 {
432     static GType type = 0;
433
434     if G_UNLIKELY(type == 0) {
435         static const GTypeInfo info = {
436             sizeof (XferSourcePullClass),
437             (GBaseInitFunc) NULL,
438             (GBaseFinalizeFunc) NULL,
439             (GClassInitFunc) source_pull_class_init,
440             (GClassFinalizeFunc) NULL,
441             NULL /* class_data */,
442             sizeof (XferSourcePull),
443             0 /* n_preallocs */,
444             (GInstanceInitFunc) NULL,
445             NULL
446         };
447
448         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
449     }
450
451     return type;
452 }
453
454 /* LISTEN */
455
456 static GType xfer_source_listen_get_type(void);
457 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
458 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
459 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
460 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
461 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
462 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
463
464 typedef struct XferSourceListen {
465     XferElement __parent__;
466
467     GThread *thread;
468     simpleprng_state_t prng;
469 } XferSourceListen;
470
471 typedef struct {
472     XferElementClass __parent__;
473 } XferSourceListenClass;
474
475 static gpointer
476 source_listen_thread(
477     gpointer data)
478 {
479     XferSourceListen *self = XFER_SOURCE_LISTEN(data);
480     XferElement *elt = XFER_ELEMENT(self);
481     DirectTCPAddr *addrs;
482     sockaddr_union addr;
483     int sock;
484     char *buf;
485     int i;
486
487     /* set up the sockaddr -- IPv4 only */
488     SU_INIT(&addr, AF_INET);
489     addrs = elt->downstream->input_listen_addrs;
490     g_assert(addrs != NULL);
491     SU_SET_PORT(&addr, addrs->port);
492     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
493
494     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
495     sock = socket(AF_INET, SOCK_STREAM, 0);
496     if (sock < 0) {
497         error("socket(): %s", strerror(errno));
498     }
499     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
500         error("connect(): %s", strerror(errno));
501     }
502
503     tu_dbg("connected to %s\n", str_sockaddr(&addr));
504
505     buf = g_malloc(TEST_BLOCK_SIZE);
506     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
507         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
508         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
509             error("error in full_write(): %s", strerror(errno));
510         }
511     }
512
513     /* send a smaller block */
514     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
515     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
516         error("error in full_write(): %s", strerror(errno));
517     }
518     g_free(buf);
519
520     /* send EOF by closing the socket */
521     close(sock);
522
523     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
524
525     return NULL;
526 }
527
528 static gboolean
529 source_listen_start_impl(
530     XferElement *elt)
531 {
532     XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
533
534     simpleprng_seed(&self->prng, RANDOM_SEED);
535
536     self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
537
538     return TRUE;
539 }
540
541 static void
542 source_listen_class_init(
543     XferSourceListenClass * klass)
544 {
545     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
546     static xfer_element_mech_pair_t mech_pairs[] = {
547         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
548         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
549     };
550
551     xec->start = source_listen_start_impl;
552     xec->mech_pairs = mech_pairs;
553 }
554
555 GType
556 xfer_source_listen_get_type (void)
557 {
558     static GType type = 0;
559
560     if G_UNLIKELY(type == 0) {
561         static const GTypeInfo info = {
562             sizeof (XferSourceListenClass),
563             (GBaseInitFunc) NULL,
564             (GBaseFinalizeFunc) NULL,
565             (GClassInitFunc) source_listen_class_init,
566             (GClassFinalizeFunc) NULL,
567             NULL /* class_data */,
568             sizeof (XferSourceListen),
569             0 /* n_preallocs */,
570             (GInstanceInitFunc) NULL,
571             NULL
572         };
573
574         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
575     }
576
577     return type;
578 }
579
580 /* CONNECT */
581
582 static GType xfer_source_connect_get_type(void);
583 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
584 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
585 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
586 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
587 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
588 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
589
590 typedef struct XferSourceConnect {
591     XferElement __parent__;
592
593     int listen_socket;
594
595     GThread *thread;
596     simpleprng_state_t prng;
597 } XferSourceConnect;
598
599 typedef struct {
600     XferElementClass __parent__;
601 } XferSourceConnectClass;
602
603 static gpointer
604 source_connect_thread(
605     gpointer data)
606 {
607     XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
608     int sock;
609     char *buf;
610     int i;
611
612     g_assert(self->listen_socket != -1);
613
614     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
615         xfer_cancel_with_error(XFER_ELEMENT(self),
616             _("Error accepting incoming connection: %s"), strerror(errno));
617         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
618         return NULL;
619     }
620
621     /* close the listening socket now, for good measure */
622     close(self->listen_socket);
623     self->listen_socket = -1;
624
625     tu_dbg("connection accepted\n");
626
627     buf = g_malloc(TEST_BLOCK_SIZE);
628     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
629         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
630         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
631             error("error in full_write(): %s", strerror(errno));
632         }
633     }
634
635     /* send a smaller block */
636     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
637     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
638         error("error in full_write(): %s", strerror(errno));
639     }
640     g_free(buf);
641
642     /* send EOF by closing the socket */
643     close(sock);
644
645     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
646
647     return NULL;
648 }
649
650 static gboolean
651 source_connect_setup_impl(
652     XferElement *elt)
653 {
654     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
655     sockaddr_union addr;
656     DirectTCPAddr *addrs;
657     socklen_t len;
658     int sock;
659
660     /* set up self->listen_socket and set elt->output_listen_addrs */
661     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
662     if (sock < 0)
663         error("socket(): %s", strerror(errno));
664
665     if (listen(sock, 1) < 0)
666         error("listen(): %s", strerror(errno));
667
668     len = sizeof(addr);
669     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
670         error("getsockname(): %s", strerror(errno));
671     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
672
673     addrs = g_new0(DirectTCPAddr, 2);
674     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
675     addrs[0].port = SU_GET_PORT(&addr);
676     elt->output_listen_addrs = addrs;
677
678     return TRUE;
679 }
680
681 static gboolean
682 source_connect_start_impl(
683     XferElement *elt)
684 {
685     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
686
687     simpleprng_seed(&self->prng, RANDOM_SEED);
688
689     self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
690
691     return TRUE;
692 }
693
694 static void
695 source_connect_class_init(
696     XferSourceConnectClass * klass)
697 {
698     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
699     static xfer_element_mech_pair_t mech_pairs[] = {
700         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
701         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
702     };
703
704     xec->setup = source_connect_setup_impl;
705     xec->start = source_connect_start_impl;
706     xec->mech_pairs = mech_pairs;
707 }
708
709 GType
710 xfer_source_connect_get_type (void)
711 {
712     static GType type = 0;
713
714     if G_UNLIKELY(type == 0) {
715         static const GTypeInfo info = {
716             sizeof (XferSourceConnectClass),
717             (GBaseInitFunc) NULL,
718             (GBaseFinalizeFunc) NULL,
719             (GClassInitFunc) source_connect_class_init,
720             (GClassFinalizeFunc) NULL,
721             NULL /* class_data */,
722             sizeof (XferSourceConnect),
723             0 /* n_preallocs */,
724             (GInstanceInitFunc) NULL,
725             NULL
726         };
727
728         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
729     }
730
731     return type;
732 }
733
734 /* READFD */
735
736 static GType xfer_dest_readfd_get_type(void);
737 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
738 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
739 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
740 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
741 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
742 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
743
744 typedef struct XferDestReadfd {
745     XferElement __parent__;
746
747     GThread *thread;
748     simpleprng_state_t prng;
749 } XferDestReadfd;
750
751 typedef struct {
752     XferElementClass __parent__;
753 } XferDestReadfdClass;
754
755 static gpointer
756 dest_readfd_thread(
757     gpointer data)
758 {
759     XferDestReadfd *self = XFER_DEST_READFD(data);
760     char buf[TEST_XFER_SIZE];
761     size_t remaining;
762     int fd = XFER_ELEMENT(self)->upstream->output_fd;
763
764     remaining = sizeof(buf);
765     while (remaining) {
766         ssize_t nread;
767         if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
768             error("error in read(): %s", strerror(errno));
769         }
770         remaining -= nread;
771     }
772
773     /* we should be at EOF here */
774     if (read(fd, buf, 10) != 0)
775         g_critical("too much data entering XferDestReadfd");
776
777     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
778         g_critical("data entering XferDestReadfd does not match");
779
780     close(fd);
781     XFER_ELEMENT(self)->upstream->output_fd = -1;
782
783     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
784
785     return NULL;
786 }
787
788 static gboolean
789 dest_readfd_start_impl(
790     XferElement *elt)
791 {
792     XferDestReadfd *self = XFER_DEST_READFD(elt);
793
794     simpleprng_seed(&self->prng, RANDOM_SEED);
795
796     self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
797
798     return TRUE;
799 }
800
801 static void
802 dest_readfd_class_init(
803     XferDestReadfdClass * klass)
804 {
805     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
806     static xfer_element_mech_pair_t mech_pairs[] = {
807         { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
808         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
809     };
810
811     xec->start = dest_readfd_start_impl;
812     xec->mech_pairs = mech_pairs;
813 }
814
815 GType
816 xfer_dest_readfd_get_type (void)
817 {
818     static GType type = 0;
819
820     if G_UNLIKELY(type == 0) {
821         static const GTypeInfo info = {
822             sizeof (XferDestReadfdClass),
823             (GBaseInitFunc) NULL,
824             (GBaseFinalizeFunc) NULL,
825             (GClassInitFunc) dest_readfd_class_init,
826             (GClassFinalizeFunc) NULL,
827             NULL /* class_data */,
828             sizeof (XferDestReadfd),
829             0 /* n_preallocs */,
830             (GInstanceInitFunc) NULL,
831             NULL
832         };
833
834         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
835     }
836
837     return type;
838 }
839
840 /* WRITEFD */
841
842 static GType xfer_dest_writefd_get_type(void);
843 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
844 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
845 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
846 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
847 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
848 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
849
850 typedef struct XferDestWritefd {
851     XferElement __parent__;
852
853     int read_fd;
854     GThread *thread;
855     simpleprng_state_t prng;
856 } XferDestWritefd;
857
858 typedef struct {
859     XferElementClass __parent__;
860 } XferDestWritefdClass;
861
862 static gpointer
863 dest_writefd_thread(
864     gpointer data)
865 {
866     XferDestWritefd *self = XFER_DEST_WRITEFD(data);
867     char buf[TEST_XFER_SIZE];
868     size_t remaining;
869     int fd = self->read_fd;
870
871     remaining = sizeof(buf);
872     while (remaining) {
873         ssize_t nwrite;
874         if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
875             error("error in read(): %s", strerror(errno));
876         }
877         remaining -= nwrite;
878     }
879
880     /* we should be at EOF here */
881     if (read(fd, buf, 10) != 0)
882         g_critical("too much data entering XferDestWritefd");
883
884     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
885         g_critical("data entering XferDestWritefd does not match");
886
887     close(fd);
888     XFER_ELEMENT(self)->upstream->output_fd = -1;
889
890     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
891
892     return NULL;
893 }
894
895 static gboolean
896 dest_writefd_setup_impl(
897     XferElement *elt)
898 {
899     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
900     int p[2];
901
902     simpleprng_seed(&self->prng, RANDOM_SEED);
903
904     if (pipe(p) < 0)
905         g_critical("Error from pipe(): %s", strerror(errno));
906
907     self->read_fd = p[0];
908     XFER_ELEMENT(self)->input_fd = p[1];
909
910     return TRUE;
911 }
912
913 static gboolean
914 dest_writefd_start_impl(
915     XferElement *elt)
916 {
917     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
918     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
919
920     return TRUE;
921 }
922
923 static void
924 dest_writefd_class_init(
925     XferDestWritefdClass * klass)
926 {
927     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
928     static xfer_element_mech_pair_t mech_pairs[] = {
929         { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
930         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
931     };
932
933     xec->setup = dest_writefd_setup_impl;
934     xec->start = dest_writefd_start_impl;
935     xec->mech_pairs = mech_pairs;
936 }
937
938 GType
939 xfer_dest_writefd_get_type (void)
940 {
941     static GType type = 0;
942
943     if G_UNLIKELY(type == 0) {
944         static const GTypeInfo info = {
945             sizeof (XferDestWritefdClass),
946             (GBaseInitFunc) NULL,
947             (GBaseFinalizeFunc) NULL,
948             (GClassInitFunc) dest_writefd_class_init,
949             (GClassFinalizeFunc) NULL,
950             NULL /* class_data */,
951             sizeof (XferDestWritefd),
952             0 /* n_preallocs */,
953             (GInstanceInitFunc) NULL,
954             NULL
955         };
956
957         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
958     }
959
960     return type;
961 }
962
963 /* PUSH_BUFFER */
964
965 static GType xfer_dest_push_get_type(void);
966 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
967 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
968 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
969 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
970 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
971 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
972
973 typedef struct XferDestPush {
974     XferElement __parent__;
975
976     char *buf;
977     size_t bufpos;
978
979     GThread *thread;
980     simpleprng_state_t prng;
981 } XferDestPush;
982
983 typedef struct {
984     XferElementClass __parent__;
985 } XferDestPushClass;
986
987 static void
988 dest_push_push_buffer_impl(
989     XferElement *elt,
990     gpointer buf,
991     size_t size)
992 {
993     XferDestPush *self = XFER_DEST_PUSH(elt);
994
995     if (buf == NULL) {
996         /* if we're at EOF, verify we got the right bytes */
997         g_assert(self->bufpos == TEST_XFER_SIZE);
998         if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
999             g_critical("data entering XferDestPush does not match");
1000         g_free(self->buf);
1001         return;
1002     }
1003
1004     g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1005     memcpy(self->buf + self->bufpos, buf, size);
1006     self->bufpos += size;
1007 }
1008
1009 static gboolean
1010 dest_push_setup_impl(
1011     XferElement *elt)
1012 {
1013     XferDestPush *self = XFER_DEST_PUSH(elt);
1014
1015     self->buf = g_malloc(TEST_XFER_SIZE);
1016     simpleprng_seed(&self->prng, RANDOM_SEED);
1017
1018     return TRUE;
1019 }
1020
1021 static void
1022 dest_push_class_init(
1023     XferDestPushClass * klass)
1024 {
1025     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1026     static xfer_element_mech_pair_t mech_pairs[] = {
1027         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
1028         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1029     };
1030
1031     xec->push_buffer = dest_push_push_buffer_impl;
1032     xec->setup = dest_push_setup_impl;
1033     xec->mech_pairs = mech_pairs;
1034 }
1035
1036 GType
1037 xfer_dest_push_get_type (void)
1038 {
1039     static GType type = 0;
1040
1041     if G_UNLIKELY(type == 0) {
1042         static const GTypeInfo info = {
1043             sizeof (XferDestPushClass),
1044             (GBaseInitFunc) NULL,
1045             (GBaseFinalizeFunc) NULL,
1046             (GClassInitFunc) dest_push_class_init,
1047             (GClassFinalizeFunc) NULL,
1048             NULL /* class_data */,
1049             sizeof (XferDestPush),
1050             0 /* n_preallocs */,
1051             (GInstanceInitFunc) NULL,
1052             NULL
1053         };
1054
1055         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
1056     }
1057
1058     return type;
1059 }
1060
1061 /* PULL_BUFFER */
1062
1063 static GType xfer_dest_pull_get_type(void);
1064 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1065 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1066 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1067 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1068 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1069 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
1070
1071 typedef struct XferDestPull {
1072     XferElement __parent__;
1073
1074     GThread *thread;
1075     simpleprng_state_t prng;
1076 } XferDestPull;
1077
1078 typedef struct {
1079     XferElementClass __parent__;
1080 } XferDestPullClass;
1081
1082 static gpointer
1083 dest_pull_thread(
1084     gpointer data)
1085 {
1086     XferDestPull *self = XFER_DEST_PULL(data);
1087     char fullbuf[TEST_XFER_SIZE];
1088     char *buf;
1089     size_t bufpos = 0;
1090     size_t size;
1091
1092     while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
1093         g_assert(bufpos + size <= TEST_XFER_SIZE);
1094         memcpy(fullbuf + bufpos, buf, size);
1095         bufpos += size;
1096     }
1097
1098     /* we're at EOF, so verify we got the right bytes */
1099     g_assert(bufpos == TEST_XFER_SIZE);
1100     if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1101         g_critical("data entering XferDestPull does not match");
1102
1103     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1104
1105     return NULL;
1106 }
1107
1108 static gboolean
1109 dest_pull_start_impl(
1110     XferElement *elt)
1111 {
1112     XferDestPull *self = XFER_DEST_PULL(elt);
1113
1114     simpleprng_seed(&self->prng, RANDOM_SEED);
1115
1116     self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
1117
1118     return TRUE;
1119 }
1120
1121 static void
1122 dest_pull_class_init(
1123     XferDestPullClass * klass)
1124 {
1125     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1126     static xfer_element_mech_pair_t mech_pairs[] = {
1127         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
1128         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1129     };
1130
1131     xec->start = dest_pull_start_impl;
1132     xec->mech_pairs = mech_pairs;
1133 }
1134
1135 GType
1136 xfer_dest_pull_get_type (void)
1137 {
1138     static GType type = 0;
1139
1140     if G_UNLIKELY(type == 0) {
1141         static const GTypeInfo info = {
1142             sizeof (XferDestPullClass),
1143             (GBaseInitFunc) NULL,
1144             (GBaseFinalizeFunc) NULL,
1145             (GClassInitFunc) dest_pull_class_init,
1146             (GClassFinalizeFunc) NULL,
1147             NULL /* class_data */,
1148             sizeof (XferDestPull),
1149             0 /* n_preallocs */,
1150             (GInstanceInitFunc) NULL,
1151             NULL
1152         };
1153
1154         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
1155     }
1156
1157     return type;
1158 }
1159
1160 /* LISTEN */
1161
1162 static GType xfer_dest_listen_get_type(void);
1163 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1164 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1165 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1166 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1167 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1168 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
1169
1170 typedef struct XferDestListen {
1171     XferElement __parent__;
1172
1173     int listen_socket;
1174
1175     GThread *thread;
1176     simpleprng_state_t prng;
1177 } XferDestListen;
1178
1179 typedef struct {
1180     XferElementClass __parent__;
1181 } XferDestListenClass;
1182
1183 static gpointer
1184 dest_listen_thread(
1185     gpointer data)
1186 {
1187     XferDestListen *self = XFER_DEST_LISTEN(data);
1188     char *buf;
1189     size_t bytes = 0;
1190     int sock;
1191
1192     g_assert(self->listen_socket != -1);
1193
1194     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1195         xfer_cancel_with_error(XFER_ELEMENT(self),
1196             _("Error accepting incoming connection: %s"), strerror(errno));
1197         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1198         return NULL;
1199     }
1200
1201     /* close the listening socket now, for good measure */
1202     close(self->listen_socket);
1203     self->listen_socket = -1;
1204
1205     /* read from the socket until EOF or all of the data is read.  We try to
1206      * read one extra byte - if we get it, then upstream sent too much data */
1207     buf = g_malloc(TEST_XFER_SIZE+1);
1208     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1209     g_assert(bytes == TEST_XFER_SIZE);
1210     close(sock);
1211
1212     /* we're at EOF, so verify we got the right bytes */
1213     g_assert(bytes == TEST_XFER_SIZE);
1214     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1215         g_critical("data entering XferDestListen does not match");
1216
1217     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1218
1219     return NULL;
1220 }
1221
1222 static gboolean
1223 dest_listen_setup_impl(
1224     XferElement *elt)
1225 {
1226     XferDestListen *self = XFER_DEST_LISTEN(elt);
1227     sockaddr_union addr;
1228     DirectTCPAddr *addrs;
1229     socklen_t len;
1230     int sock;
1231
1232     /* set up self->listen_socket and set elt->input_listen_addrs */
1233     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1234     if (sock < 0)
1235         error("socket(): %s", strerror(errno));
1236
1237     if (listen(sock, 1) < 0)
1238         error("listen(): %s", strerror(errno));
1239
1240     len = sizeof(addr);
1241     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1242         error("getsockname(): %s", strerror(errno));
1243     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
1244
1245     addrs = g_new0(DirectTCPAddr, 2);
1246     addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
1247     addrs[0].port = SU_GET_PORT(&addr);
1248     elt->input_listen_addrs = addrs;
1249
1250     return TRUE;
1251 }
1252
1253 static gboolean
1254 dest_listen_start_impl(
1255     XferElement *elt)
1256 {
1257     XferDestListen *self = XFER_DEST_LISTEN(elt);
1258
1259     simpleprng_seed(&self->prng, RANDOM_SEED);
1260
1261     self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
1262
1263     return TRUE;
1264 }
1265
1266 static void
1267 dest_listen_class_init(
1268     XferDestListenClass * klass)
1269 {
1270     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1271     static xfer_element_mech_pair_t mech_pairs[] = {
1272         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
1273         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1274     };
1275
1276     xec->setup = dest_listen_setup_impl;
1277     xec->start = dest_listen_start_impl;
1278     xec->mech_pairs = mech_pairs;
1279 }
1280
1281 GType
1282 xfer_dest_listen_get_type (void)
1283 {
1284     static GType type = 0;
1285
1286     if G_UNLIKELY(type == 0) {
1287         static const GTypeInfo info = {
1288             sizeof (XferDestListenClass),
1289             (GBaseInitFunc) NULL,
1290             (GBaseFinalizeFunc) NULL,
1291             (GClassInitFunc) dest_listen_class_init,
1292             (GClassFinalizeFunc) NULL,
1293             NULL /* class_data */,
1294             sizeof (XferDestListen),
1295             0 /* n_preallocs */,
1296             (GInstanceInitFunc) NULL,
1297             NULL
1298         };
1299
1300         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
1301     }
1302
1303     return type;
1304 }
1305
1306 /* CONNET */
1307
1308 static GType xfer_dest_connect_get_type(void);
1309 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1310 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1311 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1312 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1313 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1314 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
1315
1316 typedef struct XferDestConnect {
1317     XferElement __parent__;
1318
1319     int connect_socket;
1320
1321     GThread *thread;
1322     simpleprng_state_t prng;
1323 } XferDestConnect;
1324
1325 typedef struct {
1326     XferElementClass __parent__;
1327 } XferDestConnectClass;
1328
1329 static gpointer
1330 dest_connect_thread(
1331     gpointer data)
1332 {
1333     XferDestConnect *self = XFER_DEST_CONNECT(data);
1334     XferElement *elt = XFER_ELEMENT(self);
1335     DirectTCPAddr *addrs;
1336     sockaddr_union addr;
1337     char *buf;
1338     size_t bytes = 0;
1339     int sock;
1340
1341     /* set up the sockaddr -- IPv4 only */
1342     SU_INIT(&addr, AF_INET);
1343     addrs = elt->upstream->output_listen_addrs;
1344     g_assert(addrs != NULL);
1345     SU_SET_PORT(&addr, addrs->port);
1346     ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
1347
1348     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1349     sock = socket(AF_INET, SOCK_STREAM, 0);
1350     if (sock < 0) {
1351         error("socket(): %s", strerror(errno));
1352     }
1353     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1354         error("connect(): %s", strerror(errno));
1355     }
1356
1357     tu_dbg("connected to %s\n", str_sockaddr(&addr));
1358
1359     /* read from the socket until EOF or all of the data is read.  We try to
1360      * read one extra byte - if we get it, then upstream sent too much data */
1361     buf = g_malloc(TEST_XFER_SIZE+1);
1362     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1363     g_assert(bytes == TEST_XFER_SIZE);
1364     close(sock);
1365
1366     /* we're at EOF, so verify we got the right bytes */
1367     g_assert(bytes == TEST_XFER_SIZE);
1368     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1369         g_critical("data entering XferDestConnect does not match");
1370
1371     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1372
1373     return NULL;
1374 }
1375
1376 static gboolean
1377 dest_connect_start_impl(
1378     XferElement *elt)
1379 {
1380     XferDestConnect *self = XFER_DEST_CONNECT(elt);
1381
1382     simpleprng_seed(&self->prng, RANDOM_SEED);
1383
1384     self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
1385
1386     return TRUE;
1387 }
1388
1389 static void
1390 dest_connect_class_init(
1391     XferDestConnectClass * klass)
1392 {
1393     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1394     static xfer_element_mech_pair_t mech_pairs[] = {
1395         { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
1396         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1397     };
1398
1399     xec->start = dest_connect_start_impl;
1400     xec->mech_pairs = mech_pairs;
1401 }
1402
1403 GType
1404 xfer_dest_connect_get_type (void)
1405 {
1406     static GType type = 0;
1407
1408     if G_UNLIKELY(type == 0) {
1409         static const GTypeInfo info = {
1410             sizeof (XferDestConnectClass),
1411             (GBaseInitFunc) NULL,
1412             (GBaseFinalizeFunc) NULL,
1413             (GClassInitFunc) dest_connect_class_init,
1414             (GClassFinalizeFunc) NULL,
1415             NULL /* class_data */,
1416             sizeof (XferDestConnect),
1417             0 /* n_preallocs */,
1418             (GInstanceInitFunc) NULL,
1419             NULL
1420         };
1421
1422         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
1423     }
1424
1425     return type;
1426 }
1427
1428
1429 /*
1430  * Tests
1431  */
1432
1433 static void
1434 test_xfer_generic_callback(
1435     gpointer data G_GNUC_UNUSED,
1436     XMsg *msg,
1437     Xfer *xfer)
1438 {
1439     tu_dbg("Received message %s\n", xmsg_repr(msg));
1440
1441     switch (msg->type) {
1442         case XMSG_DONE:
1443             /* are we done? */
1444             if (xfer->status == XFER_DONE) {
1445                 tu_dbg("all elements are done!\n");
1446                 g_main_loop_quit(default_main_loop());
1447             }
1448             break;
1449
1450         default:
1451             break;
1452     }
1453 }
1454
1455 /****
1456  * Run a simple transfer with some xor filters
1457  */
1458
1459 static int
1460 test_xfer_simple(void)
1461 {
1462     unsigned int i;
1463     GSource *src;
1464     XferElement *elements[] = {
1465         xfer_source_random(100*1024, RANDOM_SEED),
1466         xfer_filter_xor('d'),
1467         xfer_filter_xor('d'),
1468         xfer_dest_null(RANDOM_SEED),
1469     };
1470
1471     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1472     src = xfer_get_source(xfer);
1473     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1474     g_source_attach(src, NULL);
1475     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1476
1477     /* unreference the elements */
1478     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1479         g_object_unref(elements[i]);
1480         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1481         elements[i] = NULL;
1482     }
1483
1484     xfer_start(xfer);
1485
1486     g_main_loop_run(default_main_loop());
1487     g_assert(xfer->status == XFER_DONE);
1488
1489     xfer_unref(xfer);
1490
1491     return 1;
1492 }
1493
1494 /****
1495  * Run a transfer between two files, with or without filters
1496  */
1497
1498 static int
1499 test_xfer_files(gboolean add_filters)
1500 {
1501     unsigned int i;
1502     unsigned int elts;
1503     GSource *src;
1504     char *in_filename = __FILE__;
1505     char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
1506     int rfd, wfd;
1507     Xfer *xfer;
1508     XferElement *elements[4];
1509
1510     rfd = open(in_filename, O_RDONLY, 0);
1511     if (rfd < 0)
1512         g_critical("Could not open '%s': %s", in_filename, strerror(errno));
1513
1514     wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1515     if (wfd < 0)
1516         g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1517
1518     elts = 0;
1519     elements[elts++] = xfer_source_fd(rfd);
1520     if (add_filters) {
1521         elements[elts++] = xfer_filter_xor(0xab);
1522         elements[elts++] = xfer_filter_xor(0xab);
1523     }
1524     elements[elts++] = xfer_dest_fd(wfd);
1525
1526     xfer = xfer_new(elements, elts);
1527     src = xfer_get_source(xfer);
1528     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1529     g_source_attach(src, NULL);
1530     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1531
1532     /* unreference the elements */
1533     for (i = 0; i < elts; i++) {
1534         g_object_unref(elements[i]);
1535         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1536         elements[i] = NULL;
1537     }
1538
1539     xfer_start(xfer);
1540
1541     g_main_loop_run(default_main_loop());
1542     g_assert(xfer->status == XFER_DONE);
1543
1544     xfer_unref(xfer);
1545
1546     unlink(out_filename); /* ignore any errors */
1547
1548     return 1;
1549 }
1550
1551 static int
1552 test_xfer_files_simple(void)
1553 {
1554     return test_xfer_files(FALSE);
1555 }
1556
1557 static int
1558 test_xfer_files_filter(void)
1559 {
1560     return test_xfer_files(TRUE);
1561 }
1562
1563 /*****
1564  * test each possible combination of source and destination mechansim
1565  */
1566
1567 static int
1568 test_glue_combo(
1569     XferElement *source,
1570     XferElement *dest)
1571 {
1572     unsigned int i;
1573     GSource *src;
1574     XferElement *elements[] = { source, dest };
1575
1576     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1577     src = xfer_get_source(xfer);
1578     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1579     g_source_attach(src, NULL);
1580
1581     /* unreference the elements */
1582     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1583         g_object_unref(elements[i]);
1584         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1585         elements[i] = NULL;
1586     }
1587
1588     xfer_start(xfer);
1589
1590     g_main_loop_run(default_main_loop());
1591     g_assert(xfer->status == XFER_DONE);
1592
1593     xfer_unref(xfer);
1594
1595     return 1;
1596 }
1597
1598 #define make_test_glue(n, s, d) static int n(void) \
1599 {\
1600     return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1601                            (XferElement *)g_object_new(d, NULL)); \
1602 }
1603 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1604 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1605 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1606 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1607 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1608 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1609 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1610 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1611 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1612 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1613 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1614 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1615 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1616 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1617 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1618 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1619 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1620 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1621 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1622 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1623 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1624 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1625 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1626 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1627 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1628 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1629 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1630 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1631 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1632 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1633 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1634 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1635 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1636 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1637 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1638 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
1639
1640 /*
1641  * Main driver
1642  */
1643
1644 int
1645 main(int argc, char **argv)
1646 {
1647     static TestUtilsTest tests[] = {
1648         TU_TEST(test_xfer_simple, 90),
1649         TU_TEST(test_xfer_files_simple, 90),
1650         TU_TEST(test_xfer_files_filter, 90),
1651         TU_TEST(test_glue_READFD_READFD, 90),
1652         TU_TEST(test_glue_READFD_WRITEFD, 90),
1653         TU_TEST(test_glue_READFD_PUSH, 90),
1654         TU_TEST(test_glue_READFD_PULL, 90),
1655         TU_TEST(test_glue_READFD_LISTEN, 90),
1656         TU_TEST(test_glue_READFD_CONNECT, 90),
1657         TU_TEST(test_glue_WRITEFD_READFD, 90),
1658         TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1659         TU_TEST(test_glue_WRITEFD_PUSH, 90),
1660         TU_TEST(test_glue_WRITEFD_PULL, 90),
1661         TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1662         TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1663         TU_TEST(test_glue_PUSH_READFD, 90),
1664         TU_TEST(test_glue_PUSH_WRITEFD, 90),
1665         TU_TEST(test_glue_PUSH_PUSH, 90),
1666         TU_TEST(test_glue_PUSH_PULL, 90),
1667         TU_TEST(test_glue_PUSH_LISTEN, 90),
1668         TU_TEST(test_glue_PUSH_CONNECT, 90),
1669         TU_TEST(test_glue_PULL_READFD, 90),
1670         TU_TEST(test_glue_PULL_WRITEFD, 90),
1671         TU_TEST(test_glue_PULL_PUSH, 90),
1672         TU_TEST(test_glue_PULL_PULL, 90),
1673         TU_TEST(test_glue_PULL_LISTEN, 90),
1674         TU_TEST(test_glue_PULL_CONNECT, 90),
1675         TU_TEST(test_glue_LISTEN_READFD, 90),
1676         TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1677         TU_TEST(test_glue_LISTEN_PUSH, 90),
1678         TU_TEST(test_glue_LISTEN_PULL, 90),
1679         TU_TEST(test_glue_LISTEN_LISTEN, 90),
1680         TU_TEST(test_glue_LISTEN_CONNECT, 90),
1681         TU_TEST(test_glue_CONNECT_READFD, 90),
1682         TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1683         TU_TEST(test_glue_CONNECT_PUSH, 90),
1684         TU_TEST(test_glue_CONNECT_PULL, 90),
1685         TU_TEST(test_glue_CONNECT_LISTEN, 90),
1686         TU_TEST(test_glue_CONNECT_CONNECT, 90),
1687         TU_END()
1688     };
1689
1690     glib_init();
1691
1692     return testutils_run_tests(argc, argv, tests);
1693 }