Imported Upstream version 3.3.3
[debian/amanda] / xfer-src / xfer-test.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  * 
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  * 
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  * 
18  * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  *
21  * Author: Dustin J. Mitchell <dustin@zmanda.com>
22  */
23
24 #include "amanda.h"
25 #include "amxfer.h"
26 #include "glib-util.h"
27 #include "testutils.h"
28 #include "event.h"
29 #include "simpleprng.h"
30 #include "sockaddr-util.h"
31
32 /* Having tests repeat exactly is an advantage, so we use a hard-coded
33  * random seed. */
34 #define RANDOM_SEED 0xf00d
35
36 /*
37  * XferElement subclasses
38  *
39  * This file defines a few "private" element classes that each have only one
40  * mechanism pair.  These classes are then used to test all of the possible
41  * combinations of glue.
42  */
43
44 /* constants to determine the total amount of data to be transferred; EXTRA is
45  * to test out partial-block handling; it should be prime. */
46 #define TEST_BLOCK_SIZE 32768
47 #define TEST_BLOCK_COUNT 10
48 #define TEST_BLOCK_EXTRA 97
49 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
50
51 /* READFD */
52
53 static GType xfer_source_readfd_get_type(void);
54 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
55 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
56 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
57 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
58 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
59 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
60
61 typedef struct XferSourceReadfd {
62     XferElement __parent__;
63
64     int write_fd;
65     GThread *thread;
66     simpleprng_state_t prng;
67 } XferSourceReadfd;
68
69 typedef struct {
70     XferElementClass __parent__;
71 } XferSourceReadfdClass;
72
73 static gpointer
74 source_readfd_thread(
75     gpointer data)
76 {
77     XferSourceReadfd *self = XFER_SOURCE_READFD(data);
78     char buf[TEST_XFER_SIZE];
79     int fd = self->write_fd;
80
81     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
82
83     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
84         error("error in full_write(): %s", strerror(errno));
85     }
86
87     close(fd);
88
89     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
90
91     return NULL;
92 }
93
94 static gboolean
95 source_readfd_setup_impl(
96     XferElement *elt)
97 {
98     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
99     int p[2];
100
101     simpleprng_seed(&self->prng, RANDOM_SEED);
102
103     if (pipe(p) < 0)
104         g_critical("Error from pipe(): %s", strerror(errno));
105
106     self->write_fd = p[1];
107     g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
108
109     return TRUE;
110 }
111
112 static gboolean
113 source_readfd_start_impl(
114     XferElement *elt)
115 {
116     XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
117     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
118
119     return TRUE;
120 }
121
122 static void
123 source_readfd_class_init(
124     XferSourceReadfdClass * klass)
125 {
126     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
127     static xfer_element_mech_pair_t mech_pairs[] = {
128         { XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) },
129         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
130     };
131
132     xec->setup = source_readfd_setup_impl;
133     xec->start = source_readfd_start_impl;
134     xec->mech_pairs = mech_pairs;
135 }
136
137 GType
138 xfer_source_readfd_get_type (void)
139 {
140     static GType type = 0;
141
142     if G_UNLIKELY(type == 0) {
143         static const GTypeInfo info = {
144             sizeof (XferSourceReadfdClass),
145             (GBaseInitFunc) NULL,
146             (GBaseFinalizeFunc) NULL,
147             (GClassInitFunc) source_readfd_class_init,
148             (GClassFinalizeFunc) NULL,
149             NULL /* class_data */,
150             sizeof (XferSourceReadfd),
151             0 /* n_preallocs */,
152             (GInstanceInitFunc) NULL,
153             NULL
154         };
155
156         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
157     }
158
159     return type;
160 }
161
162 /* WRITEFD */
163
164 static GType xfer_source_writefd_get_type(void);
165 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
166 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
167 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
168 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
169 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
170 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
171
172 typedef struct XferSourceWritefd {
173     XferElement __parent__;
174
175     GThread *thread;
176     simpleprng_state_t prng;
177 } XferSourceWritefd;
178
179 typedef struct {
180     XferElementClass __parent__;
181 } XferSourceWritefdClass;
182
183 static gpointer
184 source_writefd_thread(
185     gpointer data)
186 {
187     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
188     XferElement *elt = XFER_ELEMENT(data);
189     char buf[TEST_XFER_SIZE];
190     int fd = xfer_element_swap_input_fd(elt->downstream, -1);
191
192     /* this shouldn't happen, although non-test elements handle it gracefully */
193     g_assert(fd != -1);
194
195     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
196
197     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
198         error("error in full_write(): %s", strerror(errno));
199     }
200
201     close(fd);
202
203     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
204
205     return NULL;
206 }
207
208 static gboolean
209 source_writefd_start_impl(
210     XferElement *elt)
211 {
212     XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
213
214     simpleprng_seed(&self->prng, RANDOM_SEED);
215
216     self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
217
218     return TRUE;
219 }
220
221 static void
222 source_writefd_class_init(
223     XferSourceWritefdClass * klass)
224 {
225     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
226     static xfer_element_mech_pair_t mech_pairs[] = {
227         { XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) },
228         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
229     };
230
231     xec->start = source_writefd_start_impl;
232     xec->mech_pairs = mech_pairs;
233 }
234
235 GType
236 xfer_source_writefd_get_type (void)
237 {
238     static GType type = 0;
239
240     if G_UNLIKELY(type == 0) {
241         static const GTypeInfo info = {
242             sizeof (XferSourceWritefdClass),
243             (GBaseInitFunc) NULL,
244             (GBaseFinalizeFunc) NULL,
245             (GClassInitFunc) source_writefd_class_init,
246             (GClassFinalizeFunc) NULL,
247             NULL /* class_data */,
248             sizeof (XferSourceWritefd),
249             0 /* n_preallocs */,
250             (GInstanceInitFunc) NULL,
251             NULL
252         };
253
254         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
255     }
256
257     return type;
258 }
259
260 /* PUSH_BUFFER */
261
262 static GType xfer_source_push_get_type(void);
263 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
264 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
265 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
266 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
267 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
268 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
269
270 typedef struct XferSourcePush {
271     XferElement __parent__;
272
273     GThread *thread;
274     simpleprng_state_t prng;
275 } XferSourcePush;
276
277 typedef struct {
278     XferElementClass __parent__;
279 } XferSourcePushClass;
280
281 static gpointer
282 source_push_thread(
283     gpointer data)
284 {
285     XferSourcePush *self = XFER_SOURCE_PUSH(data);
286     char *buf;
287     int i;
288
289     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
290         buf = g_malloc(TEST_BLOCK_SIZE);
291         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
292         xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
293         buf = NULL;
294     }
295
296     /* send a smaller block */
297     buf = g_malloc(TEST_BLOCK_EXTRA);
298     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
299     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
300     buf = NULL;
301
302     /* send EOF */
303     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
304
305     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
306
307     return NULL;
308 }
309
310 static gboolean
311 source_push_start_impl(
312     XferElement *elt)
313 {
314     XferSourcePush *self = XFER_SOURCE_PUSH(elt);
315
316     simpleprng_seed(&self->prng, RANDOM_SEED);
317
318     self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
319
320     return TRUE;
321 }
322
323 static void
324 source_push_class_init(
325     XferSourcePushClass * klass)
326 {
327     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
328     static xfer_element_mech_pair_t mech_pairs[] = {
329         { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) },
330         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
331     };
332
333     xec->start = source_push_start_impl;
334     xec->mech_pairs = mech_pairs;
335 }
336
337 GType
338 xfer_source_push_get_type (void)
339 {
340     static GType type = 0;
341
342     if G_UNLIKELY(type == 0) {
343         static const GTypeInfo info = {
344             sizeof (XferSourcePushClass),
345             (GBaseInitFunc) NULL,
346             (GBaseFinalizeFunc) NULL,
347             (GClassInitFunc) source_push_class_init,
348             (GClassFinalizeFunc) NULL,
349             NULL /* class_data */,
350             sizeof (XferSourcePush),
351             0 /* n_preallocs */,
352             (GInstanceInitFunc) NULL,
353             NULL
354         };
355
356         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
357     }
358
359     return type;
360 }
361
362 /* PULL_BUFFER */
363
364 static GType xfer_source_pull_get_type(void);
365 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
366 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
367 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
368 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
369 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
370 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
371
372 typedef struct XferSourcePull {
373     XferElement __parent__;
374
375     gint nbuffers;
376     GThread *thread;
377     simpleprng_state_t prng;
378 } XferSourcePull;
379
380 typedef struct {
381     XferElementClass __parent__;
382 } XferSourcePullClass;
383
384 static gpointer
385 source_pull_pull_buffer_impl(
386     XferElement *elt,
387     size_t *size)
388 {
389     XferSourcePull *self = XFER_SOURCE_PULL(elt);
390     char *buf;
391     size_t bufsiz;
392
393     if (self->nbuffers > TEST_BLOCK_COUNT) {
394         *size = 0;
395         return NULL;
396     }
397     bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
398
399     self->nbuffers++;
400
401     buf = g_malloc(bufsiz);
402     simpleprng_fill_buffer(&self->prng, buf, bufsiz);
403     *size = bufsiz;
404     return buf;
405 }
406
407 static gboolean
408 source_pull_setup_impl(
409     XferElement *elt)
410 {
411     XferSourcePull *self = XFER_SOURCE_PULL(elt);
412
413     simpleprng_seed(&self->prng, RANDOM_SEED);
414
415     return TRUE;
416 }
417
418 static void
419 source_pull_class_init(
420     XferSourcePullClass * klass)
421 {
422     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
423     static xfer_element_mech_pair_t mech_pairs[] = {
424         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) },
425         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
426     };
427
428     xec->pull_buffer = source_pull_pull_buffer_impl;
429     xec->setup = source_pull_setup_impl;
430     xec->mech_pairs = mech_pairs;
431 }
432
433 GType
434 xfer_source_pull_get_type (void)
435 {
436     static GType type = 0;
437
438     if G_UNLIKELY(type == 0) {
439         static const GTypeInfo info = {
440             sizeof (XferSourcePullClass),
441             (GBaseInitFunc) NULL,
442             (GBaseFinalizeFunc) NULL,
443             (GClassInitFunc) source_pull_class_init,
444             (GClassFinalizeFunc) NULL,
445             NULL /* class_data */,
446             sizeof (XferSourcePull),
447             0 /* n_preallocs */,
448             (GInstanceInitFunc) NULL,
449             NULL
450         };
451
452         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
453     }
454
455     return type;
456 }
457
458 /* LISTEN */
459
460 static GType xfer_source_listen_get_type(void);
461 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
462 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
463 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
464 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
465 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
466 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
467
468 typedef struct XferSourceListen {
469     XferElement __parent__;
470
471     GThread *thread;
472     simpleprng_state_t prng;
473 } XferSourceListen;
474
475 typedef struct {
476     XferElementClass __parent__;
477 } XferSourceListenClass;
478
479 static gpointer
480 source_listen_thread(
481     gpointer data)
482 {
483     XferSourceListen *self = XFER_SOURCE_LISTEN(data);
484     XferElement *elt = XFER_ELEMENT(self);
485     DirectTCPAddr *addrs;
486     int sock;
487     char *buf;
488     int i;
489
490     /* set up the sockaddr -- IPv4 only */
491     addrs = elt->downstream->input_listen_addrs;
492     g_assert(addrs != NULL);
493
494     tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
495     sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
496     if (sock < 0) {
497         error("socket(): %s", strerror(errno));
498     }
499     if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
500         error("connect(): %s", strerror(errno));
501     }
502
503     tu_dbg("connected to %s\n", str_sockaddr(addrs));
504
505     buf = g_malloc(TEST_BLOCK_SIZE);
506     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
507         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
508         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
509             error("error in full_write(): %s", strerror(errno));
510         }
511     }
512
513     /* send a smaller block */
514     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
515     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
516         error("error in full_write(): %s", strerror(errno));
517     }
518     g_free(buf);
519
520     /* send EOF by closing the socket */
521     close(sock);
522
523     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
524
525     return NULL;
526 }
527
528 static gboolean
529 source_listen_start_impl(
530     XferElement *elt)
531 {
532     XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
533
534     simpleprng_seed(&self->prng, RANDOM_SEED);
535
536     self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
537
538     return TRUE;
539 }
540
541 static void
542 source_listen_class_init(
543     XferSourceListenClass * klass)
544 {
545     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
546     static xfer_element_mech_pair_t mech_pairs[] = {
547         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) },
548         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
549     };
550
551     xec->start = source_listen_start_impl;
552     xec->mech_pairs = mech_pairs;
553 }
554
555 GType
556 xfer_source_listen_get_type (void)
557 {
558     static GType type = 0;
559
560     if G_UNLIKELY(type == 0) {
561         static const GTypeInfo info = {
562             sizeof (XferSourceListenClass),
563             (GBaseInitFunc) NULL,
564             (GBaseFinalizeFunc) NULL,
565             (GClassInitFunc) source_listen_class_init,
566             (GClassFinalizeFunc) NULL,
567             NULL /* class_data */,
568             sizeof (XferSourceListen),
569             0 /* n_preallocs */,
570             (GInstanceInitFunc) NULL,
571             NULL
572         };
573
574         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
575     }
576
577     return type;
578 }
579
580 /* CONNECT */
581
582 static GType xfer_source_connect_get_type(void);
583 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
584 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
585 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
586 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
587 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
588 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
589
590 typedef struct XferSourceConnect {
591     XferElement __parent__;
592
593     int listen_socket;
594
595     GThread *thread;
596     simpleprng_state_t prng;
597 } XferSourceConnect;
598
599 typedef struct {
600     XferElementClass __parent__;
601 } XferSourceConnectClass;
602
603 static gpointer
604 source_connect_thread(
605     gpointer data)
606 {
607     XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
608     int sock;
609     char *buf;
610     int i;
611
612     g_assert(self->listen_socket != -1);
613
614     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
615         xfer_cancel_with_error(XFER_ELEMENT(self),
616             _("Error accepting incoming connection: %s"), strerror(errno));
617         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
618         return NULL;
619     }
620
621     /* close the listening socket now, for good measure */
622     close(self->listen_socket);
623     self->listen_socket = -1;
624
625     tu_dbg("connection accepted\n");
626
627     buf = g_malloc(TEST_BLOCK_SIZE);
628     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
629         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
630         if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
631             error("error in full_write(): %s", strerror(errno));
632         }
633     }
634
635     /* send a smaller block */
636     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
637     if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
638         error("error in full_write(): %s", strerror(errno));
639     }
640     g_free(buf);
641
642     /* send EOF by closing the socket */
643     close(sock);
644
645     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
646
647     return NULL;
648 }
649
650 static gboolean
651 source_connect_setup_impl(
652     XferElement *elt)
653 {
654     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
655     sockaddr_union addr;
656     DirectTCPAddr *addrs;
657     socklen_t len;
658     int sock;
659
660     /* set up self->listen_socket and set elt->output_listen_addrs */
661     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
662     if (sock < 0)
663         error("socket(): %s", strerror(errno));
664
665     if (listen(sock, 1) < 0)
666         error("listen(): %s", strerror(errno));
667
668     len = sizeof(addr);
669     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
670         error("getsockname(): %s", strerror(errno));
671     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
672
673     addrs = g_new0(DirectTCPAddr, 2);
674     copy_sockaddr(&addrs[0], &addr);
675     elt->output_listen_addrs = addrs;
676
677     return TRUE;
678 }
679
680 static gboolean
681 source_connect_start_impl(
682     XferElement *elt)
683 {
684     XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
685
686     simpleprng_seed(&self->prng, RANDOM_SEED);
687
688     self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
689
690     return TRUE;
691 }
692
693 static void
694 source_connect_class_init(
695     XferSourceConnectClass * klass)
696 {
697     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
698     static xfer_element_mech_pair_t mech_pairs[] = {
699         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) },
700         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
701     };
702
703     xec->setup = source_connect_setup_impl;
704     xec->start = source_connect_start_impl;
705     xec->mech_pairs = mech_pairs;
706 }
707
708 GType
709 xfer_source_connect_get_type (void)
710 {
711     static GType type = 0;
712
713     if G_UNLIKELY(type == 0) {
714         static const GTypeInfo info = {
715             sizeof (XferSourceConnectClass),
716             (GBaseInitFunc) NULL,
717             (GBaseFinalizeFunc) NULL,
718             (GClassInitFunc) source_connect_class_init,
719             (GClassFinalizeFunc) NULL,
720             NULL /* class_data */,
721             sizeof (XferSourceConnect),
722             0 /* n_preallocs */,
723             (GInstanceInitFunc) NULL,
724             NULL
725         };
726
727         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
728     }
729
730     return type;
731 }
732
733 /* READFD */
734
735 static GType xfer_dest_readfd_get_type(void);
736 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
737 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
738 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
739 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
740 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
741 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
742
743 typedef struct XferDestReadfd {
744     XferElement __parent__;
745
746     GThread *thread;
747     simpleprng_state_t prng;
748 } XferDestReadfd;
749
750 typedef struct {
751     XferElementClass __parent__;
752 } XferDestReadfdClass;
753
754 static gpointer
755 dest_readfd_thread(
756     gpointer data)
757 {
758     XferDestReadfd *self = XFER_DEST_READFD(data);
759     XferElement *elt = XFER_ELEMENT(data);
760     char buf[TEST_XFER_SIZE];
761     size_t remaining;
762     int fd = xfer_element_swap_output_fd(elt->upstream, -1);
763
764     /* this shouldn't happen, although non-test elements handle it gracefully */
765     g_assert(fd != -1);
766
767     remaining = sizeof(buf);
768     while (remaining) {
769         ssize_t nread;
770         if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
771             error("error in read(): %s", strerror(errno));
772         }
773         remaining -= nread;
774     }
775
776     /* we should be at EOF here */
777     if (read(fd, buf, 10) != 0)
778         g_critical("too much data entering XferDestReadfd");
779
780     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
781         g_critical("data entering XferDestReadfd does not match");
782
783     close(fd);
784
785     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
786
787     return NULL;
788 }
789
790 static gboolean
791 dest_readfd_start_impl(
792     XferElement *elt)
793 {
794     XferDestReadfd *self = XFER_DEST_READFD(elt);
795
796     simpleprng_seed(&self->prng, RANDOM_SEED);
797
798     self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
799
800     return TRUE;
801 }
802
803 static void
804 dest_readfd_class_init(
805     XferDestReadfdClass * klass)
806 {
807     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
808     static xfer_element_mech_pair_t mech_pairs[] = {
809         { XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
810         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
811     };
812
813     xec->start = dest_readfd_start_impl;
814     xec->mech_pairs = mech_pairs;
815 }
816
817 GType
818 xfer_dest_readfd_get_type (void)
819 {
820     static GType type = 0;
821
822     if G_UNLIKELY(type == 0) {
823         static const GTypeInfo info = {
824             sizeof (XferDestReadfdClass),
825             (GBaseInitFunc) NULL,
826             (GBaseFinalizeFunc) NULL,
827             (GClassInitFunc) dest_readfd_class_init,
828             (GClassFinalizeFunc) NULL,
829             NULL /* class_data */,
830             sizeof (XferDestReadfd),
831             0 /* n_preallocs */,
832             (GInstanceInitFunc) NULL,
833             NULL
834         };
835
836         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
837     }
838
839     return type;
840 }
841
842 /* WRITEFD */
843
844 static GType xfer_dest_writefd_get_type(void);
845 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
846 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
847 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
848 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
849 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
850 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
851
852 typedef struct XferDestWritefd {
853     XferElement __parent__;
854
855     int read_fd;
856     GThread *thread;
857     simpleprng_state_t prng;
858 } XferDestWritefd;
859
860 typedef struct {
861     XferElementClass __parent__;
862 } XferDestWritefdClass;
863
864 static gpointer
865 dest_writefd_thread(
866     gpointer data)
867 {
868     XferDestWritefd *self = XFER_DEST_WRITEFD(data);
869     char buf[TEST_XFER_SIZE];
870     size_t remaining;
871     int fd = self->read_fd;
872
873     remaining = sizeof(buf);
874     while (remaining) {
875         ssize_t nwrite;
876         if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
877             error("error in read(): %s", strerror(errno));
878         }
879         remaining -= nwrite;
880     }
881
882     /* we should be at EOF here */
883     if (read(fd, buf, 10) != 0)
884         g_critical("too much data entering XferDestWritefd");
885
886     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
887         g_critical("data entering XferDestWritefd does not match");
888
889     close(fd);
890
891     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
892
893     return NULL;
894 }
895
896 static gboolean
897 dest_writefd_setup_impl(
898     XferElement *elt)
899 {
900     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
901     int p[2];
902
903     simpleprng_seed(&self->prng, RANDOM_SEED);
904
905     if (pipe(p) < 0)
906         g_critical("Error from pipe(): %s", strerror(errno));
907
908     self->read_fd = p[0];
909     g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
910
911     return TRUE;
912 }
913
914 static gboolean
915 dest_writefd_start_impl(
916     XferElement *elt)
917 {
918     XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
919     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
920
921     return TRUE;
922 }
923
924 static void
925 dest_writefd_class_init(
926     XferDestWritefdClass * klass)
927 {
928     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
929     static xfer_element_mech_pair_t mech_pairs[] = {
930         { XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
931         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
932     };
933
934     xec->setup = dest_writefd_setup_impl;
935     xec->start = dest_writefd_start_impl;
936     xec->mech_pairs = mech_pairs;
937 }
938
939 GType
940 xfer_dest_writefd_get_type (void)
941 {
942     static GType type = 0;
943
944     if G_UNLIKELY(type == 0) {
945         static const GTypeInfo info = {
946             sizeof (XferDestWritefdClass),
947             (GBaseInitFunc) NULL,
948             (GBaseFinalizeFunc) NULL,
949             (GClassInitFunc) dest_writefd_class_init,
950             (GClassFinalizeFunc) NULL,
951             NULL /* class_data */,
952             sizeof (XferDestWritefd),
953             0 /* n_preallocs */,
954             (GInstanceInitFunc) NULL,
955             NULL
956         };
957
958         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
959     }
960
961     return type;
962 }
963
964 /* PUSH_BUFFER */
965
966 static GType xfer_dest_push_get_type(void);
967 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
968 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
969 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
970 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
971 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
972 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
973
974 typedef struct XferDestPush {
975     XferElement __parent__;
976
977     char *buf;
978     size_t bufpos;
979
980     GThread *thread;
981     simpleprng_state_t prng;
982 } XferDestPush;
983
984 typedef struct {
985     XferElementClass __parent__;
986 } XferDestPushClass;
987
988 static void
989 dest_push_push_buffer_impl(
990     XferElement *elt,
991     gpointer buf,
992     size_t size)
993 {
994     XferDestPush *self = XFER_DEST_PUSH(elt);
995
996     if (buf == NULL) {
997         /* if we're at EOF, verify we got the right bytes */
998         g_assert(self->bufpos == TEST_XFER_SIZE);
999         if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
1000             g_critical("data entering XferDestPush does not match");
1001         g_free(self->buf);
1002         return;
1003     }
1004
1005     g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1006     memcpy(self->buf + self->bufpos, buf, size);
1007     self->bufpos += size;
1008 }
1009
1010 static gboolean
1011 dest_push_setup_impl(
1012     XferElement *elt)
1013 {
1014     XferDestPush *self = XFER_DEST_PUSH(elt);
1015
1016     self->buf = g_malloc(TEST_XFER_SIZE);
1017     simpleprng_seed(&self->prng, RANDOM_SEED);
1018
1019     return TRUE;
1020 }
1021
1022 static void
1023 dest_push_class_init(
1024     XferDestPushClass * klass)
1025 {
1026     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1027     static xfer_element_mech_pair_t mech_pairs[] = {
1028         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0) },
1029         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1030     };
1031
1032     xec->push_buffer = dest_push_push_buffer_impl;
1033     xec->setup = dest_push_setup_impl;
1034     xec->mech_pairs = mech_pairs;
1035 }
1036
1037 GType
1038 xfer_dest_push_get_type (void)
1039 {
1040     static GType type = 0;
1041
1042     if G_UNLIKELY(type == 0) {
1043         static const GTypeInfo info = {
1044             sizeof (XferDestPushClass),
1045             (GBaseInitFunc) NULL,
1046             (GBaseFinalizeFunc) NULL,
1047             (GClassInitFunc) dest_push_class_init,
1048             (GClassFinalizeFunc) NULL,
1049             NULL /* class_data */,
1050             sizeof (XferDestPush),
1051             0 /* n_preallocs */,
1052             (GInstanceInitFunc) NULL,
1053             NULL
1054         };
1055
1056         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
1057     }
1058
1059     return type;
1060 }
1061
1062 /* PULL_BUFFER */
1063
1064 static GType xfer_dest_pull_get_type(void);
1065 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1066 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1067 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1068 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1069 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1070 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
1071
1072 typedef struct XferDestPull {
1073     XferElement __parent__;
1074
1075     GThread *thread;
1076     simpleprng_state_t prng;
1077 } XferDestPull;
1078
1079 typedef struct {
1080     XferElementClass __parent__;
1081 } XferDestPullClass;
1082
1083 static gpointer
1084 dest_pull_thread(
1085     gpointer data)
1086 {
1087     XferDestPull *self = XFER_DEST_PULL(data);
1088     char fullbuf[TEST_XFER_SIZE];
1089     char *buf;
1090     size_t bufpos = 0;
1091     size_t size;
1092
1093     while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
1094         g_assert(bufpos + size <= TEST_XFER_SIZE);
1095         memcpy(fullbuf + bufpos, buf, size);
1096         bufpos += size;
1097     }
1098
1099     /* we're at EOF, so verify we got the right bytes */
1100     g_assert(bufpos == TEST_XFER_SIZE);
1101     if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1102         g_critical("data entering XferDestPull does not match");
1103
1104     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1105
1106     return NULL;
1107 }
1108
1109 static gboolean
1110 dest_pull_start_impl(
1111     XferElement *elt)
1112 {
1113     XferDestPull *self = XFER_DEST_PULL(elt);
1114
1115     simpleprng_seed(&self->prng, RANDOM_SEED);
1116
1117     self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
1118
1119     return TRUE;
1120 }
1121
1122 static void
1123 dest_pull_class_init(
1124     XferDestPullClass * klass)
1125 {
1126     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1127     static xfer_element_mech_pair_t mech_pairs[] = {
1128         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1129         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1130     };
1131
1132     xec->start = dest_pull_start_impl;
1133     xec->mech_pairs = mech_pairs;
1134 }
1135
1136 GType
1137 xfer_dest_pull_get_type (void)
1138 {
1139     static GType type = 0;
1140
1141     if G_UNLIKELY(type == 0) {
1142         static const GTypeInfo info = {
1143             sizeof (XferDestPullClass),
1144             (GBaseInitFunc) NULL,
1145             (GBaseFinalizeFunc) NULL,
1146             (GClassInitFunc) dest_pull_class_init,
1147             (GClassFinalizeFunc) NULL,
1148             NULL /* class_data */,
1149             sizeof (XferDestPull),
1150             0 /* n_preallocs */,
1151             (GInstanceInitFunc) NULL,
1152             NULL
1153         };
1154
1155         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
1156     }
1157
1158     return type;
1159 }
1160
1161 /* LISTEN */
1162
1163 static GType xfer_dest_listen_get_type(void);
1164 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1165 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1166 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1167 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1168 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1169 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
1170
1171 typedef struct XferDestListen {
1172     XferElement __parent__;
1173
1174     int listen_socket;
1175
1176     GThread *thread;
1177     simpleprng_state_t prng;
1178 } XferDestListen;
1179
1180 typedef struct {
1181     XferElementClass __parent__;
1182 } XferDestListenClass;
1183
1184 static gpointer
1185 dest_listen_thread(
1186     gpointer data)
1187 {
1188     XferDestListen *self = XFER_DEST_LISTEN(data);
1189     char *buf;
1190     size_t bytes = 0;
1191     int sock;
1192
1193     g_assert(self->listen_socket != -1);
1194
1195     if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1196         xfer_cancel_with_error(XFER_ELEMENT(self),
1197             _("Error accepting incoming connection: %s"), strerror(errno));
1198         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1199         return NULL;
1200     }
1201
1202     /* close the listening socket now, for good measure */
1203     close(self->listen_socket);
1204     self->listen_socket = -1;
1205
1206     /* read from the socket until EOF or all of the data is read.  We try to
1207      * read one extra byte - if we get it, then upstream sent too much data */
1208     buf = g_malloc(TEST_XFER_SIZE+1);
1209     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1210     g_assert(bytes == TEST_XFER_SIZE);
1211     close(sock);
1212
1213     /* we're at EOF, so verify we got the right bytes */
1214     g_assert(bytes == TEST_XFER_SIZE);
1215     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1216         g_critical("data entering XferDestListen does not match");
1217
1218     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1219
1220     return NULL;
1221 }
1222
1223 static gboolean
1224 dest_listen_setup_impl(
1225     XferElement *elt)
1226 {
1227     XferDestListen *self = XFER_DEST_LISTEN(elt);
1228     sockaddr_union addr;
1229     DirectTCPAddr *addrs;
1230     socklen_t len;
1231     int sock;
1232
1233     /* set up self->listen_socket and set elt->input_listen_addrs */
1234     sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1235     if (sock < 0)
1236         error("socket(): %s", strerror(errno));
1237
1238     if (listen(sock, 1) < 0)
1239         error("listen(): %s", strerror(errno));
1240
1241     len = sizeof(addr);
1242     if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1243         error("getsockname(): %s", strerror(errno));
1244     g_assert(SU_GET_FAMILY(&addr) == AF_INET);
1245
1246     addrs = g_new0(DirectTCPAddr, 2);
1247     copy_sockaddr(&addrs[0], &addr);
1248     elt->input_listen_addrs = addrs;
1249
1250     return TRUE;
1251 }
1252
1253 static gboolean
1254 dest_listen_start_impl(
1255     XferElement *elt)
1256 {
1257     XferDestListen *self = XFER_DEST_LISTEN(elt);
1258
1259     simpleprng_seed(&self->prng, RANDOM_SEED);
1260
1261     self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
1262
1263     return TRUE;
1264 }
1265
1266 static void
1267 dest_listen_class_init(
1268     XferDestListenClass * klass)
1269 {
1270     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1271     static xfer_element_mech_pair_t mech_pairs[] = {
1272         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1273         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1274     };
1275
1276     xec->setup = dest_listen_setup_impl;
1277     xec->start = dest_listen_start_impl;
1278     xec->mech_pairs = mech_pairs;
1279 }
1280
1281 GType
1282 xfer_dest_listen_get_type (void)
1283 {
1284     static GType type = 0;
1285
1286     if G_UNLIKELY(type == 0) {
1287         static const GTypeInfo info = {
1288             sizeof (XferDestListenClass),
1289             (GBaseInitFunc) NULL,
1290             (GBaseFinalizeFunc) NULL,
1291             (GClassInitFunc) dest_listen_class_init,
1292             (GClassFinalizeFunc) NULL,
1293             NULL /* class_data */,
1294             sizeof (XferDestListen),
1295             0 /* n_preallocs */,
1296             (GInstanceInitFunc) NULL,
1297             NULL
1298         };
1299
1300         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
1301     }
1302
1303     return type;
1304 }
1305
1306 /* CONNECT */
1307
1308 static GType xfer_dest_connect_get_type(void);
1309 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1310 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1311 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1312 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1313 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1314 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
1315
1316 typedef struct XferDestConnect {
1317     XferElement __parent__;
1318
1319     int connect_socket;
1320
1321     GThread *thread;
1322     simpleprng_state_t prng;
1323 } XferDestConnect;
1324
1325 typedef struct {
1326     XferElementClass __parent__;
1327 } XferDestConnectClass;
1328
1329 static gpointer
1330 dest_connect_thread(
1331     gpointer data)
1332 {
1333     XferDestConnect *self = XFER_DEST_CONNECT(data);
1334     XferElement *elt = XFER_ELEMENT(self);
1335     DirectTCPAddr *addrs;
1336     sockaddr_union addr;
1337     char *buf;
1338     size_t bytes = 0;
1339     int sock;
1340
1341     /* set up the sockaddr -- IPv4 only */
1342     SU_INIT(&addr, AF_INET);
1343     addrs = elt->upstream->output_listen_addrs;
1344     g_assert(addrs != NULL);
1345     copy_sockaddr(&addr, addrs);
1346
1347     tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1348     sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
1349     if (sock < 0) {
1350         error("socket(): %s", strerror(errno));
1351     }
1352     if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1353         error("connect(): %s", strerror(errno));
1354     }
1355
1356     tu_dbg("connected to %s\n", str_sockaddr(&addr));
1357
1358     /* read from the socket until EOF or all of the data is read.  We try to
1359      * read one extra byte - if we get it, then upstream sent too much data */
1360     buf = g_malloc(TEST_XFER_SIZE+1);
1361     bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1362     g_assert(bytes == TEST_XFER_SIZE);
1363     close(sock);
1364
1365     /* we're at EOF, so verify we got the right bytes */
1366     g_assert(bytes == TEST_XFER_SIZE);
1367     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1368         g_critical("data entering XferDestConnect does not match");
1369
1370     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1371
1372     return NULL;
1373 }
1374
1375 static gboolean
1376 dest_connect_start_impl(
1377     XferElement *elt)
1378 {
1379     XferDestConnect *self = XFER_DEST_CONNECT(elt);
1380
1381     simpleprng_seed(&self->prng, RANDOM_SEED);
1382
1383     self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
1384
1385     return TRUE;
1386 }
1387
1388 static void
1389 dest_connect_class_init(
1390     XferDestConnectClass * klass)
1391 {
1392     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1393     static xfer_element_mech_pair_t mech_pairs[] = {
1394         { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1395         { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1396     };
1397
1398     xec->start = dest_connect_start_impl;
1399     xec->mech_pairs = mech_pairs;
1400 }
1401
1402 GType
1403 xfer_dest_connect_get_type (void)
1404 {
1405     static GType type = 0;
1406
1407     if G_UNLIKELY(type == 0) {
1408         static const GTypeInfo info = {
1409             sizeof (XferDestConnectClass),
1410             (GBaseInitFunc) NULL,
1411             (GBaseFinalizeFunc) NULL,
1412             (GClassInitFunc) dest_connect_class_init,
1413             (GClassFinalizeFunc) NULL,
1414             NULL /* class_data */,
1415             sizeof (XferDestConnect),
1416             0 /* n_preallocs */,
1417             (GInstanceInitFunc) NULL,
1418             NULL
1419         };
1420
1421         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
1422     }
1423
1424     return type;
1425 }
1426
1427
1428 /*
1429  * Tests
1430  */
1431
1432 static void
1433 test_xfer_generic_callback(
1434     gpointer data G_GNUC_UNUSED,
1435     XMsg *msg,
1436     Xfer *xfer)
1437 {
1438     tu_dbg("Received message %s\n", xmsg_repr(msg));
1439
1440     switch (msg->type) {
1441         case XMSG_DONE:
1442             /* are we done? */
1443             if (xfer->status == XFER_DONE) {
1444                 tu_dbg("all elements are done!\n");
1445                 g_main_loop_quit(default_main_loop());
1446             }
1447             break;
1448
1449         default:
1450             break;
1451     }
1452 }
1453
1454 /****
1455  * Run a simple transfer with some xor filters
1456  */
1457
1458 static int
1459 test_xfer_simple(void)
1460 {
1461     unsigned int i;
1462     GSource *src;
1463     XferElement *elements[] = {
1464         xfer_source_random(100*1024, RANDOM_SEED),
1465         xfer_filter_xor('d'),
1466         xfer_filter_xor('d'),
1467         xfer_dest_null(RANDOM_SEED),
1468     };
1469
1470     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1471     src = xfer_get_source(xfer);
1472     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1473     g_source_attach(src, NULL);
1474     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1475
1476     /* unreference the elements */
1477     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1478         g_object_unref(elements[i]);
1479         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1480         elements[i] = NULL;
1481     }
1482
1483     xfer_start(xfer, 0, 0);
1484
1485     g_main_loop_run(default_main_loop());
1486     g_assert(xfer->status == XFER_DONE);
1487
1488     xfer_unref(xfer);
1489
1490     return 1;
1491 }
1492
1493 /****
1494  * Run a transfer between two files, with or without filters
1495  */
1496
1497 static int
1498 test_xfer_files(gboolean add_filters)
1499 {
1500     unsigned int i;
1501     unsigned int elts;
1502     GSource *src;
1503     char *in_filename = __FILE__;
1504     char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
1505     int rfd, wfd;
1506     Xfer *xfer;
1507     XferElement *elements[4];
1508
1509     rfd = open(in_filename, O_RDONLY, 0);
1510     if (rfd < 0)
1511         g_critical("Could not open '%s': %s", in_filename, strerror(errno));
1512
1513     wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1514     if (wfd < 0)
1515         g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1516
1517     elts = 0;
1518     elements[elts++] = xfer_source_fd(rfd);
1519     if (add_filters) {
1520         elements[elts++] = xfer_filter_xor(0xab);
1521         elements[elts++] = xfer_filter_xor(0xab);
1522     }
1523     elements[elts++] = xfer_dest_fd(wfd);
1524
1525     xfer = xfer_new(elements, elts);
1526     src = xfer_get_source(xfer);
1527     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1528     g_source_attach(src, NULL);
1529     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1530
1531     /* unreference the elements */
1532     for (i = 0; i < elts; i++) {
1533         g_object_unref(elements[i]);
1534         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1535         elements[i] = NULL;
1536     }
1537
1538     xfer_start(xfer, 0, 0);
1539
1540     g_main_loop_run(default_main_loop());
1541     g_assert(xfer->status == XFER_DONE);
1542
1543     xfer_unref(xfer);
1544
1545     unlink(out_filename); /* ignore any errors */
1546
1547     return 1;
1548 }
1549
1550 static int
1551 test_xfer_files_simple(void)
1552 {
1553     return test_xfer_files(FALSE);
1554 }
1555
1556 static int
1557 test_xfer_files_filter(void)
1558 {
1559     return test_xfer_files(TRUE);
1560 }
1561
1562 /*****
1563  * test each possible combination of source and destination mechanism
1564  */
1565
1566 static int
1567 test_glue_combo(
1568     XferElement *source,
1569     XferElement *dest)
1570 {
1571     unsigned int i;
1572     GSource *src;
1573     XferElement *elements[] = { source, dest };
1574
1575     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1576     src = xfer_get_source(xfer);
1577     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1578     g_source_attach(src, NULL);
1579
1580     /* unreference the elements */
1581     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1582         g_object_unref(elements[i]);
1583         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1584         elements[i] = NULL;
1585     }
1586
1587     xfer_start(xfer, 0, 0);
1588
1589     g_main_loop_run(default_main_loop());
1590     g_assert(xfer->status == XFER_DONE);
1591
1592     xfer_unref(xfer);
1593
1594     return 1;
1595 }
1596
1597 #define make_test_glue(n, s, d) static int n(void) \
1598 {\
1599     return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1600                            (XferElement *)g_object_new(d, NULL)); \
1601 }
1602 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1603 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1604 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1605 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1606 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1607 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1608 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1609 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1610 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1611 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1612 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1613 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1614 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1615 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1616 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1617 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1618 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1619 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1620 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1621 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1622 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1623 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1624 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1625 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1626 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1627 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1628 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1629 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1630 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1631 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1632 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1633 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1634 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1635 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1636 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1637 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
1638
1639 /*
1640  * Main driver
1641  */
1642
1643 int
1644 main(int argc, char **argv)
1645 {
1646     static TestUtilsTest tests[] = {
1647         TU_TEST(test_xfer_simple, 90),
1648         TU_TEST(test_xfer_files_simple, 90),
1649         TU_TEST(test_xfer_files_filter, 90),
1650         TU_TEST(test_glue_READFD_READFD, 90),
1651         TU_TEST(test_glue_READFD_WRITEFD, 90),
1652         TU_TEST(test_glue_READFD_PUSH, 90),
1653         TU_TEST(test_glue_READFD_PULL, 90),
1654         TU_TEST(test_glue_READFD_LISTEN, 90),
1655         TU_TEST(test_glue_READFD_CONNECT, 90),
1656         TU_TEST(test_glue_WRITEFD_READFD, 90),
1657         TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1658         TU_TEST(test_glue_WRITEFD_PUSH, 90),
1659         TU_TEST(test_glue_WRITEFD_PULL, 90),
1660         TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1661         TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1662         TU_TEST(test_glue_PUSH_READFD, 90),
1663         TU_TEST(test_glue_PUSH_WRITEFD, 90),
1664         TU_TEST(test_glue_PUSH_PUSH, 90),
1665         TU_TEST(test_glue_PUSH_PULL, 90),
1666         TU_TEST(test_glue_PUSH_LISTEN, 90),
1667         TU_TEST(test_glue_PUSH_CONNECT, 90),
1668         TU_TEST(test_glue_PULL_READFD, 90),
1669         TU_TEST(test_glue_PULL_WRITEFD, 90),
1670         TU_TEST(test_glue_PULL_PUSH, 90),
1671         TU_TEST(test_glue_PULL_PULL, 90),
1672         TU_TEST(test_glue_PULL_LISTEN, 90),
1673         TU_TEST(test_glue_PULL_CONNECT, 90),
1674         TU_TEST(test_glue_LISTEN_READFD, 90),
1675         TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1676         TU_TEST(test_glue_LISTEN_PUSH, 90),
1677         TU_TEST(test_glue_LISTEN_PULL, 90),
1678         TU_TEST(test_glue_LISTEN_LISTEN, 90),
1679         TU_TEST(test_glue_LISTEN_CONNECT, 90),
1680         TU_TEST(test_glue_CONNECT_READFD, 90),
1681         TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1682         TU_TEST(test_glue_CONNECT_PUSH, 90),
1683         TU_TEST(test_glue_CONNECT_PULL, 90),
1684         TU_TEST(test_glue_CONNECT_LISTEN, 90),
1685         TU_TEST(test_glue_CONNECT_CONNECT, 90),
1686         TU_END()
1687     };
1688
1689     glib_init();
1690
1691     return testutils_run_tests(argc, argv, tests);
1692 }