2 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20 * Author: Dustin J. Mitchell <dustin@zmanda.com>
24 #include "glib-util.h"
25 #include "testutils.h"
28 #include "simpleprng.h"
29 #include "sockaddr-util.h"
31 /* Having tests repeat exactly is an advantage, so we use a hard-coded seed. */
33 #define RANDOM_SEED 0xf00d
36 * XferElement subclasses
38 * This file defines a few "private" element classes that each have only one
39 * mechanism pair. These classes are then used to test all of the possible
40 * combinations of glue.
43 /* constants to determine the total amount of data to be transferred; EXTRA is
44 * to test out partial-block handling; it should be prime. */
/* size in bytes of each full block sent by the block-oriented sources */
45 #define TEST_BLOCK_SIZE 32768
/* number of full blocks in one test transfer */
46 #define TEST_BLOCK_COUNT 10
/* size in bytes of the final, short block */
47 #define TEST_BLOCK_EXTRA 97
/* total bytes in one transfer: all full blocks plus the short tail */
48 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
/* Forward declaration of the GType getter; every macro below expands to a
 * call to it. */
52 static GType xfer_source_readfd_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourceReadfd. */
53 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
54 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
55 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
56 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
57 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
58 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
/* Instance structure: starts with the parent XferElement and carries the
 * PRNG state from which the outgoing test data is generated. */
60 typedef struct XferSourceReadfd {
61 XferElement __parent__;
65 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
69 XferElementClass __parent__;
70 } XferSourceReadfdClass;
/* Thread body for XferSourceReadfd: generate the whole TEST_XFER_SIZE
 * payload from the element's PRNG, write it to self->write_fd in one
 * full_write() call, then queue XMSG_DONE on the transfer. */
76 XferSourceReadfd *self = XFER_SOURCE_READFD(data);
77 char buf[TEST_XFER_SIZE];
78 int fd = self->write_fd;
80 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
/* anything less than a complete write is reported through error() */
82 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
83 error("error in full_write(): %s", strerror(errno));
/* tell the transfer that this element has finished */
88 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferSourceReadfd: seed the PRNG with RANDOM_SEED and split
 * a descriptor pair between this element and its consumer. p[1] is kept as
 * self->write_fd for the writer thread; p[0] is published as the element's
 * output_fd.
 * NOTE(review): p is presumably filled in by pipe(), judging by the error
 * message below -- confirm. */
94 source_readfd_setup_impl(
97 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
100 simpleprng_seed(&self->prng, RANDOM_SEED);
103 g_critical("Error from pipe(): %s", strerror(errno));
105 self->write_fd = p[1];
106 XFER_ELEMENT(self)->output_fd = p[0];
/* Start hook for XferSourceReadfd: run source_readfd_thread in a new thread
 * with this element as its argument. The thread is created non-joinable
 * (FALSE) and no GError is requested (NULL). */
112 source_readfd_start_impl(
115 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
116 self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceReadfd: installs the setup and start hooks
 * and the mechanism-pair table on the XferElementClass. */
122 source_readfd_class_init(
123 XferSourceReadfdClass * klass)
125 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, READFD output. The two trailing
 * integers are the remaining xfer_element_mech_pair_t fields (their meaning
 * is not visible here); the all-NONE row presumably terminates the list. */
126 static xfer_element_mech_pair_t mech_pairs[] = {
127 { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
128 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
131 xec->setup = source_readfd_setup_impl;
132 xec->start = source_readfd_start_impl;
133 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourceReadfd. The type is registered as a static
 * subtype of XFER_ELEMENT_TYPE on the first call and the result is cached in
 * a function-local static for later calls. */
137 xfer_source_readfd_get_type (void)
139 static GType type = 0;
141 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
142 static const GTypeInfo info = {
143 sizeof (XferSourceReadfdClass),
144 (GBaseInitFunc) NULL,
145 (GBaseFinalizeFunc) NULL,
146 (GClassInitFunc) source_readfd_class_init,
147 (GClassFinalizeFunc) NULL,
148 NULL /* class_data */,
149 sizeof (XferSourceReadfd),
151 (GInstanceInitFunc) NULL,
155 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
163 static GType xfer_source_writefd_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourceWritefd. */
164 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
165 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
166 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
167 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
168 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
169 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
/* Instance structure: parent XferElement first, plus the PRNG state that
 * produces the data written downstream. */
171 typedef struct XferSourceWritefd {
172 XferElement __parent__;
175 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
179 XferElementClass __parent__;
180 } XferSourceWritefdClass;
/* Thread body for XferSourceWritefd: generate TEST_XFER_SIZE bytes from the
 * PRNG and write them straight into the downstream element's input_fd, then
 * queue XMSG_DONE on the transfer. */
183 source_writefd_thread(
186 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
187 char buf[TEST_XFER_SIZE];
188 int fd = XFER_ELEMENT(self)->downstream->input_fd;
190 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
/* a short write is reported through error() */
192 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
193 error("error in full_write(): %s", strerror(errno));
/* mark the downstream descriptor as no longer usable.
 * NOTE(review): presumably the fd is closed just before this -- confirm. */
197 XFER_ELEMENT(self)->downstream->input_fd = -1;
199 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourceWritefd: seed the PRNG with RANDOM_SEED, then run
 * source_writefd_thread in a new non-joinable thread with this element as
 * its argument. */
205 source_writefd_start_impl(
208 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
210 simpleprng_seed(&self->prng, RANDOM_SEED);
212 self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceWritefd: installs the start hook and the
 * mechanism-pair table on the XferElementClass. */
218 source_writefd_class_init(
219 XferSourceWritefdClass * klass)
221 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, WRITEFD output. The all-NONE row
 * presumably terminates the list. */
222 static xfer_element_mech_pair_t mech_pairs[] = {
223 { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
224 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
227 xec->start = source_writefd_start_impl;
228 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourceWritefd, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
232 xfer_source_writefd_get_type (void)
234 static GType type = 0;
236 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
237 static const GTypeInfo info = {
238 sizeof (XferSourceWritefdClass),
239 (GBaseInitFunc) NULL,
240 (GBaseFinalizeFunc) NULL,
241 (GClassInitFunc) source_writefd_class_init,
242 (GClassFinalizeFunc) NULL,
243 NULL /* class_data */,
244 sizeof (XferSourceWritefd),
246 (GInstanceInitFunc) NULL,
250 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
258 static GType xfer_source_push_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourcePush. */
259 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
260 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
261 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
262 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
263 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
264 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
/* Instance structure: parent XferElement first, plus the PRNG state that
 * fills each pushed buffer. */
266 typedef struct XferSourcePush {
267 XferElement __parent__;
270 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
274 XferElementClass __parent__;
275 } XferSourcePushClass;
/* Thread body for XferSourcePush: push TEST_BLOCK_COUNT full blocks, one
 * short TEST_BLOCK_EXTRA block, and finally a NULL buffer of size 0 to the
 * downstream element, then queue XMSG_DONE.
 * NOTE(review): each buffer is freshly g_malloc()ed and not freed in the
 * lines visible here; presumably ownership passes to the receiver, and the
 * NULL/0 push is presumably the EOF marker -- confirm both. */
281 XferSourcePush *self = XFER_SOURCE_PUSH(data);
285 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
286 buf = g_malloc(TEST_BLOCK_SIZE);
287 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
288 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
292 /* send a smaller block */
293 buf = g_malloc(TEST_BLOCK_EXTRA);
294 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
295 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
299 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
301 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourcePush: seed the PRNG with RANDOM_SEED, then run
 * source_push_thread in a new non-joinable thread with this element as its
 * argument. */
307 source_push_start_impl(
310 XferSourcePush *self = XFER_SOURCE_PUSH(elt);
312 simpleprng_seed(&self->prng, RANDOM_SEED);
314 self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourcePush: installs the start hook and the
 * mechanism-pair table on the XferElementClass. */
320 source_push_class_init(
321 XferSourcePushClass * klass)
323 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, PUSH_BUFFER output. The all-NONE row
 * presumably terminates the list. */
324 static xfer_element_mech_pair_t mech_pairs[] = {
325 { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
326 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
329 xec->start = source_push_start_impl;
330 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourcePush, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
334 xfer_source_push_get_type (void)
336 static GType type = 0;
338 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
339 static const GTypeInfo info = {
340 sizeof (XferSourcePushClass),
341 (GBaseInitFunc) NULL,
342 (GBaseFinalizeFunc) NULL,
343 (GClassInitFunc) source_push_class_init,
344 (GClassFinalizeFunc) NULL,
345 NULL /* class_data */,
346 sizeof (XferSourcePush),
348 (GInstanceInitFunc) NULL,
352 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
360 static GType xfer_source_pull_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourcePull. */
361 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
362 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
363 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
364 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
365 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
366 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
/* Instance structure: parent XferElement first, plus the PRNG state that
 * fills each buffer handed out by the pull hook. */
368 typedef struct XferSourcePull {
369 XferElement __parent__;
373 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
377 XferElementClass __parent__;
378 } XferSourcePullClass;
/* Pull hook for XferSourcePull: allocate the next block and fill it from the
 * PRNG. While self->nbuffers differs from TEST_BLOCK_COUNT the block is
 * TEST_BLOCK_SIZE bytes; when it equals TEST_BLOCK_COUNT the block is the
 * short TEST_BLOCK_EXTRA tail.
 * NOTE(review): self->nbuffers presumably counts the buffers already handed
 * out, and the nbuffers > TEST_BLOCK_COUNT branch is presumably the EOF
 * case -- confirm. */
381 source_pull_pull_buffer_impl(
385 XferSourcePull *self = XFER_SOURCE_PULL(elt);
389 if (self->nbuffers > TEST_BLOCK_COUNT) {
393 bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
397 buf = g_malloc(bufsiz);
398 simpleprng_fill_buffer(&self->prng, buf, bufsiz);
/* Setup hook for XferSourcePull: seed the PRNG with RANDOM_SEED so the
 * generated data is the same on every run. */
404 source_pull_setup_impl(
407 XferSourcePull *self = XFER_SOURCE_PULL(elt);
409 simpleprng_seed(&self->prng, RANDOM_SEED);
/* Class initializer for XferSourcePull: installs the pull_buffer and setup
 * hooks and the mechanism-pair table. No start hook is installed here. */
415 source_pull_class_init(
416 XferSourcePullClass * klass)
418 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, PULL_BUFFER output. The all-NONE row
 * presumably terminates the list. */
419 static xfer_element_mech_pair_t mech_pairs[] = {
420 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
421 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
424 xec->pull_buffer = source_pull_pull_buffer_impl;
425 xec->setup = source_pull_setup_impl;
426 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourcePull, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
430 xfer_source_pull_get_type (void)
432 static GType type = 0;
434 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
435 static const GTypeInfo info = {
436 sizeof (XferSourcePullClass),
437 (GBaseInitFunc) NULL,
438 (GBaseFinalizeFunc) NULL,
439 (GClassInitFunc) source_pull_class_init,
440 (GClassFinalizeFunc) NULL,
441 NULL /* class_data */,
442 sizeof (XferSourcePull),
444 (GInstanceInitFunc) NULL,
448 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
456 static GType xfer_source_listen_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourceListen. */
457 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
458 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
459 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
460 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
461 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
462 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
/* Instance structure: parent XferElement first, plus the PRNG state that
 * produces the data sent over the socket. */
464 typedef struct XferSourceListen {
465 XferElement __parent__;
468 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
472 XferElementClass __parent__;
473 } XferSourceListenClass;
/* Thread body for XferSourceListen: connect over IPv4 TCP to the first
 * address in the downstream element's input_listen_addrs, send
 * TEST_BLOCK_COUNT full blocks followed by one TEST_BLOCK_EXTRA block of
 * PRNG data, and queue XMSG_DONE. A single TEST_BLOCK_SIZE buffer is reused
 * for every block. */
476 source_listen_thread(
479 XferSourceListen *self = XFER_SOURCE_LISTEN(data);
480 XferElement *elt = XFER_ELEMENT(self);
481 DirectTCPAddr *addrs;
487 /* set up the sockaddr -- IPv4 only */
488 SU_INIT(&addr, AF_INET);
489 addrs = elt->downstream->input_listen_addrs;
490 g_assert(addrs != NULL);
/* only the first entry of the address list is used */
491 SU_SET_PORT(&addr, addrs->port);
492 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
494 tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
495 sock = socket(AF_INET, SOCK_STREAM, 0);
497 error("socket(): %s", strerror(errno));
499 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
500 error("connect(): %s", strerror(errno));
503 tu_dbg("connected to %s\n", str_sockaddr(&addr));
505 buf = g_malloc(TEST_BLOCK_SIZE);
506 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
507 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
508 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
509 error("error in full_write(): %s", strerror(errno));
513 /* send a smaller block */
514 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
515 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
516 error("error in full_write(): %s", strerror(errno));
520 /* send EOF by closing the socket */
523 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourceListen: seed the PRNG with RANDOM_SEED, then run
 * source_listen_thread in a new non-joinable thread with this element as its
 * argument. */
529 source_listen_start_impl(
532 XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
534 simpleprng_seed(&self->prng, RANDOM_SEED);
536 self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceListen: installs the start hook and the
 * mechanism-pair table on the XferElementClass. */
542 source_listen_class_init(
543 XferSourceListenClass * klass)
545 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, DIRECTTCP_LISTEN output. The all-NONE
 * row presumably terminates the list. */
546 static xfer_element_mech_pair_t mech_pairs[] = {
547 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
548 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
551 xec->start = source_listen_start_impl;
552 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourceListen, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
556 xfer_source_listen_get_type (void)
558 static GType type = 0;
560 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
561 static const GTypeInfo info = {
562 sizeof (XferSourceListenClass),
563 (GBaseInitFunc) NULL,
564 (GBaseFinalizeFunc) NULL,
565 (GClassInitFunc) source_listen_class_init,
566 (GClassFinalizeFunc) NULL,
567 NULL /* class_data */,
568 sizeof (XferSourceListen),
570 (GInstanceInitFunc) NULL,
574 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
582 static GType xfer_source_connect_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferSourceConnect. */
583 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
584 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
585 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
586 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
587 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
588 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
/* Instance structure: parent XferElement first, plus the PRNG state that
 * produces the data sent over the accepted connection. */
590 typedef struct XferSourceConnect {
591 XferElement __parent__;
596 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
600 XferElementClass __parent__;
601 } XferSourceConnectClass;
/* Thread body for XferSourceConnect: accept one connection on
 * self->listen_socket, send TEST_BLOCK_COUNT full blocks followed by one
 * TEST_BLOCK_EXTRA block of PRNG data over it, and queue XMSG_DONE. A single
 * TEST_BLOCK_SIZE buffer is reused for every block. */
604 source_connect_thread(
607 XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
612 g_assert(self->listen_socket != -1);
/* an accept() failure cancels the transfer instead of calling error(), and
 * then waits for the cancellation to take effect */
614 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
615 xfer_cancel_with_error(XFER_ELEMENT(self),
616 _("Error accepting incoming connection: %s"), strerror(errno));
617 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
621 /* close the listening socket now, for good measure */
622 close(self->listen_socket);
623 self->listen_socket = -1;
625 tu_dbg("connection accepted\n");
627 buf = g_malloc(TEST_BLOCK_SIZE);
628 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
629 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
630 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
631 error("error in full_write(): %s", strerror(errno));
635 /* send a smaller block */
636 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
637 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
638 error("error in full_write(): %s", strerror(errno));
642 /* send EOF by closing the socket */
645 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferSourceConnect: create an IPv4 TCP listening socket,
 * find out which port it got, and publish 127.0.0.1 with that port as the
 * element's output_listen_addrs. */
651 source_connect_setup_impl(
654 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
656 DirectTCPAddr *addrs;
660 /* set up self->listen_socket and set elt->output_listen_addrs */
661 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
663 error("socket(): %s", strerror(errno));
/* NOTE(review): no bind() appears in the lines visible here; presumably
 * listen() on the unbound socket gets an ephemeral port, which
 * getsockname() then reports -- confirm. */
665 if (listen(sock, 1) < 0)
666 error("listen(): %s", strerror(errno));
669 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
670 error("getsockname(): %s", strerror(errno));
671 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
/* two zeroed entries are allocated but only addrs[0] is filled in; the
 * zeroed second entry presumably terminates the list */
673 addrs = g_new0(DirectTCPAddr, 2);
674 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
675 addrs[0].port = SU_GET_PORT(&addr);
676 elt->output_listen_addrs = addrs;
/* Start hook for XferSourceConnect: seed the PRNG with RANDOM_SEED, then run
 * source_connect_thread in a new non-joinable thread with this element as
 * its argument. */
682 source_connect_start_impl(
685 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
687 simpleprng_seed(&self->prng, RANDOM_SEED);
689 self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceConnect: installs the setup and start
 * hooks and the mechanism-pair table on the XferElementClass. */
695 source_connect_class_init(
696 XferSourceConnectClass * klass)
698 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: no input mechanism, DIRECTTCP_CONNECT output. The all-NONE
 * row presumably terminates the list. */
699 static xfer_element_mech_pair_t mech_pairs[] = {
700 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
701 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
704 xec->setup = source_connect_setup_impl;
705 xec->start = source_connect_start_impl;
706 xec->mech_pairs = mech_pairs;
/* Return the GType for XferSourceConnect, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
710 xfer_source_connect_get_type (void)
712 static GType type = 0;
714 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
715 static const GTypeInfo info = {
716 sizeof (XferSourceConnectClass),
717 (GBaseInitFunc) NULL,
718 (GBaseFinalizeFunc) NULL,
719 (GClassInitFunc) source_connect_class_init,
720 (GClassFinalizeFunc) NULL,
721 NULL /* class_data */,
722 sizeof (XferSourceConnect),
724 (GInstanceInitFunc) NULL,
728 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
736 static GType xfer_dest_readfd_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferDestReadfd. */
737 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
738 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
739 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
740 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
741 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
742 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
/* Instance structure: parent XferElement first, plus the PRNG state against
 * which the received data is verified. */
744 typedef struct XferDestReadfd {
745 XferElement __parent__;
748 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
752 XferElementClass __parent__;
753 } XferDestReadfdClass;
/* Thread body for XferDestReadfd: read TEST_XFER_SIZE bytes from the
 * upstream element's output_fd, check that no further data follows, verify
 * the bytes against the PRNG sequence, and queue XMSG_DONE. */
759 XferDestReadfd *self = XFER_DEST_READFD(data);
760 char buf[TEST_XFER_SIZE];
762 int fd = XFER_ELEMENT(self)->upstream->output_fd;
764 remaining = sizeof(buf);
/* each read lands at offset sizeof(buf)-remaining, i.e. right after the
 * bytes received so far.
 * NOTE(review): a return of 0 (early EOF) also takes the error path, where
 * errno may not describe the problem. */
767 if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
768 error("error in read(): %s", strerror(errno));
773 /* we should be at EOF here */
774 if (read(fd, buf, 10) != 0)
775 g_critical("too much data entering XferDestReadfd");
777 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
778 g_critical("data entering XferDestReadfd does not match");
/* mark the upstream descriptor as no longer usable */
781 XFER_ELEMENT(self)->upstream->output_fd = -1;
783 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestReadfd: seed the verification PRNG with
 * RANDOM_SEED, then run dest_readfd_thread in a new non-joinable thread with
 * this element as its argument. */
789 dest_readfd_start_impl(
792 XferDestReadfd *self = XFER_DEST_READFD(elt);
794 simpleprng_seed(&self->prng, RANDOM_SEED);
796 self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestReadfd: installs the start hook and the
 * mechanism-pair table on the XferElementClass. */
802 dest_readfd_class_init(
803 XferDestReadfdClass * klass)
805 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: READFD input, no output mechanism. The all-NONE row
 * presumably terminates the list. */
806 static xfer_element_mech_pair_t mech_pairs[] = {
807 { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
808 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
811 xec->start = dest_readfd_start_impl;
812 xec->mech_pairs = mech_pairs;
/* Return the GType for XferDestReadfd, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
816 xfer_dest_readfd_get_type (void)
818 static GType type = 0;
820 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
821 static const GTypeInfo info = {
822 sizeof (XferDestReadfdClass),
823 (GBaseInitFunc) NULL,
824 (GBaseFinalizeFunc) NULL,
825 (GClassInitFunc) dest_readfd_class_init,
826 (GClassFinalizeFunc) NULL,
827 NULL /* class_data */,
828 sizeof (XferDestReadfd),
830 (GInstanceInitFunc) NULL,
834 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
842 static GType xfer_dest_writefd_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferDestWritefd. */
843 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
844 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
845 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
846 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
847 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
848 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
/* Instance structure: parent XferElement first, plus the PRNG state against
 * which the received data is verified. */
850 typedef struct XferDestWritefd {
851 XferElement __parent__;
855 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
859 XferElementClass __parent__;
860 } XferDestWritefdClass;
/* Thread body for XferDestWritefd: read TEST_XFER_SIZE bytes from
 * self->read_fd, check that no further data follows, verify the bytes
 * against the PRNG sequence, and queue XMSG_DONE. */
866 XferDestWritefd *self = XFER_DEST_WRITEFD(data);
867 char buf[TEST_XFER_SIZE];
869 int fd = self->read_fd;
871 remaining = sizeof(buf);
/* despite its name, nwrite holds the byte count returned by read().
 * NOTE(review): a return of 0 (early EOF) also takes the error path, where
 * errno may not describe the problem. */
874 if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
875 error("error in read(): %s", strerror(errno));
880 /* we should be at EOF here */
881 if (read(fd, buf, 10) != 0)
882 g_critical("too much data entering XferDestWritefd");
884 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
885 g_critical("data entering XferDestWritefd does not match");
/* NOTE(review): this thread reads from self->read_fd, yet the field reset
 * here is the upstream element's output_fd; this looks like it may have
 * been carried over from XferDestReadfd -- confirm it is intended. */
888 XFER_ELEMENT(self)->upstream->output_fd = -1;
890 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferDestWritefd: seed the verification PRNG with
 * RANDOM_SEED and split a descriptor pair between this element and its
 * producer. p[0] is kept as self->read_fd for the reader thread; p[1] is
 * published as the element's input_fd.
 * NOTE(review): p is presumably filled in by pipe(), judging by the error
 * message below -- confirm. */
896 dest_writefd_setup_impl(
899 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
902 simpleprng_seed(&self->prng, RANDOM_SEED);
905 g_critical("Error from pipe(): %s", strerror(errno));
907 self->read_fd = p[0];
908 XFER_ELEMENT(self)->input_fd = p[1];
/* Start hook for XferDestWritefd: run dest_writefd_thread in a new
 * non-joinable thread with this element as its argument. The PRNG is seeded
 * in the setup hook, not here. */
914 dest_writefd_start_impl(
917 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
918 self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestWritefd: installs the setup and start hooks
 * and the mechanism-pair table on the XferElementClass. */
924 dest_writefd_class_init(
925 XferDestWritefdClass * klass)
927 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: WRITEFD input, no output mechanism. The all-NONE row
 * presumably terminates the list. */
928 static xfer_element_mech_pair_t mech_pairs[] = {
929 { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
930 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
933 xec->setup = dest_writefd_setup_impl;
934 xec->start = dest_writefd_start_impl;
935 xec->mech_pairs = mech_pairs;
/* Return the GType for XferDestWritefd, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
939 xfer_dest_writefd_get_type (void)
941 static GType type = 0;
943 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
944 static const GTypeInfo info = {
945 sizeof (XferDestWritefdClass),
946 (GBaseInitFunc) NULL,
947 (GBaseFinalizeFunc) NULL,
948 (GClassInitFunc) dest_writefd_class_init,
949 (GClassFinalizeFunc) NULL,
950 NULL /* class_data */,
951 sizeof (XferDestWritefd),
953 (GInstanceInitFunc) NULL,
957 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
965 static GType xfer_dest_push_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferDestPush. */
966 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
967 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
968 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
969 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
970 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
971 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
/* Instance structure: parent XferElement first, plus the PRNG state against
 * which the accumulated data is verified. */
973 typedef struct XferDestPush {
974 XferElement __parent__;
980 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
984 XferElementClass __parent__;
/* Push hook for XferDestPush: append each incoming buffer to self->buf at
 * self->bufpos; at EOF, check that exactly TEST_XFER_SIZE bytes arrived and
 * that they match the PRNG sequence.
 * NOTE(review): the condition selecting the EOF path is not among the lines
 * visible here. */
988 dest_push_push_buffer_impl(
993 XferDestPush *self = XFER_DEST_PUSH(elt);
996 /* if we're at EOF, verify we got the right bytes */
997 g_assert(self->bufpos == TEST_XFER_SIZE);
998 if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
999 g_critical("data entering XferDestPush does not match");
/* refuse to overrun the accumulation buffer before copying into it */
1004 g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1005 memcpy(self->buf + self->bufpos, buf, size);
1006 self->bufpos += size;
/* Setup hook for XferDestPush: allocate the TEST_XFER_SIZE accumulation
 * buffer and seed the verification PRNG with RANDOM_SEED. */
1010 dest_push_setup_impl(
1013 XferDestPush *self = XFER_DEST_PUSH(elt);
1015 self->buf = g_malloc(TEST_XFER_SIZE);
1016 simpleprng_seed(&self->prng, RANDOM_SEED);
/* Class initializer for XferDestPush: installs the push_buffer and setup
 * hooks and the mechanism-pair table. No start hook is installed here. */
1022 dest_push_class_init(
1023 XferDestPushClass * klass)
1025 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: PUSH_BUFFER input, no output mechanism. The all-NONE row
 * presumably terminates the list. */
1026 static xfer_element_mech_pair_t mech_pairs[] = {
1027 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
1028 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1031 xec->push_buffer = dest_push_push_buffer_impl;
1032 xec->setup = dest_push_setup_impl;
1033 xec->mech_pairs = mech_pairs;
/* Return the GType for XferDestPush, registering it under XFER_ELEMENT_TYPE
 * on first use and caching the value in a static. */
1037 xfer_dest_push_get_type (void)
1039 static GType type = 0;
1041 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
1042 static const GTypeInfo info = {
1043 sizeof (XferDestPushClass),
1044 (GBaseInitFunc) NULL,
1045 (GBaseFinalizeFunc) NULL,
1046 (GClassInitFunc) dest_push_class_init,
1047 (GClassFinalizeFunc) NULL,
1048 NULL /* class_data */,
1049 sizeof (XferDestPush),
1050 0 /* n_preallocs */,
1051 (GInstanceInitFunc) NULL,
1055 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
1063 static GType xfer_dest_pull_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferDestPull. */
1064 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1065 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1066 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1067 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1068 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1069 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
/* Instance structure: parent XferElement first, plus the PRNG state against
 * which the pulled data is verified. */
1071 typedef struct XferDestPull {
1072 XferElement __parent__;
1075 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
1079 XferElementClass __parent__;
1080 } XferDestPullClass;
/* Thread body for XferDestPull: pull buffers from the upstream element until
 * it returns NULL, copying each into a TEST_XFER_SIZE stack buffer; then
 * check the total length, verify the bytes against the PRNG sequence, and
 * queue XMSG_DONE. */
1086 XferDestPull *self = XFER_DEST_PULL(data);
1087 char fullbuf[TEST_XFER_SIZE];
1092 while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
/* refuse to overrun fullbuf before copying into it */
1093 g_assert(bufpos + size <= TEST_XFER_SIZE);
1094 memcpy(fullbuf + bufpos, buf, size);
1098 /* we're at EOF, so verify we got the right bytes */
1099 g_assert(bufpos == TEST_XFER_SIZE);
1100 if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1101 g_critical("data entering XferDestPull does not match");
1103 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestPull: seed the verification PRNG with RANDOM_SEED,
 * then run dest_pull_thread in a new non-joinable thread with this element
 * as its argument. */
1109 dest_pull_start_impl(
1112 XferDestPull *self = XFER_DEST_PULL(elt);
1114 simpleprng_seed(&self->prng, RANDOM_SEED);
1116 self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestPull: installs the start hook and the
 * mechanism-pair table on the XferElementClass. */
1122 dest_pull_class_init(
1123 XferDestPullClass * klass)
1125 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: PULL_BUFFER input, no output mechanism. The all-NONE row
 * presumably terminates the list. */
1126 static xfer_element_mech_pair_t mech_pairs[] = {
1127 { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
1128 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1131 xec->start = dest_pull_start_impl;
1132 xec->mech_pairs = mech_pairs;
/* Return the GType for XferDestPull, registering it under XFER_ELEMENT_TYPE
 * on first use and caching the value in a static. */
1136 xfer_dest_pull_get_type (void)
1138 static GType type = 0;
1140 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
1141 static const GTypeInfo info = {
1142 sizeof (XferDestPullClass),
1143 (GBaseInitFunc) NULL,
1144 (GBaseFinalizeFunc) NULL,
1145 (GClassInitFunc) dest_pull_class_init,
1146 (GClassFinalizeFunc) NULL,
1147 NULL /* class_data */,
1148 sizeof (XferDestPull),
1149 0 /* n_preallocs */,
1150 (GInstanceInitFunc) NULL,
1154 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
/* Forward declaration of the GType getter used by the macros below. */
1162 static GType xfer_dest_listen_get_type(void);
/* GObject-style type, cast, type-check and class-lookup macros for
 * XferDestListen. */
1163 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1164 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1165 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1166 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1167 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1168 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
/* Instance structure: parent XferElement first, plus the PRNG state against
 * which the received data is verified. */
1170 typedef struct XferDestListen {
1171 XferElement __parent__;
1176 simpleprng_state_t prng;
/* Class structure: the parent class is the only member visible here. */
1180 XferElementClass __parent__;
1181 } XferDestListenClass;
/* Thread body for XferDestListen: accept one connection on
 * self->listen_socket, read the whole transfer from it, verify the bytes
 * against the PRNG sequence, and queue XMSG_DONE. */
1187 XferDestListen *self = XFER_DEST_LISTEN(data);
1192 g_assert(self->listen_socket != -1);
/* an accept() failure cancels the transfer instead of calling error(), and
 * then waits for the cancellation to take effect */
1194 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1195 xfer_cancel_with_error(XFER_ELEMENT(self),
1196 _("Error accepting incoming connection: %s"), strerror(errno));
1197 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1201 /* close the listening socket now, for good measure */
1202 close(self->listen_socket);
1203 self->listen_socket = -1;
1205 /* read from the socket until EOF or all of the data is read. We try to
1206 * read one extra byte - if we get it, then upstream sent too much data */
1207 buf = g_malloc(TEST_XFER_SIZE+1);
1208 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1209 g_assert(bytes == TEST_XFER_SIZE);
/* NOTE(review): the assertion below repeats the one just above */
1212 /* we're at EOF, so verify we got the right bytes */
1213 g_assert(bytes == TEST_XFER_SIZE);
1214 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1215 g_critical("data entering XferDestListen does not match");
1217 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferDestListen: create an IPv4 TCP listening socket, find
 * out which port it got, and publish 127.0.0.1 with that port as the
 * element's input_listen_addrs. */
1223 dest_listen_setup_impl(
1226 XferDestListen *self = XFER_DEST_LISTEN(elt);
1227 sockaddr_union addr;
1228 DirectTCPAddr *addrs;
1232 /* set up self->listen_socket and set elt->input_listen_addrs */
1233 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1235 error("socket(): %s", strerror(errno));
/* NOTE(review): no bind() appears in the lines visible here; presumably
 * listen() on the unbound socket gets an ephemeral port, which
 * getsockname() then reports -- confirm. */
1237 if (listen(sock, 1) < 0)
1238 error("listen(): %s", strerror(errno));
1241 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1242 error("getsockname(): %s", strerror(errno));
1243 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
/* two zeroed entries are allocated but only addrs[0] is filled in; the
 * zeroed second entry presumably terminates the list */
1245 addrs = g_new0(DirectTCPAddr, 2);
1246 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
1247 addrs[0].port = SU_GET_PORT(&addr);
1248 elt->input_listen_addrs = addrs;
/* Start hook for XferDestListen: seed the verification PRNG with
 * RANDOM_SEED, then run dest_listen_thread in a new non-joinable thread with
 * this element as its argument. */
1254 dest_listen_start_impl(
1257 XferDestListen *self = XFER_DEST_LISTEN(elt);
1259 simpleprng_seed(&self->prng, RANDOM_SEED);
1261 self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestListen: installs the setup and start hooks
 * and the mechanism-pair table on the XferElementClass. */
1267 dest_listen_class_init(
1268 XferDestListenClass * klass)
1270 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* One real pair: DIRECTTCP_LISTEN input, no output mechanism. The all-NONE
 * row presumably terminates the list. */
1271 static xfer_element_mech_pair_t mech_pairs[] = {
1272 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
1273 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1276 xec->setup = dest_listen_setup_impl;
1277 xec->start = dest_listen_start_impl;
1278 xec->mech_pairs = mech_pairs;
/* Return the GType for XferDestListen, registering it under
 * XFER_ELEMENT_TYPE on first use and caching the value in a static. */
1282 xfer_dest_listen_get_type (void)
1284 static GType type = 0;
1286 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other callbacks are NULL */
1287 static const GTypeInfo info = {
1288 sizeof (XferDestListenClass),
1289 (GBaseInitFunc) NULL,
1290 (GBaseFinalizeFunc) NULL,
1291 (GClassInitFunc) dest_listen_class_init,
1292 (GClassFinalizeFunc) NULL,
1293 NULL /* class_data */,
1294 sizeof (XferDestListen),
1295 0 /* n_preallocs */,
1296 (GInstanceInitFunc) NULL,
1300 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
/* XferDestConnect: private test element whose only real mechanism pair
 * uses XFER_MECH_DIRECTTCP_CONNECT (see dest_connect_class_init).  The
 * lines below are its GObject boilerplate: a forward declaration of the
 * GType accessor, the cast/check macros built on it, and the instance and
 * class structs. */
1308 static GType xfer_dest_connect_get_type(void);
/* type id, checked instance casts (plain and const), checked class cast,
 * instance type test, and class lookup from an instance */
1309 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1310 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1311 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1312 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1313 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1314 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
/* instance struct; the XferElement parent is the first member */
1316 typedef struct XferDestConnect {
1317 XferElement __parent__;
/* PRNG state: seeded in dest_connect_start_impl and used by
 * dest_connect_thread to verify the bytes it receives */
1322 simpleprng_state_t prng;
/* class struct: the parent class is its only member */
1326 XferElementClass __parent__;
1327 } XferDestConnectClass;
/* Thread body for XferDestConnect (started by dest_connect_start_impl).
 * Connects over IPv4 to the first address in the upstream element's
 * output_listen_addrs, reads the whole stream, checks it against this
 * element's PRNG sequence, and then queues an XMSG_DONE message. */
1330 dest_connect_thread(
1333 XferDestConnect *self = XFER_DEST_CONNECT(data);
1334 XferElement *elt = XFER_ELEMENT(self);
1335 DirectTCPAddr *addrs;
1336 sockaddr_union addr;
1341 /* set up the sockaddr -- IPv4 only */
1342 SU_INIT(&addr, AF_INET);
/* the upstream element must already have published its listen addresses */
1343 addrs = elt->upstream->output_listen_addrs;
1344 g_assert(addrs != NULL);
1345 SU_SET_PORT(&addr, addrs->port);
/* htonl() converts addrs->ipv4 to network byte order for sin_addr */
1346 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
1348 tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1349 sock = socket(AF_INET, SOCK_STREAM, 0);
1351 error("socket(): %s", strerror(errno));
1353 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1354 error("connect(): %s", strerror(errno));
1357 tu_dbg("connected to %s\n", str_sockaddr(&addr));
1359 /* read from the socket until EOF or all of the data is read. We try to
1360 * read one extra byte - if we get it, then upstream sent too much data */
1361 buf = g_malloc(TEST_XFER_SIZE+1);
1362 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1363 g_assert(bytes == TEST_XFER_SIZE);
1366 /* we're at EOF, so verify we got the right bytes */
/* NOTE(review): repeats the assertion made right after full_read() */
1367 g_assert(bytes == TEST_XFER_SIZE);
/* a content mismatch is reported through g_critical() */
1368 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1369 g_critical("data entering XferDestConnect does not match");
/* tell the transfer that this element has finished */
1371 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestConnect (dest_connect_class_init stores it in
 * xec->start).  Seeds the element's PRNG with RANDOM_SEED and launches
 * dest_connect_thread with this element as its argument. */
1377 dest_connect_start_impl(
1380 XferDestConnect *self = XFER_DEST_CONNECT(elt);
1382 simpleprng_seed(&self->prng, RANDOM_SEED);
/* joinable is FALSE and no GError location is passed */
1384 self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestConnect: points the XferElementClass start
 * hook at dest_connect_start_impl and installs the mechanism-pair table. */
1390 dest_connect_class_init(
1391 XferDestConnectClass * klass)
1393 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* static, so the table is still valid after this function returns and
 * xec->mech_pairs can keep pointing at it */
1394 static xfer_element_mech_pair_t mech_pairs[] = {
/* the single real pair: XFER_MECH_DIRECTTCP_CONNECT with XFER_MECH_NONE
 * (presumably input and output mechanism, in that order -- confirm against
 * xfer_element_mech_pair_t) */
1395 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
/* all-NONE/zero entry, presumably the end-of-table marker -- confirm */
1396 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1399 xec->start = dest_connect_start_impl;
1400 xec->mech_pairs = mech_pairs;
/* GType accessor for XferDestConnect.  While the cached id is still 0 the
 * type is registered as a static subclass of XFER_ELEMENT_TYPE; the id is
 * kept in a function-local static. */
1404 xfer_dest_connect_get_type (void)
1406 static GType type = 0;
1408 if G_UNLIKELY(type == 0) {
/* GTypeInfo: only the class size, the class-init function and the
 * instance size are supplied; the other slots listed here are NULL or 0 */
1409 static const GTypeInfo info = {
1410 sizeof (XferDestConnectClass),
1411 (GBaseInitFunc) NULL,
1412 (GBaseFinalizeFunc) NULL,
1413 (GClassInitFunc) dest_connect_class_init,
1414 (GClassFinalizeFunc) NULL,
1415 NULL /* class_data */,
1416 sizeof (XferDestConnect),
1417 0 /* n_preallocs */,
1418 (GInstanceInitFunc) NULL,
/* last argument 0: no GTypeFlags */
1422 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
/* Message callback shared by the tests in this file (each test installs it
 * with g_source_set_callback).  Logs every message it receives and, once
 * the transfer's status is XFER_DONE, stops the default main loop so the
 * test's g_main_loop_run() call returns. */
1434 test_xfer_generic_callback(
1435 gpointer data G_GNUC_UNUSED,
1439 tu_dbg("Received message %s\n", xmsg_repr(msg));
1441 switch (msg->type) {
/* only quit when the transfer as a whole reports XFER_DONE */
1444 if (xfer->status == XFER_DONE) {
1445 tu_dbg("all elements are done!\n");
1446 g_main_loop_quit(default_main_loop());
1456 * Run a simple transfer with some xor filters
/* Test: run one transfer end to end -- random source, two xor filters,
 * null destination -- and assert that it finishes with status XFER_DONE. */
1460 test_xfer_simple(void)
/* 100 KiB source seeded with RANDOM_SEED; both filters use the same key
 * 'd' (presumably cancelling each other out -- confirm); the destination
 * is given the same seed (presumably to verify the stream -- confirm) */
1464 XferElement *elements[] = {
1465 xfer_source_random(100*1024, RANDOM_SEED),
1466 xfer_filter_xor('d'),
1467 xfer_filter_xor('d'),
1468 xfer_dest_null(RANDOM_SEED),
1471 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
/* route the transfer's messages to test_xfer_generic_callback; a NULL
 * context attaches the source to the default main context */
1472 src = xfer_get_source(xfer);
1473 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1474 g_source_attach(src, NULL);
1475 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1477 /* unreference the elements */
1478 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1479 g_object_unref(elements[i]);
/* exactly one reference must remain after dropping ours (presumably the
 * one held by the xfer -- confirm) */
1480 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* runs until the callback calls g_main_loop_quit() */
1486 g_main_loop_run(default_main_loop());
1487 g_assert(xfer->status == XFER_DONE);
1495 * Run a transfer between two files, with or without filters
/* Test: copy this source file (__FILE__) into "xfer-test.tmp" using an fd
 * source and an fd destination, then assert the transfer ended with status
 * XFER_DONE.  Called by test_xfer_files_simple (FALSE) and
 * test_xfer_files_filter (TRUE); add_filters presumably controls whether
 * the two xor filters are inserted -- confirm. */
1499 test_xfer_files(gboolean add_filters)
1504 char *in_filename = __FILE__;
1505 char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
/* room for the largest chain: source + two filters + destination */
1508 XferElement *elements[4];
1510 rfd = open(in_filename, O_RDONLY, 0);
1512 g_critical("Could not open '%s': %s", in_filename, strerror(errno));
/* O_CREAT without O_TRUNC: an already existing file is not truncated */
1514 wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1516 g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1519 elements[elts++] = xfer_source_fd(rfd);
/* same key 0xab twice (presumably cancelling each other out -- confirm) */
1521 elements[elts++] = xfer_filter_xor(0xab);
1522 elements[elts++] = xfer_filter_xor(0xab);
1524 elements[elts++] = xfer_dest_fd(wfd);
/* elts is the number of slots actually filled above */
1526 xfer = xfer_new(elements, elts);
/* route the transfer's messages to test_xfer_generic_callback; a NULL
 * context attaches the source to the default main context */
1527 src = xfer_get_source(xfer);
1528 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1529 g_source_attach(src, NULL);
1530 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1532 /* unreference the elements */
1533 for (i = 0; i < elts; i++) {
1534 g_object_unref(elements[i]);
/* exactly one reference must remain after dropping ours (presumably the
 * one held by the xfer -- confirm) */
1535 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* runs until the callback calls g_main_loop_quit() */
1541 g_main_loop_run(default_main_loop());
1542 g_assert(xfer->status == XFER_DONE);
/* remove the temporary output file */
1546 unlink(out_filename); /* ignore any errors */
/* Test entry: the file-to-file transfer with add_filters == FALSE. */
1552 test_xfer_files_simple(void)
1554 return test_xfer_files(FALSE);
/* Test entry: the file-to-file transfer with add_filters == TRUE. */
1558 test_xfer_files_filter(void)
1560 return test_xfer_files(TRUE);
1564 * test each possible combination of source and destination mechanism
1569 XferElement *source,
/* test_glue_combo body: join the given source and destination directly in
 * a two-element transfer, run it on the default main loop, and assert that
 * it ends with status XFER_DONE.  The make_test_glue() macro generates the
 * callers. */
1574 XferElement *elements[] = { source, dest };
1576 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
/* route the transfer's messages to test_xfer_generic_callback; a NULL
 * context attaches the source to the default main context */
1577 src = xfer_get_source(xfer);
1578 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1579 g_source_attach(src, NULL);
1581 /* unreference the elements */
1582 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1583 g_object_unref(elements[i]);
/* exactly one reference must remain after dropping ours (presumably the
 * one held by the xfer -- confirm) */
1584 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* runs until the callback calls g_main_loop_quit() */
1590 g_main_loop_run(default_main_loop());
1591 g_assert(xfer->status == XFER_DONE);
/* make_test_glue(n, s, d): defines a static test function n that creates
 * one object of GType s and one of GType d with g_object_new() and passes
 * them to test_glue_combo() as source and destination.  The invocations
 * that follow cover all 36 pairings of the six source types (READFD,
 * WRITEFD, PUSH, PULL, LISTEN, CONNECT) with the six destination types. */
1598 #define make_test_glue(n, s, d) static int n(void) \
1600 return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1601 (XferElement *)g_object_new(d, NULL)); \
1603 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1604 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1605 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1606 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1607 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1608 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1609 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1610 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1611 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1612 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1613 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1614 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1615 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1616 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1617 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1618 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1619 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1620 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1621 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1622 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1623 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1624 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1625 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1626 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1627 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1628 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1629 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1630 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1631 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1632 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1633 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1634 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1635 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1636 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1637 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1638 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
/* Program entry point: passes the static table of tests, together with the
 * command-line arguments, to testutils_run_tests() and returns its result. */
1645 main(int argc, char **argv)
/* every entry pairs a test function with the value 90 (presumably a
 * timeout in seconds -- confirm against TU_TEST): the three basic xfer
 * tests first, then the 36 glue combinations */
1647 static TestUtilsTest tests[] = {
1648 TU_TEST(test_xfer_simple, 90),
1649 TU_TEST(test_xfer_files_simple, 90),
1650 TU_TEST(test_xfer_files_filter, 90),
1651 TU_TEST(test_glue_READFD_READFD, 90),
1652 TU_TEST(test_glue_READFD_WRITEFD, 90),
1653 TU_TEST(test_glue_READFD_PUSH, 90),
1654 TU_TEST(test_glue_READFD_PULL, 90),
1655 TU_TEST(test_glue_READFD_LISTEN, 90),
1656 TU_TEST(test_glue_READFD_CONNECT, 90),
1657 TU_TEST(test_glue_WRITEFD_READFD, 90),
1658 TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1659 TU_TEST(test_glue_WRITEFD_PUSH, 90),
1660 TU_TEST(test_glue_WRITEFD_PULL, 90),
1661 TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1662 TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1663 TU_TEST(test_glue_PUSH_READFD, 90),
1664 TU_TEST(test_glue_PUSH_WRITEFD, 90),
1665 TU_TEST(test_glue_PUSH_PUSH, 90),
1666 TU_TEST(test_glue_PUSH_PULL, 90),
1667 TU_TEST(test_glue_PUSH_LISTEN, 90),
1668 TU_TEST(test_glue_PUSH_CONNECT, 90),
1669 TU_TEST(test_glue_PULL_READFD, 90),
1670 TU_TEST(test_glue_PULL_WRITEFD, 90),
1671 TU_TEST(test_glue_PULL_PUSH, 90),
1672 TU_TEST(test_glue_PULL_PULL, 90),
1673 TU_TEST(test_glue_PULL_LISTEN, 90),
1674 TU_TEST(test_glue_PULL_CONNECT, 90),
1675 TU_TEST(test_glue_LISTEN_READFD, 90),
1676 TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1677 TU_TEST(test_glue_LISTEN_PUSH, 90),
1678 TU_TEST(test_glue_LISTEN_PULL, 90),
1679 TU_TEST(test_glue_LISTEN_LISTEN, 90),
1680 TU_TEST(test_glue_LISTEN_CONNECT, 90),
1681 TU_TEST(test_glue_CONNECT_READFD, 90),
1682 TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1683 TU_TEST(test_glue_CONNECT_PUSH, 90),
1684 TU_TEST(test_glue_CONNECT_PULL, 90),
1685 TU_TEST(test_glue_CONNECT_LISTEN, 90),
1686 TU_TEST(test_glue_CONNECT_CONNECT, 90),
/* the test runner's return value becomes the process exit status */
1692 return testutils_run_tests(argc, argv, tests);