f50cf132b52502ac794e6998c489a7a1de294d90
[debian/amanda] / xfer-src / xfer-test.c
1 /*
2  * Copyright (c) 2008 Zmanda Inc.  All Rights Reserved.
3  * 
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  * 
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  * 
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  * 
17  * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  *
20  * Author: Dustin J. Mitchell <dustin@zmanda.com>
21  */
22
23 #include "amxfer.h"
24 #include "glib-util.h"
25 #include "testutils.h"
26 #include "amanda.h"
27 #include "event.h"
28 #include "simpleprng.h"
29
30 /* Having tests repeat exactly is an advantage, so we use a hard-coded
31  * random seed. */
32 #define RANDOM_SEED 0xf00d
33
34 /*
35  * XferElement subclasses
36  *
37  * This file defines a few "private" element classes that each have only one
38  * mechanism pair.  These classes are then used to test all of the possible
39  * combinations of glue.
40  */
41
42 /* constants to determine the total amount of data to be transfered; EXTRA is
43  * to test out partial-block handling; it should be prime. */
44 #define TEST_BLOCK_SIZE 1024
45 #define TEST_BLOCK_COUNT 10
46 #define TEST_BLOCK_EXTRA 97
47 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
48
49 /* READFD */
50
51 static GType xfer_source_readfd_get_type(void);
52 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
53 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
54 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
55 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
56 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
57 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
58
59 typedef struct XferSourceReadfd {
60     XferElement __parent__;
61
62     int write_fd;
63     GThread *thread;
64     simpleprng_state_t prng;
65 } XferSourceReadfd;
66
67 typedef struct {
68     XferElementClass __parent__;
69 } XferSourceReadfdClass;
70
71 static gpointer
72 source_readfd_thread(
73     gpointer data)
74 {
75     XferSourceReadfd *self = (XferSourceReadfd *)data;
76     char buf[TEST_XFER_SIZE];
77     int fd = self->write_fd;
78
79     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
80
81     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
82         error("error in full_write(): %s", strerror(errno));
83     }
84
85     close(fd);
86
87     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
88
89     return NULL;
90 }
91
92 static void
93 source_readfd_setup_impl(
94     XferElement *elt)
95 {
96     XferSourceReadfd *self = (XferSourceReadfd *)elt;
97     int p[2];
98
99     simpleprng_seed(&self->prng, RANDOM_SEED);
100
101     if (pipe(p) < 0)
102         g_critical("Error from pipe(): %s", strerror(errno));
103
104     self->write_fd = p[1];
105     XFER_ELEMENT(self)->output_fd = p[0];
106 }
107
108 static gboolean
109 source_readfd_start_impl(
110     XferElement *elt)
111 {
112     XferSourceReadfd *self = (XferSourceReadfd *)elt;
113     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
114
115     return TRUE;
116 }
117
118 static void
119 source_readfd_class_init(
120     XferSourceReadfdClass * klass)
121 {
122     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
123     static xfer_element_mech_pair_t mech_pairs[] = {
124         { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
125         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
126     };
127
128     xec->setup = source_readfd_setup_impl;
129     xec->start = source_readfd_start_impl;
130     xec->mech_pairs = mech_pairs;
131 }
132
133 GType
134 xfer_source_readfd_get_type (void)
135 {
136     static GType type = 0;
137
138     if G_UNLIKELY(type == 0) {
139         static const GTypeInfo info = {
140             sizeof (XferSourceReadfdClass),
141             (GBaseInitFunc) NULL,
142             (GBaseFinalizeFunc) NULL,
143             (GClassInitFunc) source_readfd_class_init,
144             (GClassFinalizeFunc) NULL,
145             NULL /* class_data */,
146             sizeof (XferSourceReadfd),
147             0 /* n_preallocs */,
148             (GInstanceInitFunc) NULL,
149             NULL
150         };
151
152         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
153     }
154
155     return type;
156 }
157
158 /* WRITEFD */
159
160 static GType xfer_source_writefd_get_type(void);
161 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
162 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
163 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
164 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
165 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
166 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
167
168 typedef struct XferSourceWritefd {
169     XferElement __parent__;
170
171     GThread *thread;
172     simpleprng_state_t prng;
173 } XferSourceWritefd;
174
175 typedef struct {
176     XferElementClass __parent__;
177 } XferSourceWritefdClass;
178
179 static gpointer
180 source_writefd_thread(
181     gpointer data)
182 {
183     XferSourceWritefd *self = (XferSourceWritefd *)data;
184     char buf[TEST_XFER_SIZE];
185     int fd = XFER_ELEMENT(self)->downstream->input_fd;
186
187     simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
188
189     if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
190         error("error in full_write(): %s", strerror(errno));
191     }
192
193     close(fd);
194     XFER_ELEMENT(self)->downstream->input_fd = -1;
195
196     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
197
198     return NULL;
199 }
200
201 static gboolean
202 source_writefd_start_impl(
203     XferElement *elt)
204 {
205     XferSourceWritefd *self = (XferSourceWritefd *)elt;
206
207     simpleprng_seed(&self->prng, RANDOM_SEED);
208
209     self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
210
211     return TRUE;
212 }
213
214 static void
215 source_writefd_class_init(
216     XferSourceWritefdClass * klass)
217 {
218     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
219     static xfer_element_mech_pair_t mech_pairs[] = {
220         { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
221         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
222     };
223
224     xec->start = source_writefd_start_impl;
225     xec->mech_pairs = mech_pairs;
226 }
227
228 GType
229 xfer_source_writefd_get_type (void)
230 {
231     static GType type = 0;
232
233     if G_UNLIKELY(type == 0) {
234         static const GTypeInfo info = {
235             sizeof (XferSourceWritefdClass),
236             (GBaseInitFunc) NULL,
237             (GBaseFinalizeFunc) NULL,
238             (GClassInitFunc) source_writefd_class_init,
239             (GClassFinalizeFunc) NULL,
240             NULL /* class_data */,
241             sizeof (XferSourceWritefd),
242             0 /* n_preallocs */,
243             (GInstanceInitFunc) NULL,
244             NULL
245         };
246
247         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
248     }
249
250     return type;
251 }
252
253 /* PUSH_BUFFER */
254
255 static GType xfer_source_push_get_type(void);
256 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
257 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
258 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
259 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
260 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
261 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
262
263 typedef struct XferSourcePush {
264     XferElement __parent__;
265
266     GThread *thread;
267     simpleprng_state_t prng;
268 } XferSourcePush;
269
270 typedef struct {
271     XferElementClass __parent__;
272 } XferSourcePushClass;
273
274 static gpointer
275 source_push_thread(
276     gpointer data)
277 {
278     XferSourcePush *self = (XferSourcePush *)data;
279     char *buf;
280     int i;
281
282     for (i = 0; i < TEST_BLOCK_COUNT; i++) {
283         buf = g_malloc(TEST_BLOCK_SIZE);
284         simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
285         xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
286         buf = NULL;
287     }
288
289     /* send a smaller block */
290     buf = g_malloc(TEST_BLOCK_EXTRA);
291     simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
292     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
293     buf = NULL;
294
295     /* send EOF */
296     xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
297
298     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
299
300     return NULL;
301 }
302
303 static gboolean
304 source_push_start_impl(
305     XferElement *elt)
306 {
307     XferSourcePush *self = (XferSourcePush *)elt;
308
309     simpleprng_seed(&self->prng, RANDOM_SEED);
310
311     self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
312
313     return TRUE;
314 }
315
316 static void
317 source_push_class_init(
318     XferSourcePushClass * klass)
319 {
320     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
321     static xfer_element_mech_pair_t mech_pairs[] = {
322         { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
323         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
324     };
325
326     xec->start = source_push_start_impl;
327     xec->mech_pairs = mech_pairs;
328 }
329
330 GType
331 xfer_source_push_get_type (void)
332 {
333     static GType type = 0;
334
335     if G_UNLIKELY(type == 0) {
336         static const GTypeInfo info = {
337             sizeof (XferSourcePushClass),
338             (GBaseInitFunc) NULL,
339             (GBaseFinalizeFunc) NULL,
340             (GClassInitFunc) source_push_class_init,
341             (GClassFinalizeFunc) NULL,
342             NULL /* class_data */,
343             sizeof (XferSourcePush),
344             0 /* n_preallocs */,
345             (GInstanceInitFunc) NULL,
346             NULL
347         };
348
349         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
350     }
351
352     return type;
353 }
354
355 /* PULL_BUFFER */
356
357 static GType xfer_source_pull_get_type(void);
358 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
359 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
360 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
361 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
362 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
363 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
364
365 typedef struct XferSourcePull {
366     XferElement __parent__;
367
368     gint nbuffers;
369     GThread *thread;
370     simpleprng_state_t prng;
371 } XferSourcePull;
372
373 typedef struct {
374     XferElementClass __parent__;
375 } XferSourcePullClass;
376
377 static gpointer
378 source_pull_pull_buffer_impl(
379     XferElement *elt,
380     size_t *size)
381 {
382     XferSourcePull *self = (XferSourcePull *)elt;
383     char *buf;
384     size_t bufsiz;
385
386     if (self->nbuffers > TEST_BLOCK_COUNT) {
387         *size = 0;
388         return NULL;
389     }
390     bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
391
392     self->nbuffers++;
393
394     buf = g_malloc(bufsiz);
395     simpleprng_fill_buffer(&self->prng, buf, bufsiz);
396     *size = bufsiz;
397     return buf;
398 }
399
400 static void
401 source_pull_setup_impl(
402     XferElement *elt)
403 {
404     XferSourcePull *self = (XferSourcePull *)elt;
405
406     simpleprng_seed(&self->prng, RANDOM_SEED);
407 }
408
409 static void
410 source_pull_class_init(
411     XferSourcePullClass * klass)
412 {
413     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
414     static xfer_element_mech_pair_t mech_pairs[] = {
415         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
416         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
417     };
418
419     xec->pull_buffer = source_pull_pull_buffer_impl;
420     xec->setup = source_pull_setup_impl;
421     xec->mech_pairs = mech_pairs;
422 }
423
424 GType
425 xfer_source_pull_get_type (void)
426 {
427     static GType type = 0;
428
429     if G_UNLIKELY(type == 0) {
430         static const GTypeInfo info = {
431             sizeof (XferSourcePullClass),
432             (GBaseInitFunc) NULL,
433             (GBaseFinalizeFunc) NULL,
434             (GClassInitFunc) source_pull_class_init,
435             (GClassFinalizeFunc) NULL,
436             NULL /* class_data */,
437             sizeof (XferSourcePull),
438             0 /* n_preallocs */,
439             (GInstanceInitFunc) NULL,
440             NULL
441         };
442
443         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
444     }
445
446     return type;
447 }
448
449 /* READFD */
450
451 static GType xfer_dest_readfd_get_type(void);
452 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
453 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
454 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
455 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
456 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
457 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
458
459 typedef struct XferDestReadfd {
460     XferElement __parent__;
461
462     GThread *thread;
463     simpleprng_state_t prng;
464 } XferDestReadfd;
465
466 typedef struct {
467     XferElementClass __parent__;
468 } XferDestReadfdClass;
469
470 static gpointer
471 dest_readfd_thread(
472     gpointer data)
473 {
474     XferDestReadfd *self = (XferDestReadfd *)data;
475     char buf[TEST_XFER_SIZE];
476     size_t remaining;
477     int fd = XFER_ELEMENT(self)->upstream->output_fd;
478
479     remaining = sizeof(buf);
480     while (remaining) {
481         ssize_t nread;
482         if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
483             error("error in read(): %s", strerror(errno));
484         }
485         remaining -= nread;
486     }
487
488     /* we should be at EOF here */
489     if (read(fd, buf, 10) != 0)
490         g_critical("too much data entering XferDestReadfd");
491
492     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
493         g_critical("data entering XferDestReadfd does not match");
494
495     close(fd);
496     XFER_ELEMENT(self)->upstream->output_fd = -1;
497
498     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
499
500     return NULL;
501 }
502
503 static gboolean
504 dest_readfd_start_impl(
505     XferElement *elt)
506 {
507     XferDestReadfd *self = (XferDestReadfd *)elt;
508
509     simpleprng_seed(&self->prng, RANDOM_SEED);
510
511     self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
512
513     return TRUE;
514 }
515
516 static void
517 dest_readfd_class_init(
518     XferDestReadfdClass * klass)
519 {
520     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
521     static xfer_element_mech_pair_t mech_pairs[] = {
522         { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
523         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
524     };
525
526     xec->start = dest_readfd_start_impl;
527     xec->mech_pairs = mech_pairs;
528 }
529
530 GType
531 xfer_dest_readfd_get_type (void)
532 {
533     static GType type = 0;
534
535     if G_UNLIKELY(type == 0) {
536         static const GTypeInfo info = {
537             sizeof (XferDestReadfdClass),
538             (GBaseInitFunc) NULL,
539             (GBaseFinalizeFunc) NULL,
540             (GClassInitFunc) dest_readfd_class_init,
541             (GClassFinalizeFunc) NULL,
542             NULL /* class_data */,
543             sizeof (XferDestReadfd),
544             0 /* n_preallocs */,
545             (GInstanceInitFunc) NULL,
546             NULL
547         };
548
549         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
550     }
551
552     return type;
553 }
554
555 /* WRITEFD */
556
557 static GType xfer_dest_writefd_get_type(void);
558 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
559 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
560 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
561 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
562 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
563 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
564
565 typedef struct XferDestWritefd {
566     XferElement __parent__;
567
568     int read_fd;
569     GThread *thread;
570     simpleprng_state_t prng;
571 } XferDestWritefd;
572
573 typedef struct {
574     XferElementClass __parent__;
575 } XferDestWritefdClass;
576
577 static gpointer
578 dest_writefd_thread(
579     gpointer data)
580 {
581     XferDestWritefd *self = (XferDestWritefd *)data;
582     char buf[TEST_XFER_SIZE];
583     size_t remaining;
584     int fd = self->read_fd;
585
586     remaining = sizeof(buf);
587     while (remaining) {
588         ssize_t nwrite;
589         if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
590             error("error in read(): %s", strerror(errno));
591         }
592         remaining -= nwrite;
593     }
594
595     /* we should be at EOF here */
596     if (read(fd, buf, 10) != 0)
597         g_critical("too much data entering XferDestWritefd");
598
599     if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
600         g_critical("data entering XferDestWritefd does not match");
601
602     close(fd);
603     XFER_ELEMENT(self)->upstream->output_fd = -1;
604
605     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
606
607     return NULL;
608 }
609
610 static void
611 dest_writefd_setup_impl(
612     XferElement *elt)
613 {
614     XferDestWritefd *self = (XferDestWritefd *)elt;
615     int p[2];
616
617     simpleprng_seed(&self->prng, RANDOM_SEED);
618
619     if (pipe(p) < 0)
620         g_critical("Error from pipe(): %s", strerror(errno));
621
622     self->read_fd = p[0];
623     XFER_ELEMENT(self)->input_fd = p[1];
624 }
625
626 static gboolean
627 dest_writefd_start_impl(
628     XferElement *elt)
629 {
630     XferDestWritefd *self = (XferDestWritefd *)elt;
631     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
632
633     return TRUE;
634 }
635
636 static void
637 dest_writefd_class_init(
638     XferDestWritefdClass * klass)
639 {
640     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
641     static xfer_element_mech_pair_t mech_pairs[] = {
642         { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
643         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
644     };
645
646     xec->setup = dest_writefd_setup_impl;
647     xec->start = dest_writefd_start_impl;
648     xec->mech_pairs = mech_pairs;
649 }
650
651 GType
652 xfer_dest_writefd_get_type (void)
653 {
654     static GType type = 0;
655
656     if G_UNLIKELY(type == 0) {
657         static const GTypeInfo info = {
658             sizeof (XferDestWritefdClass),
659             (GBaseInitFunc) NULL,
660             (GBaseFinalizeFunc) NULL,
661             (GClassInitFunc) dest_writefd_class_init,
662             (GClassFinalizeFunc) NULL,
663             NULL /* class_data */,
664             sizeof (XferDestWritefd),
665             0 /* n_preallocs */,
666             (GInstanceInitFunc) NULL,
667             NULL
668         };
669
670         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
671     }
672
673     return type;
674 }
675
676 /* PUSH_BUFFER */
677
678 static GType xfer_dest_push_get_type(void);
679 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
680 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
681 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
682 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
683 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
684 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
685
686 typedef struct XferDestPush {
687     XferElement __parent__;
688
689     char buf[TEST_XFER_SIZE];
690     size_t bufpos;
691
692     GThread *thread;
693     simpleprng_state_t prng;
694 } XferDestPush;
695
696 typedef struct {
697     XferElementClass __parent__;
698 } XferDestPushClass;
699
700 static void
701 dest_push_push_buffer_impl(
702     XferElement *elt,
703     gpointer buf,
704     size_t size)
705 {
706     XferDestPush *self = (XferDestPush *)elt;
707
708     if (buf == NULL) {
709         /* if we're at EOF, verify we got the right bytes */
710         g_assert(self->bufpos == TEST_XFER_SIZE);
711         if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
712             g_critical("data entering XferDestPush does not match");
713         return;
714     }
715
716     g_assert(self->bufpos + size <= TEST_XFER_SIZE);
717     memcpy(self->buf + self->bufpos, buf, size);
718     self->bufpos += size;
719 }
720
721 static void
722 dest_push_setup_impl(
723     XferElement *elt)
724 {
725     XferDestPush *self = (XferDestPush *)elt;
726
727     simpleprng_seed(&self->prng, RANDOM_SEED);
728 }
729
730 static void
731 dest_push_class_init(
732     XferDestPushClass * klass)
733 {
734     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
735     static xfer_element_mech_pair_t mech_pairs[] = {
736         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
737         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
738     };
739
740     xec->push_buffer = dest_push_push_buffer_impl;
741     xec->setup = dest_push_setup_impl;
742     xec->mech_pairs = mech_pairs;
743 }
744
745 GType
746 xfer_dest_push_get_type (void)
747 {
748     static GType type = 0;
749
750     if G_UNLIKELY(type == 0) {
751         static const GTypeInfo info = {
752             sizeof (XferDestPushClass),
753             (GBaseInitFunc) NULL,
754             (GBaseFinalizeFunc) NULL,
755             (GClassInitFunc) dest_push_class_init,
756             (GClassFinalizeFunc) NULL,
757             NULL /* class_data */,
758             sizeof (XferDestPush),
759             0 /* n_preallocs */,
760             (GInstanceInitFunc) NULL,
761             NULL
762         };
763
764         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
765     }
766
767     return type;
768 }
769
770 /* PULL_BUFFER */
771
772 static GType xfer_dest_pull_get_type(void);
773 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
774 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
775 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
776 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
777 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
778 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
779
780 typedef struct XferDestPull {
781     XferElement __parent__;
782
783     GThread *thread;
784     simpleprng_state_t prng;
785 } XferDestPull;
786
787 typedef struct {
788     XferElementClass __parent__;
789 } XferDestPullClass;
790
791 static gpointer
792 dest_pull_thread(
793     gpointer data)
794 {
795     XferDestPull *self = (XferDestPull *)data;
796     char fullbuf[TEST_XFER_SIZE];
797     char *buf;
798     size_t bufpos = 0;
799     size_t size;
800
801     while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
802         g_assert(bufpos + size <= TEST_XFER_SIZE);
803         memcpy(fullbuf + bufpos, buf, size);
804         bufpos += size;
805     }
806
807     /* we're at EOF, so verify we got the right bytes */
808     g_assert(bufpos == TEST_XFER_SIZE);
809     if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
810         g_critical("data entering XferDestPull does not match");
811
812     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
813
814     return NULL;
815 }
816
817 static gboolean
818 dest_pull_start_impl(
819     XferElement *elt)
820 {
821     XferDestPull *self = (XferDestPull *)elt;
822
823     simpleprng_seed(&self->prng, RANDOM_SEED);
824
825     self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
826
827     return TRUE;
828 }
829
830 static void
831 dest_pull_class_init(
832     XferDestPullClass * klass)
833 {
834     XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
835     static xfer_element_mech_pair_t mech_pairs[] = {
836         { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
837         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
838     };
839
840     xec->start = dest_pull_start_impl;
841     xec->mech_pairs = mech_pairs;
842 }
843
844 GType
845 xfer_dest_pull_get_type (void)
846 {
847     static GType type = 0;
848
849     if G_UNLIKELY(type == 0) {
850         static const GTypeInfo info = {
851             sizeof (XferDestPullClass),
852             (GBaseInitFunc) NULL,
853             (GBaseFinalizeFunc) NULL,
854             (GClassInitFunc) dest_pull_class_init,
855             (GClassFinalizeFunc) NULL,
856             NULL /* class_data */,
857             sizeof (XferDestPull),
858             0 /* n_preallocs */,
859             (GInstanceInitFunc) NULL,
860             NULL
861         };
862
863         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
864     }
865
866     return type;
867 }
868
869
870 /*
871  * Tests
872  */
873
874 static void
875 test_xfer_generic_callback(
876     gpointer data G_GNUC_UNUSED,
877     XMsg *msg,
878     Xfer *xfer)
879 {
880     tu_dbg("Received message %s\n", xmsg_repr(msg));
881
882     switch (msg->type) {
883         case XMSG_DONE:
884             /* are we done? */
885             if (xfer->status == XFER_DONE) {
886                 tu_dbg("all elements are done!\n");
887                 g_main_loop_quit(default_main_loop());
888             }
889             break;
890
891         default:
892             break;
893     }
894 }
895
896 /****
897  * Run a simple transfer with some xor filters
898  */
899
900 static int
901 test_xfer_simple(void)
902 {
903     unsigned int i;
904     GSource *src;
905     XferElement *elements[] = {
906         xfer_source_random(100*1024, RANDOM_SEED),
907         xfer_filter_xor('d'),
908         xfer_filter_xor('d'),
909         xfer_dest_null(RANDOM_SEED),
910     };
911
912     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
913     src = xfer_get_source(xfer);
914     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
915     g_source_attach(src, NULL);
916     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
917
918     /* unreference the elements */
919     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
920         g_object_unref(elements[i]);
921         g_assert(G_OBJECT(elements[i])->ref_count == 1);
922         elements[i] = NULL;
923     }
924
925     xfer_start(xfer);
926
927     g_main_loop_run(default_main_loop());
928     g_assert(xfer->status == XFER_DONE);
929
930     xfer_unref(xfer);
931
932     return 1;
933 }
934
935 /****
936  * Run a transfer between two files, with or without filters
937  */
938
939 static int
940 test_xfer_files(gboolean add_filters)
941 {
942     unsigned int i;
943     unsigned int elts;
944     GSource *src;
945     char *in_filename = __FILE__;
946     char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
947     int rfd, wfd;
948     Xfer *xfer;
949     XferElement *elements[4];
950
951     rfd = open(in_filename, O_RDONLY, 0);
952     if (rfd < 0)
953         g_critical("Could not open '%s': %s", in_filename, strerror(errno));
954
955     wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
956     if (wfd < 0)
957         g_critical("Could not open '%s': %s", out_filename, strerror(errno));
958
959     elts = 0;
960     elements[elts++] = xfer_source_fd(rfd);
961     if (add_filters) {
962         elements[elts++] = xfer_filter_xor(0xab);
963         elements[elts++] = xfer_filter_xor(0xab);
964     }
965     elements[elts++] = xfer_dest_fd(wfd);
966
967     xfer = xfer_new(elements, elts);
968     src = xfer_get_source(xfer);
969     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
970     g_source_attach(src, NULL);
971     tu_dbg("Transfer: %s\n", xfer_repr(xfer));
972
973     /* unreference the elements */
974     for (i = 0; i < elts; i++) {
975         g_object_unref(elements[i]);
976         g_assert(G_OBJECT(elements[i])->ref_count == 1);
977         elements[i] = NULL;
978     }
979
980     xfer_start(xfer);
981
982     g_main_loop_run(default_main_loop());
983     g_assert(xfer->status == XFER_DONE);
984
985     xfer_unref(xfer);
986
987     unlink(out_filename); /* ignore any errors */
988
989     return 1;
990 }
991
992 static int
993 test_xfer_files_simple(void)
994 {
995     return test_xfer_files(FALSE);
996 }
997
998 static int
999 test_xfer_files_filter(void)
1000 {
1001     return test_xfer_files(TRUE);
1002 }
1003
1004 /*****
1005  * test each possible combination of source and destination mechansim
1006  */
1007
1008 static int
1009 test_glue_combo(
1010     XferElement *source,
1011     XferElement *dest)
1012 {
1013     unsigned int i;
1014     GSource *src;
1015     XferElement *elements[] = { source, dest };
1016
1017     Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1018     src = xfer_get_source(xfer);
1019     g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1020     g_source_attach(src, NULL);
1021
1022     /* unreference the elements */
1023     for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1024         g_object_unref(elements[i]);
1025         g_assert(G_OBJECT(elements[i])->ref_count == 1);
1026         elements[i] = NULL;
1027     }
1028
1029     xfer_start(xfer);
1030
1031     g_main_loop_run(default_main_loop());
1032     g_assert(xfer->status == XFER_DONE);
1033
1034     xfer_unref(xfer);
1035
1036     return 1;
1037 }
1038
1039 #define make_test_glue(n, s, d) static int n(void) \
1040 {\
1041     return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1042                            (XferElement *)g_object_new(d, NULL)); \
1043 }
1044 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1045 make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1046 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1047 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1048 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1049 make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1050 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1051 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1052 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1053 make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1054 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1055 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1056 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1057 make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1058 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1059 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1060
1061 /*
1062  * Main driver
1063  */
1064
1065 int
1066 main(int argc, char **argv)
1067 {
1068     static TestUtilsTest tests[] = {
1069         TU_TEST(test_xfer_simple, 10),
1070         TU_TEST(test_xfer_files_simple, 10),
1071         TU_TEST(test_xfer_files_filter, 10),
1072         TU_TEST(test_glue_READFD_READFD, 5),
1073         TU_TEST(test_glue_READFD_WRITE, 5),
1074         TU_TEST(test_glue_READFD_PUSH, 5),
1075         TU_TEST(test_glue_READFD_PULL, 5),
1076         TU_TEST(test_glue_WRITEFD_READFD, 5),
1077         TU_TEST(test_glue_WRITEFD_WRITE, 5),
1078         TU_TEST(test_glue_WRITEFD_PUSH, 5),
1079         TU_TEST(test_glue_WRITEFD_PULL, 5),
1080         TU_TEST(test_glue_PUSH_READFD, 5),
1081         TU_TEST(test_glue_PUSH_WRITE, 5),
1082         TU_TEST(test_glue_PUSH_PUSH, 5),
1083         TU_TEST(test_glue_PUSH_PULL, 5),
1084         TU_TEST(test_glue_PULL_READFD, 5),
1085         TU_TEST(test_glue_PULL_WRITE, 5),
1086         TU_TEST(test_glue_PULL_PUSH, 5),
1087         TU_TEST(test_glue_PULL_PULL, 5),
1088         TU_END()
1089     };
1090
1091     glib_init();
1092
1093     return testutils_run_tests(argc, argv, tests);
1094 }