2 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; either version 2
7 * of the License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 * Author: Dustin J. Mitchell <dustin@zmanda.com>
26 #include "glib-util.h"
27 #include "testutils.h"
29 #include "simpleprng.h"
30 #include "sockaddr-util.h"
32 /* Having tests repeat exactly is an advantage, so we use a hard-coded
34 #define RANDOM_SEED 0xf00d
37 * XferElement subclasses
39 * This file defines a few "private" element classes that each have only one
40 * mechanism pair. These classes are then used to test all of the possible
41 * combinations of glue.
44 /* constants to determine the total amount of data to be transfered; EXTRA is
45 * to test out partial-block handling; it should be prime. */
46 #define TEST_BLOCK_SIZE 32768
47 #define TEST_BLOCK_COUNT 10
48 #define TEST_BLOCK_EXTRA 97
49 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
53 static GType xfer_source_readfd_get_type(void);
54 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
55 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
56 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
57 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
58 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
59 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
61 typedef struct XferSourceReadfd {
62 XferElement __parent__;
66 simpleprng_state_t prng;
70 XferElementClass __parent__;
71 } XferSourceReadfdClass;
77 XferSourceReadfd *self = XFER_SOURCE_READFD(data);
78 char buf[TEST_XFER_SIZE];
79 int fd = self->write_fd;
81 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
83 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
84 error("error in full_write(): %s", strerror(errno));
89 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
95 source_readfd_setup_impl(
98 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
101 simpleprng_seed(&self->prng, RANDOM_SEED);
104 g_critical("Error from pipe(): %s", strerror(errno));
106 self->write_fd = p[1];
107 g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
113 source_readfd_start_impl(
116 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
117 self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
123 source_readfd_class_init(
124 XferSourceReadfdClass * klass)
126 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
127 static xfer_element_mech_pair_t mech_pairs[] = {
128 { XFER_MECH_NONE, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) },
129 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
132 xec->setup = source_readfd_setup_impl;
133 xec->start = source_readfd_start_impl;
134 xec->mech_pairs = mech_pairs;
138 xfer_source_readfd_get_type (void)
140 static GType type = 0;
142 if G_UNLIKELY(type == 0) {
143 static const GTypeInfo info = {
144 sizeof (XferSourceReadfdClass),
145 (GBaseInitFunc) NULL,
146 (GBaseFinalizeFunc) NULL,
147 (GClassInitFunc) source_readfd_class_init,
148 (GClassFinalizeFunc) NULL,
149 NULL /* class_data */,
150 sizeof (XferSourceReadfd),
152 (GInstanceInitFunc) NULL,
156 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
164 static GType xfer_source_writefd_get_type(void);
165 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
166 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
167 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
168 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
169 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
170 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
172 typedef struct XferSourceWritefd {
173 XferElement __parent__;
176 simpleprng_state_t prng;
180 XferElementClass __parent__;
181 } XferSourceWritefdClass;
184 source_writefd_thread(
187 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
188 XferElement *elt = XFER_ELEMENT(data);
189 char buf[TEST_XFER_SIZE];
190 int fd = xfer_element_swap_input_fd(elt->downstream, -1);
192 /* this shouldn't happen, although non-test elements handle it gracefully */
195 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
197 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
198 error("error in full_write(): %s", strerror(errno));
203 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
209 source_writefd_start_impl(
212 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
214 simpleprng_seed(&self->prng, RANDOM_SEED);
216 self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
222 source_writefd_class_init(
223 XferSourceWritefdClass * klass)
225 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
226 static xfer_element_mech_pair_t mech_pairs[] = {
227 { XFER_MECH_NONE, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) },
228 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
231 xec->start = source_writefd_start_impl;
232 xec->mech_pairs = mech_pairs;
236 xfer_source_writefd_get_type (void)
238 static GType type = 0;
240 if G_UNLIKELY(type == 0) {
241 static const GTypeInfo info = {
242 sizeof (XferSourceWritefdClass),
243 (GBaseInitFunc) NULL,
244 (GBaseFinalizeFunc) NULL,
245 (GClassInitFunc) source_writefd_class_init,
246 (GClassFinalizeFunc) NULL,
247 NULL /* class_data */,
248 sizeof (XferSourceWritefd),
250 (GInstanceInitFunc) NULL,
254 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
262 static GType xfer_source_push_get_type(void);
263 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
264 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
265 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
266 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
267 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
268 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
270 typedef struct XferSourcePush {
271 XferElement __parent__;
274 simpleprng_state_t prng;
278 XferElementClass __parent__;
279 } XferSourcePushClass;
285 XferSourcePush *self = XFER_SOURCE_PUSH(data);
289 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
290 buf = g_malloc(TEST_BLOCK_SIZE);
291 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
292 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
296 /* send a smaller block */
297 buf = g_malloc(TEST_BLOCK_EXTRA);
298 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
299 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
303 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
305 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
311 source_push_start_impl(
314 XferSourcePush *self = XFER_SOURCE_PUSH(elt);
316 simpleprng_seed(&self->prng, RANDOM_SEED);
318 self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
324 source_push_class_init(
325 XferSourcePushClass * klass)
327 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
328 static xfer_element_mech_pair_t mech_pairs[] = {
329 { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) },
330 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
333 xec->start = source_push_start_impl;
334 xec->mech_pairs = mech_pairs;
338 xfer_source_push_get_type (void)
340 static GType type = 0;
342 if G_UNLIKELY(type == 0) {
343 static const GTypeInfo info = {
344 sizeof (XferSourcePushClass),
345 (GBaseInitFunc) NULL,
346 (GBaseFinalizeFunc) NULL,
347 (GClassInitFunc) source_push_class_init,
348 (GClassFinalizeFunc) NULL,
349 NULL /* class_data */,
350 sizeof (XferSourcePush),
352 (GInstanceInitFunc) NULL,
356 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
364 static GType xfer_source_pull_get_type(void);
365 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
366 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
367 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
368 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
369 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
370 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
372 typedef struct XferSourcePull {
373 XferElement __parent__;
377 simpleprng_state_t prng;
381 XferElementClass __parent__;
382 } XferSourcePullClass;
385 source_pull_pull_buffer_impl(
389 XferSourcePull *self = XFER_SOURCE_PULL(elt);
393 if (self->nbuffers > TEST_BLOCK_COUNT) {
397 bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
401 buf = g_malloc(bufsiz);
402 simpleprng_fill_buffer(&self->prng, buf, bufsiz);
408 source_pull_setup_impl(
411 XferSourcePull *self = XFER_SOURCE_PULL(elt);
413 simpleprng_seed(&self->prng, RANDOM_SEED);
419 source_pull_class_init(
420 XferSourcePullClass * klass)
422 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
423 static xfer_element_mech_pair_t mech_pairs[] = {
424 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) },
425 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
428 xec->pull_buffer = source_pull_pull_buffer_impl;
429 xec->setup = source_pull_setup_impl;
430 xec->mech_pairs = mech_pairs;
434 xfer_source_pull_get_type (void)
436 static GType type = 0;
438 if G_UNLIKELY(type == 0) {
439 static const GTypeInfo info = {
440 sizeof (XferSourcePullClass),
441 (GBaseInitFunc) NULL,
442 (GBaseFinalizeFunc) NULL,
443 (GClassInitFunc) source_pull_class_init,
444 (GClassFinalizeFunc) NULL,
445 NULL /* class_data */,
446 sizeof (XferSourcePull),
448 (GInstanceInitFunc) NULL,
452 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
460 static GType xfer_source_listen_get_type(void);
461 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
462 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
463 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
464 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
465 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
466 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
468 typedef struct XferSourceListen {
469 XferElement __parent__;
472 simpleprng_state_t prng;
476 XferElementClass __parent__;
477 } XferSourceListenClass;
480 source_listen_thread(
483 XferSourceListen *self = XFER_SOURCE_LISTEN(data);
484 XferElement *elt = XFER_ELEMENT(self);
485 DirectTCPAddr *addrs;
490 /* set up the sockaddr -- IPv4 only */
491 addrs = elt->downstream->input_listen_addrs;
492 g_assert(addrs != NULL);
494 tu_dbg("making data connection to %s\n", str_sockaddr(addrs));
495 sock = socket(SU_GET_FAMILY(addrs), SOCK_STREAM, 0);
497 error("socket(): %s", strerror(errno));
499 if (connect(sock, (struct sockaddr *)addrs, SS_LEN(addrs)) < 0) {
500 error("connect(): %s", strerror(errno));
503 tu_dbg("connected to %s\n", str_sockaddr(addrs));
505 buf = g_malloc(TEST_BLOCK_SIZE);
506 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
507 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
508 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
509 error("error in full_write(): %s", strerror(errno));
513 /* send a smaller block */
514 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
515 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
516 error("error in full_write(): %s", strerror(errno));
520 /* send EOF by closing the socket */
523 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
529 source_listen_start_impl(
532 XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
534 simpleprng_seed(&self->prng, RANDOM_SEED);
536 self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
542 source_listen_class_init(
543 XferSourceListenClass * klass)
545 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
546 static xfer_element_mech_pair_t mech_pairs[] = {
547 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) },
548 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
551 xec->start = source_listen_start_impl;
552 xec->mech_pairs = mech_pairs;
556 xfer_source_listen_get_type (void)
558 static GType type = 0;
560 if G_UNLIKELY(type == 0) {
561 static const GTypeInfo info = {
562 sizeof (XferSourceListenClass),
563 (GBaseInitFunc) NULL,
564 (GBaseFinalizeFunc) NULL,
565 (GClassInitFunc) source_listen_class_init,
566 (GClassFinalizeFunc) NULL,
567 NULL /* class_data */,
568 sizeof (XferSourceListen),
570 (GInstanceInitFunc) NULL,
574 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
582 static GType xfer_source_connect_get_type(void);
583 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
584 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
585 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
586 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
587 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
588 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
590 typedef struct XferSourceConnect {
591 XferElement __parent__;
596 simpleprng_state_t prng;
600 XferElementClass __parent__;
601 } XferSourceConnectClass;
604 source_connect_thread(
607 XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
612 g_assert(self->listen_socket != -1);
614 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
615 xfer_cancel_with_error(XFER_ELEMENT(self),
616 _("Error accepting incoming connection: %s"), strerror(errno));
617 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
621 /* close the listening socket now, for good measure */
622 close(self->listen_socket);
623 self->listen_socket = -1;
625 tu_dbg("connection accepted\n");
627 buf = g_malloc(TEST_BLOCK_SIZE);
628 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
629 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
630 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
631 error("error in full_write(): %s", strerror(errno));
635 /* send a smaller block */
636 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
637 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
638 error("error in full_write(): %s", strerror(errno));
642 /* send EOF by closing the socket */
645 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
651 source_connect_setup_impl(
654 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
656 DirectTCPAddr *addrs;
660 /* set up self->listen_socket and set elt->output_listen_addrs */
661 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
663 error("socket(): %s", strerror(errno));
665 if (listen(sock, 1) < 0)
666 error("listen(): %s", strerror(errno));
669 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
670 error("getsockname(): %s", strerror(errno));
671 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
673 addrs = g_new0(DirectTCPAddr, 2);
674 copy_sockaddr(&addrs[0], &addr);
675 elt->output_listen_addrs = addrs;
681 source_connect_start_impl(
684 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
686 simpleprng_seed(&self->prng, RANDOM_SEED);
688 self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
694 source_connect_class_init(
695 XferSourceConnectClass * klass)
697 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
698 static xfer_element_mech_pair_t mech_pairs[] = {
699 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) },
700 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
703 xec->setup = source_connect_setup_impl;
704 xec->start = source_connect_start_impl;
705 xec->mech_pairs = mech_pairs;
709 xfer_source_connect_get_type (void)
711 static GType type = 0;
713 if G_UNLIKELY(type == 0) {
714 static const GTypeInfo info = {
715 sizeof (XferSourceConnectClass),
716 (GBaseInitFunc) NULL,
717 (GBaseFinalizeFunc) NULL,
718 (GClassInitFunc) source_connect_class_init,
719 (GClassFinalizeFunc) NULL,
720 NULL /* class_data */,
721 sizeof (XferSourceConnect),
723 (GInstanceInitFunc) NULL,
727 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
735 static GType xfer_dest_readfd_get_type(void);
736 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
737 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
738 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
739 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
740 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
741 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
743 typedef struct XferDestReadfd {
744 XferElement __parent__;
747 simpleprng_state_t prng;
751 XferElementClass __parent__;
752 } XferDestReadfdClass;
758 XferDestReadfd *self = XFER_DEST_READFD(data);
759 XferElement *elt = XFER_ELEMENT(data);
760 char buf[TEST_XFER_SIZE];
762 int fd = xfer_element_swap_output_fd(elt->upstream, -1);
764 /* this shouldn't happen, although non-test elements handle it gracefully */
767 remaining = sizeof(buf);
770 if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
771 error("error in read(): %s", strerror(errno));
776 /* we should be at EOF here */
777 if (read(fd, buf, 10) != 0)
778 g_critical("too much data entering XferDestReadfd");
780 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
781 g_critical("data entering XferDestReadfd does not match");
785 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
791 dest_readfd_start_impl(
794 XferDestReadfd *self = XFER_DEST_READFD(elt);
796 simpleprng_seed(&self->prng, RANDOM_SEED);
798 self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
804 dest_readfd_class_init(
805 XferDestReadfdClass * klass)
807 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
808 static xfer_element_mech_pair_t mech_pairs[] = {
809 { XFER_MECH_READFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
810 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
813 xec->start = dest_readfd_start_impl;
814 xec->mech_pairs = mech_pairs;
818 xfer_dest_readfd_get_type (void)
820 static GType type = 0;
822 if G_UNLIKELY(type == 0) {
823 static const GTypeInfo info = {
824 sizeof (XferDestReadfdClass),
825 (GBaseInitFunc) NULL,
826 (GBaseFinalizeFunc) NULL,
827 (GClassInitFunc) dest_readfd_class_init,
828 (GClassFinalizeFunc) NULL,
829 NULL /* class_data */,
830 sizeof (XferDestReadfd),
832 (GInstanceInitFunc) NULL,
836 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
844 static GType xfer_dest_writefd_get_type(void);
845 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
846 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
847 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
848 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
849 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
850 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
852 typedef struct XferDestWritefd {
853 XferElement __parent__;
857 simpleprng_state_t prng;
861 XferElementClass __parent__;
862 } XferDestWritefdClass;
868 XferDestWritefd *self = XFER_DEST_WRITEFD(data);
869 char buf[TEST_XFER_SIZE];
871 int fd = self->read_fd;
873 remaining = sizeof(buf);
876 if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
877 error("error in read(): %s", strerror(errno));
882 /* we should be at EOF here */
883 if (read(fd, buf, 10) != 0)
884 g_critical("too much data entering XferDestWritefd");
886 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
887 g_critical("data entering XferDestWritefd does not match");
891 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
897 dest_writefd_setup_impl(
900 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
903 simpleprng_seed(&self->prng, RANDOM_SEED);
906 g_critical("Error from pipe(): %s", strerror(errno));
908 self->read_fd = p[0];
909 g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
915 dest_writefd_start_impl(
918 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
919 self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
925 dest_writefd_class_init(
926 XferDestWritefdClass * klass)
928 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
929 static xfer_element_mech_pair_t mech_pairs[] = {
930 { XFER_MECH_WRITEFD, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
931 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
934 xec->setup = dest_writefd_setup_impl;
935 xec->start = dest_writefd_start_impl;
936 xec->mech_pairs = mech_pairs;
940 xfer_dest_writefd_get_type (void)
942 static GType type = 0;
944 if G_UNLIKELY(type == 0) {
945 static const GTypeInfo info = {
946 sizeof (XferDestWritefdClass),
947 (GBaseInitFunc) NULL,
948 (GBaseFinalizeFunc) NULL,
949 (GClassInitFunc) dest_writefd_class_init,
950 (GClassFinalizeFunc) NULL,
951 NULL /* class_data */,
952 sizeof (XferDestWritefd),
954 (GInstanceInitFunc) NULL,
958 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
966 static GType xfer_dest_push_get_type(void);
967 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
968 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
969 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
970 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
971 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
972 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
974 typedef struct XferDestPush {
975 XferElement __parent__;
981 simpleprng_state_t prng;
985 XferElementClass __parent__;
989 dest_push_push_buffer_impl(
994 XferDestPush *self = XFER_DEST_PUSH(elt);
997 /* if we're at EOF, verify we got the right bytes */
998 g_assert(self->bufpos == TEST_XFER_SIZE);
999 if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
1000 g_critical("data entering XferDestPush does not match");
1005 g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1006 memcpy(self->buf + self->bufpos, buf, size);
1007 self->bufpos += size;
1011 dest_push_setup_impl(
1014 XferDestPush *self = XFER_DEST_PUSH(elt);
1016 self->buf = g_malloc(TEST_XFER_SIZE);
1017 simpleprng_seed(&self->prng, RANDOM_SEED);
1023 dest_push_class_init(
1024 XferDestPushClass * klass)
1026 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1027 static xfer_element_mech_pair_t mech_pairs[] = {
1028 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(0) },
1029 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1032 xec->push_buffer = dest_push_push_buffer_impl;
1033 xec->setup = dest_push_setup_impl;
1034 xec->mech_pairs = mech_pairs;
1038 xfer_dest_push_get_type (void)
1040 static GType type = 0;
1042 if G_UNLIKELY(type == 0) {
1043 static const GTypeInfo info = {
1044 sizeof (XferDestPushClass),
1045 (GBaseInitFunc) NULL,
1046 (GBaseFinalizeFunc) NULL,
1047 (GClassInitFunc) dest_push_class_init,
1048 (GClassFinalizeFunc) NULL,
1049 NULL /* class_data */,
1050 sizeof (XferDestPush),
1051 0 /* n_preallocs */,
1052 (GInstanceInitFunc) NULL,
1056 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
1064 static GType xfer_dest_pull_get_type(void);
1065 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1066 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1067 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1068 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1069 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1070 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
1072 typedef struct XferDestPull {
1073 XferElement __parent__;
1076 simpleprng_state_t prng;
1080 XferElementClass __parent__;
1081 } XferDestPullClass;
1087 XferDestPull *self = XFER_DEST_PULL(data);
1088 char fullbuf[TEST_XFER_SIZE];
1093 while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
1094 g_assert(bufpos + size <= TEST_XFER_SIZE);
1095 memcpy(fullbuf + bufpos, buf, size);
1099 /* we're at EOF, so verify we got the right bytes */
1100 g_assert(bufpos == TEST_XFER_SIZE);
1101 if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1102 g_critical("data entering XferDestPull does not match");
1104 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1110 dest_pull_start_impl(
1113 XferDestPull *self = XFER_DEST_PULL(elt);
1115 simpleprng_seed(&self->prng, RANDOM_SEED);
1117 self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
1123 dest_pull_class_init(
1124 XferDestPullClass * klass)
1126 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1127 static xfer_element_mech_pair_t mech_pairs[] = {
1128 { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1129 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1132 xec->start = dest_pull_start_impl;
1133 xec->mech_pairs = mech_pairs;
1137 xfer_dest_pull_get_type (void)
1139 static GType type = 0;
1141 if G_UNLIKELY(type == 0) {
1142 static const GTypeInfo info = {
1143 sizeof (XferDestPullClass),
1144 (GBaseInitFunc) NULL,
1145 (GBaseFinalizeFunc) NULL,
1146 (GClassInitFunc) dest_pull_class_init,
1147 (GClassFinalizeFunc) NULL,
1148 NULL /* class_data */,
1149 sizeof (XferDestPull),
1150 0 /* n_preallocs */,
1151 (GInstanceInitFunc) NULL,
1155 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
1163 static GType xfer_dest_listen_get_type(void);
1164 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1165 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1166 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1167 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1168 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1169 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
1171 typedef struct XferDestListen {
1172 XferElement __parent__;
1177 simpleprng_state_t prng;
1181 XferElementClass __parent__;
1182 } XferDestListenClass;
1188 XferDestListen *self = XFER_DEST_LISTEN(data);
1193 g_assert(self->listen_socket != -1);
1195 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1196 xfer_cancel_with_error(XFER_ELEMENT(self),
1197 _("Error accepting incoming connection: %s"), strerror(errno));
1198 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1202 /* close the listening socket now, for good measure */
1203 close(self->listen_socket);
1204 self->listen_socket = -1;
1206 /* read from the socket until EOF or all of the data is read. We try to
1207 * read one extra byte - if we get it, then upstream sent too much data */
1208 buf = g_malloc(TEST_XFER_SIZE+1);
1209 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1210 g_assert(bytes == TEST_XFER_SIZE);
1213 /* we're at EOF, so verify we got the right bytes */
1214 g_assert(bytes == TEST_XFER_SIZE);
1215 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1216 g_critical("data entering XferDestListen does not match");
1218 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1224 dest_listen_setup_impl(
1227 XferDestListen *self = XFER_DEST_LISTEN(elt);
1228 sockaddr_union addr;
1229 DirectTCPAddr *addrs;
1233 /* set up self->listen_socket and set elt->input_listen_addrs */
1234 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1236 error("socket(): %s", strerror(errno));
1238 if (listen(sock, 1) < 0)
1239 error("listen(): %s", strerror(errno));
1242 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1243 error("getsockname(): %s", strerror(errno));
1244 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
1246 addrs = g_new0(DirectTCPAddr, 2);
1247 copy_sockaddr(&addrs[0], &addr);
1248 elt->input_listen_addrs = addrs;
1254 dest_listen_start_impl(
1257 XferDestListen *self = XFER_DEST_LISTEN(elt);
1259 simpleprng_seed(&self->prng, RANDOM_SEED);
1261 self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
1267 dest_listen_class_init(
1268 XferDestListenClass * klass)
1270 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1271 static xfer_element_mech_pair_t mech_pairs[] = {
1272 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1273 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1276 xec->setup = dest_listen_setup_impl;
1277 xec->start = dest_listen_start_impl;
1278 xec->mech_pairs = mech_pairs;
1282 xfer_dest_listen_get_type (void)
1284 static GType type = 0;
1286 if G_UNLIKELY(type == 0) {
1287 static const GTypeInfo info = {
1288 sizeof (XferDestListenClass),
1289 (GBaseInitFunc) NULL,
1290 (GBaseFinalizeFunc) NULL,
1291 (GClassInitFunc) dest_listen_class_init,
1292 (GClassFinalizeFunc) NULL,
1293 NULL /* class_data */,
1294 sizeof (XferDestListen),
1295 0 /* n_preallocs */,
1296 (GInstanceInitFunc) NULL,
1300 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
1308 static GType xfer_dest_connect_get_type(void);
1309 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1310 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1311 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1312 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1313 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1314 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
1316 typedef struct XferDestConnect {
1317 XferElement __parent__;
1322 simpleprng_state_t prng;
1326 XferElementClass __parent__;
1327 } XferDestConnectClass;
1330 dest_connect_thread(
1333 XferDestConnect *self = XFER_DEST_CONNECT(data);
1334 XferElement *elt = XFER_ELEMENT(self);
1335 DirectTCPAddr *addrs;
1336 sockaddr_union addr;
1341 /* set up the sockaddr -- IPv4 only */
1342 SU_INIT(&addr, AF_INET);
1343 addrs = elt->upstream->output_listen_addrs;
1344 g_assert(addrs != NULL);
1345 copy_sockaddr(&addr, addrs);
1347 tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1348 sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
1350 error("socket(): %s", strerror(errno));
1352 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1353 error("connect(): %s", strerror(errno));
1356 tu_dbg("connected to %s\n", str_sockaddr(&addr));
1358 /* read from the socket until EOF or all of the data is read. We try to
1359 * read one extra byte - if we get it, then upstream sent too much data */
1360 buf = g_malloc(TEST_XFER_SIZE+1);
1361 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1362 g_assert(bytes == TEST_XFER_SIZE);
1365 /* we're at EOF, so verify we got the right bytes */
1366 g_assert(bytes == TEST_XFER_SIZE);
1367 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1368 g_critical("data entering XferDestConnect does not match");
1370 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1376 dest_connect_start_impl(
1379 XferDestConnect *self = XFER_DEST_CONNECT(elt);
1381 simpleprng_seed(&self->prng, RANDOM_SEED);
1383 self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
1389 dest_connect_class_init(
1390 XferDestConnectClass * klass)
1392 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1393 static xfer_element_mech_pair_t mech_pairs[] = {
1394 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, XFER_NROPS(1), XFER_NTHREADS(1) },
1395 { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
1398 xec->start = dest_connect_start_impl;
1399 xec->mech_pairs = mech_pairs;
1403 xfer_dest_connect_get_type (void)
1405 static GType type = 0;
1407 if G_UNLIKELY(type == 0) {
1408 static const GTypeInfo info = {
1409 sizeof (XferDestConnectClass),
1410 (GBaseInitFunc) NULL,
1411 (GBaseFinalizeFunc) NULL,
1412 (GClassInitFunc) dest_connect_class_init,
1413 (GClassFinalizeFunc) NULL,
1414 NULL /* class_data */,
1415 sizeof (XferDestConnect),
1416 0 /* n_preallocs */,
1417 (GInstanceInitFunc) NULL,
1421 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
1433 test_xfer_generic_callback(
1434 gpointer data G_GNUC_UNUSED,
1438 tu_dbg("Received message %s\n", xmsg_repr(msg));
1440 switch (msg->type) {
1443 if (xfer->status == XFER_DONE) {
1444 tu_dbg("all elements are done!\n");
1445 g_main_loop_quit(default_main_loop());
1455 * Run a simple transfer with some xor filters
1459 test_xfer_simple(void)
1463 XferElement *elements[] = {
1464 xfer_source_random(100*1024, RANDOM_SEED),
1465 xfer_filter_xor('d'),
1466 xfer_filter_xor('d'),
1467 xfer_dest_null(RANDOM_SEED),
1470 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1471 src = xfer_get_source(xfer);
1472 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1473 g_source_attach(src, NULL);
1474 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1476 /* unreference the elements */
1477 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1478 g_object_unref(elements[i]);
1479 g_assert(G_OBJECT(elements[i])->ref_count == 1);
1483 xfer_start(xfer, 0, 0);
1485 g_main_loop_run(default_main_loop());
1486 g_assert(xfer->status == XFER_DONE);
1494 * Run a transfer between two files, with or without filters
1498 test_xfer_files(gboolean add_filters)
1503 char *in_filename = __FILE__;
1504 char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
1507 XferElement *elements[4];
1509 rfd = open(in_filename, O_RDONLY, 0);
1511 g_critical("Could not open '%s': %s", in_filename, strerror(errno));
1513 wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1515 g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1518 elements[elts++] = xfer_source_fd(rfd);
1520 elements[elts++] = xfer_filter_xor(0xab);
1521 elements[elts++] = xfer_filter_xor(0xab);
1523 elements[elts++] = xfer_dest_fd(wfd);
1525 xfer = xfer_new(elements, elts);
1526 src = xfer_get_source(xfer);
1527 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1528 g_source_attach(src, NULL);
1529 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1531 /* unreference the elements */
1532 for (i = 0; i < elts; i++) {
1533 g_object_unref(elements[i]);
1534 g_assert(G_OBJECT(elements[i])->ref_count == 1);
1538 xfer_start(xfer, 0, 0);
1540 g_main_loop_run(default_main_loop());
1541 g_assert(xfer->status == XFER_DONE);
1545 unlink(out_filename); /* ignore any errors */
1551 test_xfer_files_simple(void)
1553 return test_xfer_files(FALSE);
1557 test_xfer_files_filter(void)
1559 return test_xfer_files(TRUE);
1563 * test each possible combination of source and destination mechansim
1568 XferElement *source,
1573 XferElement *elements[] = { source, dest };
1575 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
1576 src = xfer_get_source(xfer);
1577 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1578 g_source_attach(src, NULL);
1580 /* unreference the elements */
1581 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1582 g_object_unref(elements[i]);
1583 g_assert(G_OBJECT(elements[i])->ref_count == 1);
1587 xfer_start(xfer, 0, 0);
1589 g_main_loop_run(default_main_loop());
1590 g_assert(xfer->status == XFER_DONE);
1597 #define make_test_glue(n, s, d) static int n(void) \
1599 return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1600 (XferElement *)g_object_new(d, NULL)); \
1602 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1603 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1604 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1605 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1606 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1607 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1608 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1609 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1610 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1611 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1612 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1613 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1614 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1615 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1616 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1617 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1618 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1619 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1620 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1621 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1622 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1623 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1624 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1625 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1626 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1627 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1628 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1629 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1630 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1631 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1632 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1633 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1634 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1635 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1636 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1637 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
1644 main(int argc, char **argv)
1646 static TestUtilsTest tests[] = {
1647 TU_TEST(test_xfer_simple, 90),
1648 TU_TEST(test_xfer_files_simple, 90),
1649 TU_TEST(test_xfer_files_filter, 90),
1650 TU_TEST(test_glue_READFD_READFD, 90),
1651 TU_TEST(test_glue_READFD_WRITEFD, 90),
1652 TU_TEST(test_glue_READFD_PUSH, 90),
1653 TU_TEST(test_glue_READFD_PULL, 90),
1654 TU_TEST(test_glue_READFD_LISTEN, 90),
1655 TU_TEST(test_glue_READFD_CONNECT, 90),
1656 TU_TEST(test_glue_WRITEFD_READFD, 90),
1657 TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1658 TU_TEST(test_glue_WRITEFD_PUSH, 90),
1659 TU_TEST(test_glue_WRITEFD_PULL, 90),
1660 TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1661 TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1662 TU_TEST(test_glue_PUSH_READFD, 90),
1663 TU_TEST(test_glue_PUSH_WRITEFD, 90),
1664 TU_TEST(test_glue_PUSH_PUSH, 90),
1665 TU_TEST(test_glue_PUSH_PULL, 90),
1666 TU_TEST(test_glue_PUSH_LISTEN, 90),
1667 TU_TEST(test_glue_PUSH_CONNECT, 90),
1668 TU_TEST(test_glue_PULL_READFD, 90),
1669 TU_TEST(test_glue_PULL_WRITEFD, 90),
1670 TU_TEST(test_glue_PULL_PUSH, 90),
1671 TU_TEST(test_glue_PULL_PULL, 90),
1672 TU_TEST(test_glue_PULL_LISTEN, 90),
1673 TU_TEST(test_glue_PULL_CONNECT, 90),
1674 TU_TEST(test_glue_LISTEN_READFD, 90),
1675 TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1676 TU_TEST(test_glue_LISTEN_PUSH, 90),
1677 TU_TEST(test_glue_LISTEN_PULL, 90),
1678 TU_TEST(test_glue_LISTEN_LISTEN, 90),
1679 TU_TEST(test_glue_LISTEN_CONNECT, 90),
1680 TU_TEST(test_glue_CONNECT_READFD, 90),
1681 TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1682 TU_TEST(test_glue_CONNECT_PUSH, 90),
1683 TU_TEST(test_glue_CONNECT_PULL, 90),
1684 TU_TEST(test_glue_CONNECT_LISTEN, 90),
1685 TU_TEST(test_glue_CONNECT_CONNECT, 90),
1691 return testutils_run_tests(argc, argv, tests);