2 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20 * Author: Dustin J. Mitchell <dustin@zmanda.com>
25 #include "glib-util.h"
26 #include "testutils.h"
28 #include "simpleprng.h"
29 #include "sockaddr-util.h"
31 /* Having tests repeat exactly is an advantage, so we use a hard-coded random seed. */
/* Value handed to simpleprng_seed() by the test elements defined below. */
33 #define RANDOM_SEED 0xf00d
/*
36 * XferElement subclasses
38 * This file defines a few "private" element classes that each have only one
39 * mechanism pair. These classes are then used to test all of the possible
40 * combinations of glue.
 */
43 /* constants to determine the total amount of data to be transferred; EXTRA is
44 * to test out partial-block handling; it should be prime. */
/* Size in bytes of one full block of test data. */
45 #define TEST_BLOCK_SIZE 32768
/* Number of full-size blocks in a transfer. */
46 #define TEST_BLOCK_COUNT 10
/* Size in bytes of the single short block sent after the full ones. */
47 #define TEST_BLOCK_EXTRA 97
/* Total byte count of a transfer: all full blocks plus the short block. */
48 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
/* GObject boilerplate for XferSourceReadfd: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_readfd_get_type(). */
52 static GType xfer_source_readfd_get_type(void);
53 #define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
54 #define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
55 #define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
56 #define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
57 #define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
58 #define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
/* Instance structure; the parent XferElement comes first. */
60 typedef struct XferSourceReadfd {
61 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
65 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
69 XferElementClass __parent__;
70 } XferSourceReadfdClass;
/* Worker thread for XferSourceReadfd (launched by source_readfd_start_impl
 * with the element as `data`): fills one TEST_XFER_SIZE buffer from the
 * element's PRNG, writes it to self->write_fd with a single full_write(),
 * and queues an XMSG_DONE message on the transfer. */
76 XferSourceReadfd *self = XFER_SOURCE_READFD(data);
/* NOTE(review): the whole transfer (about 320 KiB) lives on this thread's stack */
77 char buf[TEST_XFER_SIZE];
78 int fd = self->write_fd;
80 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
/* anything short of the full buffer is reported through error() */
82 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
83 error("error in full_write(): %s", strerror(errno));
/* tell the transfer that this element has finished */
88 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
94 source_readfd_setup_impl(
97 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
100 simpleprng_seed(&self->prng, RANDOM_SEED);
103 g_critical("Error from pipe(): %s", strerror(errno));
105 self->write_fd = p[1];
106 g_assert(xfer_element_swap_output_fd(elt, p[0]) == -1);
/* Start hook for XferSourceReadfd: spawns the worker thread that runs
 * source_readfd_thread, passing the element itself as the thread argument. */
112 source_readfd_start_impl(
115 XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
116 self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceReadfd: installs the setup and start hooks
 * and the table of supported mechanism pairs on the XferElementClass. */
122 source_readfd_class_init(
123 XferSourceReadfdClass * klass)
125 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
126 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with READFD */
127 { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
128 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
131 xec->setup = source_readfd_setup_impl;
132 xec->start = source_readfd_start_impl;
133 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourceReadfd: on the first call (while the cached
 * static `type` is still 0) registers the type with GObject as a static
 * subtype of XFER_ELEMENT_TYPE. */
137 xfer_source_readfd_get_type (void)
139 static GType type = 0;
141 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
142 static const GTypeInfo info = {
143 sizeof (XferSourceReadfdClass),
144 (GBaseInitFunc) NULL,
145 (GBaseFinalizeFunc) NULL,
146 (GClassInitFunc) source_readfd_class_init,
147 (GClassFinalizeFunc) NULL,
148 NULL /* class_data */,
149 sizeof (XferSourceReadfd),
151 (GInstanceInitFunc) NULL,
155 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
/* GObject boilerplate for XferSourceWritefd: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_writefd_get_type(). */
163 static GType xfer_source_writefd_get_type(void);
164 #define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
165 #define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
166 #define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
167 #define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
168 #define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
169 #define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
/* Instance structure; the parent XferElement comes first. */
171 typedef struct XferSourceWritefd {
172 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
175 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
179 XferElementClass __parent__;
180 } XferSourceWritefdClass;
/* Worker thread for XferSourceWritefd: takes the downstream element's input
 * fd, writes TEST_XFER_SIZE PRNG-generated bytes to it with a single
 * full_write(), and queues an XMSG_DONE message on the transfer. */
183 source_writefd_thread(
186 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
187 XferElement *elt = XFER_ELEMENT(data);
/* NOTE(review): the whole transfer (about 320 KiB) lives on this thread's stack */
188 char buf[TEST_XFER_SIZE];
/* fetch downstream's input fd, storing -1 in its place */
189 int fd = xfer_element_swap_input_fd(elt->downstream, -1);
191 /* this shouldn't happen, although non-test elements handle it gracefully */
194 simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
/* anything short of the full buffer is reported through error() */
196 if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
197 error("error in full_write(): %s", strerror(errno));
/* tell the transfer that this element has finished */
202 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourceWritefd: seeds the PRNG with RANDOM_SEED and
 * spawns the worker thread running source_writefd_thread. */
208 source_writefd_start_impl(
211 XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
213 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
215 self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceWritefd: installs the start hook and the
 * table of supported mechanism pairs on the XferElementClass. */
221 source_writefd_class_init(
222 XferSourceWritefdClass * klass)
224 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
225 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with WRITEFD */
226 { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
227 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
230 xec->start = source_writefd_start_impl;
231 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourceWritefd: on the first call (while the cached
 * static `type` is still 0) registers the type with GObject as a static
 * subtype of XFER_ELEMENT_TYPE. */
235 xfer_source_writefd_get_type (void)
237 static GType type = 0;
239 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
240 static const GTypeInfo info = {
241 sizeof (XferSourceWritefdClass),
242 (GBaseInitFunc) NULL,
243 (GBaseFinalizeFunc) NULL,
244 (GClassInitFunc) source_writefd_class_init,
245 (GClassFinalizeFunc) NULL,
246 NULL /* class_data */,
247 sizeof (XferSourceWritefd),
249 (GInstanceInitFunc) NULL,
253 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
/* GObject boilerplate for XferSourcePush: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_push_get_type(). */
261 static GType xfer_source_push_get_type(void);
262 #define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
263 #define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
264 #define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
265 #define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
266 #define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
267 #define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
/* Instance structure; the parent XferElement comes first. */
269 typedef struct XferSourcePush {
270 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
273 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
277 XferElementClass __parent__;
278 } XferSourcePushClass;
/* Worker thread for XferSourcePush: pushes TEST_BLOCK_COUNT blocks of
 * TEST_BLOCK_SIZE PRNG bytes to the downstream element, then one block of
 * TEST_BLOCK_EXTRA bytes, then a NULL buffer of size 0, and finally queues
 * an XMSG_DONE message on the transfer. */
284 XferSourcePush *self = XFER_SOURCE_PUSH(data);
288 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
/* a fresh allocation per block; no free of it is visible after the push,
 * so ownership presumably passes downstream -- TODO confirm */
289 buf = g_malloc(TEST_BLOCK_SIZE);
290 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
291 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
295 /* send a smaller block */
296 buf = g_malloc(TEST_BLOCK_EXTRA);
297 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
298 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
/* NULL buffer with size 0; presumably the end-of-data marker -- TODO confirm */
302 xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
/* tell the transfer that this element has finished */
304 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourcePush: seeds the PRNG with RANDOM_SEED and spawns
 * the worker thread running source_push_thread. */
310 source_push_start_impl(
313 XferSourcePush *self = XFER_SOURCE_PUSH(elt);
315 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
317 self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourcePush: installs the start hook and the
 * table of supported mechanism pairs on the XferElementClass. */
323 source_push_class_init(
324 XferSourcePushClass * klass)
326 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
327 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with PUSH_BUFFER */
328 { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
329 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
332 xec->start = source_push_start_impl;
333 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourcePush: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
337 xfer_source_push_get_type (void)
339 static GType type = 0;
341 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
342 static const GTypeInfo info = {
343 sizeof (XferSourcePushClass),
344 (GBaseInitFunc) NULL,
345 (GBaseFinalizeFunc) NULL,
346 (GClassInitFunc) source_push_class_init,
347 (GClassFinalizeFunc) NULL,
348 NULL /* class_data */,
349 sizeof (XferSourcePush),
351 (GInstanceInitFunc) NULL,
355 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
/* GObject boilerplate for XferSourcePull: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_pull_get_type(). */
363 static GType xfer_source_pull_get_type(void);
364 #define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
365 #define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
366 #define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
367 #define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
368 #define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
369 #define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
/* Instance structure; the parent XferElement comes first. */
371 typedef struct XferSourcePull {
372 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
376 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
380 XferElementClass __parent__;
381 } XferSourcePullClass;
/* pull_buffer implementation for XferSourcePull: produces one newly
 * allocated, PRNG-filled buffer per call, with the size chosen from the
 * element's running buffer count (self->nbuffers). */
384 source_pull_pull_buffer_impl(
388 XferSourcePull *self = XFER_SOURCE_PULL(elt);
/* the short final block has already gone out; presumably this branch
 * reports end-of-data -- TODO confirm */
392 if (self->nbuffers > TEST_BLOCK_COUNT) {
/* full-size block unless nbuffers has reached TEST_BLOCK_COUNT, in which
 * case this is the short trailing block */
396 bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
400 buf = g_malloc(bufsiz);
401 simpleprng_fill_buffer(&self->prng, buf, bufsiz);
/* Setup hook for XferSourcePull: seeds the element's PRNG with RANDOM_SEED. */
407 source_pull_setup_impl(
410 XferSourcePull *self = XFER_SOURCE_PULL(elt);
412 simpleprng_seed(&self->prng, RANDOM_SEED);
/* Class initializer for XferSourcePull: installs the pull_buffer and setup
 * hooks and the table of supported mechanism pairs on the XferElementClass.
 * No start hook is assigned here. */
418 source_pull_class_init(
419 XferSourcePullClass * klass)
421 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
422 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with PULL_BUFFER */
423 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
424 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
427 xec->pull_buffer = source_pull_pull_buffer_impl;
428 xec->setup = source_pull_setup_impl;
429 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourcePull: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
433 xfer_source_pull_get_type (void)
435 static GType type = 0;
437 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
438 static const GTypeInfo info = {
439 sizeof (XferSourcePullClass),
440 (GBaseInitFunc) NULL,
441 (GBaseFinalizeFunc) NULL,
442 (GClassInitFunc) source_pull_class_init,
443 (GClassFinalizeFunc) NULL,
444 NULL /* class_data */,
445 sizeof (XferSourcePull),
447 (GInstanceInitFunc) NULL,
451 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
/* GObject boilerplate for XferSourceListen: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_listen_get_type(). */
459 static GType xfer_source_listen_get_type(void);
460 #define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
461 #define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
462 #define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
463 #define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
464 #define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
465 #define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
/* Instance structure; the parent XferElement comes first. */
467 typedef struct XferSourceListen {
468 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
471 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
475 XferElementClass __parent__;
476 } XferSourceListenClass;
/* Worker thread for XferSourceListen: connects over IPv4 TCP to the first
 * address in the downstream element's input_listen_addrs, sends
 * TEST_BLOCK_COUNT blocks of TEST_BLOCK_SIZE PRNG bytes followed by one
 * TEST_BLOCK_EXTRA block, and queues an XMSG_DONE message on the transfer. */
479 source_listen_thread(
482 XferSourceListen *self = XFER_SOURCE_LISTEN(data);
483 XferElement *elt = XFER_ELEMENT(self);
484 DirectTCPAddr *addrs;
490 /* set up the sockaddr -- IPv4 only */
491 SU_INIT(&addr, AF_INET);
/* the address downstream advertised; only its first entry is used */
492 addrs = elt->downstream->input_listen_addrs;
493 g_assert(addrs != NULL);
494 SU_SET_PORT(&addr, addrs->port);
/* addrs->ipv4 is converted with htonl() before being stored in the sockaddr */
495 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
497 tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
498 sock = socket(AF_INET, SOCK_STREAM, 0);
500 error("socket(): %s", strerror(errno));
502 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
503 error("connect(): %s", strerror(errno));
506 tu_dbg("connected to %s\n", str_sockaddr(&addr));
/* one heap buffer is reused for every block sent */
508 buf = g_malloc(TEST_BLOCK_SIZE);
509 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
510 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
/* anything short of the full block is reported through error() */
511 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
512 error("error in full_write(): %s", strerror(errno));
516 /* send a smaller block */
517 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
518 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
519 error("error in full_write(): %s", strerror(errno));
523 /* send EOF by closing the socket */
/* tell the transfer that this element has finished */
526 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferSourceListen: seeds the PRNG with RANDOM_SEED and spawns
 * the worker thread running source_listen_thread. */
532 source_listen_start_impl(
535 XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
537 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
539 self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceListen: installs the start hook and the
 * table of supported mechanism pairs on the XferElementClass. */
545 source_listen_class_init(
546 XferSourceListenClass * klass)
548 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
549 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with DIRECTTCP_LISTEN */
550 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
551 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
554 xec->start = source_listen_start_impl;
555 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourceListen: on the first call (while the cached
 * static `type` is still 0) registers the type with GObject as a static
 * subtype of XFER_ELEMENT_TYPE. */
559 xfer_source_listen_get_type (void)
561 static GType type = 0;
563 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
564 static const GTypeInfo info = {
565 sizeof (XferSourceListenClass),
566 (GBaseInitFunc) NULL,
567 (GBaseFinalizeFunc) NULL,
568 (GClassInitFunc) source_listen_class_init,
569 (GClassFinalizeFunc) NULL,
570 NULL /* class_data */,
571 sizeof (XferSourceListen),
573 (GInstanceInitFunc) NULL,
577 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
/* GObject boilerplate for XferSourceConnect: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_source_connect_get_type(). */
585 static GType xfer_source_connect_get_type(void);
586 #define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
587 #define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
588 #define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
589 #define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
590 #define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
591 #define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
/* Instance structure; the parent XferElement comes first. */
593 typedef struct XferSourceConnect {
594 XferElement __parent__;
/* PRNG state used to generate the bytes this source emits */
599 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
603 XferElementClass __parent__;
604 } XferSourceConnectClass;
/* Worker thread for XferSourceConnect: accepts one connection on
 * self->listen_socket, closes the listening socket, sends TEST_BLOCK_COUNT
 * blocks of TEST_BLOCK_SIZE PRNG bytes followed by one TEST_BLOCK_EXTRA
 * block over the accepted socket, and queues an XMSG_DONE message. */
607 source_connect_thread(
610 XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
/* the listening socket is created in source_connect_setup_impl */
615 g_assert(self->listen_socket != -1);
/* on accept() failure the transfer is cancelled with an error and this
 * thread waits for the cancellation to take effect */
617 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
618 xfer_cancel_with_error(XFER_ELEMENT(self),
619 _("Error accepting incoming connection: %s"), strerror(errno));
620 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
624 /* close the listening socket now, for good measure */
625 close(self->listen_socket);
626 self->listen_socket = -1;
628 tu_dbg("connection accepted\n");
/* one heap buffer is reused for every block sent */
630 buf = g_malloc(TEST_BLOCK_SIZE);
631 for (i = 0; i < TEST_BLOCK_COUNT; i++) {
632 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
/* anything short of the full block is reported through error() */
633 if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
634 error("error in full_write(): %s", strerror(errno));
638 /* send a smaller block */
639 simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
640 if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
641 error("error in full_write(): %s", strerror(errno));
645 /* send EOF by closing the socket */
/* tell the transfer that this element has finished */
648 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferSourceConnect: creates an IPv4 TCP socket, puts it into
 * the listening state, reads back the local address with getsockname(), and
 * publishes 127.0.0.1 plus the socket's port as elt->output_listen_addrs. */
654 source_connect_setup_impl(
657 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
659 DirectTCPAddr *addrs;
663 /* set up self->listen_socket and set elt->output_listen_addrs */
664 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
666 error("socket(): %s", strerror(errno));
/* NOTE(review): no bind() call is visible before listen(); presumably the
 * kernel assigns the port, which is why it is read back below -- confirm */
668 if (listen(sock, 1) < 0)
669 error("listen(): %s", strerror(errno));
672 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
673 error("getsockname(): %s", strerror(errno));
674 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
/* two zero-filled entries are allocated and only the first is filled in;
 * the second presumably terminates the list -- TODO confirm */
676 addrs = g_new0(DirectTCPAddr, 2);
/* the address is stored after ntohl(); the connecting side applies htonl()
 * before using it */
677 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
678 addrs[0].port = SU_GET_PORT(&addr);
679 elt->output_listen_addrs = addrs;
/* Start hook for XferSourceConnect: seeds the PRNG with RANDOM_SEED and
 * spawns the worker thread running source_connect_thread. */
685 source_connect_start_impl(
688 XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
690 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
692 self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferSourceConnect: installs the setup and start hooks
 * and the table of supported mechanism pairs on the XferElementClass. */
698 source_connect_class_init(
699 XferSourceConnectClass * klass)
701 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
702 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: NONE paired with DIRECTTCP_CONNECT */
703 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
704 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
707 xec->setup = source_connect_setup_impl;
708 xec->start = source_connect_start_impl;
709 xec->mech_pairs = mech_pairs;
/* Type getter for XferSourceConnect: on the first call (while the cached
 * static `type` is still 0) registers the type with GObject as a static
 * subtype of XFER_ELEMENT_TYPE. */
713 xfer_source_connect_get_type (void)
715 static GType type = 0;
717 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
718 static const GTypeInfo info = {
719 sizeof (XferSourceConnectClass),
720 (GBaseInitFunc) NULL,
721 (GBaseFinalizeFunc) NULL,
722 (GClassInitFunc) source_connect_class_init,
723 (GClassFinalizeFunc) NULL,
724 NULL /* class_data */,
725 sizeof (XferSourceConnect),
727 (GInstanceInitFunc) NULL,
731 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
/* GObject boilerplate for XferDestReadfd: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_dest_readfd_get_type(). */
739 static GType xfer_dest_readfd_get_type(void);
740 #define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
741 #define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
742 #define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
743 #define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
744 #define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
745 #define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
/* Instance structure; the parent XferElement comes first. */
747 typedef struct XferDestReadfd {
748 XferElement __parent__;
/* PRNG state used to verify the bytes this destination receives */
751 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
755 XferElementClass __parent__;
756 } XferDestReadfdClass;
/* Worker thread for XferDestReadfd: takes the upstream element's output fd,
 * reads TEST_XFER_SIZE bytes from it, checks that the stream then ends,
 * verifies the bytes against the element's PRNG, and queues an XMSG_DONE
 * message on the transfer. */
762 XferDestReadfd *self = XFER_DEST_READFD(data);
763 XferElement *elt = XFER_ELEMENT(data);
/* NOTE(review): the whole transfer (about 320 KiB) lives on this thread's stack */
764 char buf[TEST_XFER_SIZE];
/* fetch upstream's output fd, storing -1 in its place */
766 int fd = xfer_element_swap_output_fd(elt->upstream, -1);
768 /* this shouldn't happen, although non-test elements handle it gracefully */
771 remaining = sizeof(buf);
/* each read lands just past the bytes received so far.
 * NOTE(review): a return of 0 (premature EOF) takes the same error() path,
 * where errno has not been set by read() */
774 if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
775 error("error in read(): %s", strerror(errno));
780 /* we should be at EOF here */
/* this probe read overwrites the first bytes of buf if data does arrive */
781 if (read(fd, buf, 10) != 0)
782 g_critical("too much data entering XferDestReadfd");
784 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
785 g_critical("data entering XferDestReadfd does not match");
/* tell the transfer that this element has finished */
789 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestReadfd: seeds the PRNG with RANDOM_SEED and spawns
 * the worker thread running dest_readfd_thread. */
795 dest_readfd_start_impl(
798 XferDestReadfd *self = XFER_DEST_READFD(elt);
800 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
802 self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestReadfd: installs the start hook and the
 * table of supported mechanism pairs on the XferElementClass. */
808 dest_readfd_class_init(
809 XferDestReadfdClass * klass)
811 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
812 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: READFD paired with NONE */
813 { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
814 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
817 xec->start = dest_readfd_start_impl;
818 xec->mech_pairs = mech_pairs;
/* Type getter for XferDestReadfd: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
822 xfer_dest_readfd_get_type (void)
824 static GType type = 0;
826 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
827 static const GTypeInfo info = {
828 sizeof (XferDestReadfdClass),
829 (GBaseInitFunc) NULL,
830 (GBaseFinalizeFunc) NULL,
831 (GClassInitFunc) dest_readfd_class_init,
832 (GClassFinalizeFunc) NULL,
833 NULL /* class_data */,
834 sizeof (XferDestReadfd),
836 (GInstanceInitFunc) NULL,
840 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
/* GObject boilerplate for XferDestWritefd: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_dest_writefd_get_type(). */
848 static GType xfer_dest_writefd_get_type(void);
849 #define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
850 #define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
851 #define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
852 #define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
853 #define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
854 #define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
/* Instance structure; the parent XferElement comes first. */
856 typedef struct XferDestWritefd {
857 XferElement __parent__;
/* PRNG state used to verify the bytes this destination receives */
861 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
865 XferElementClass __parent__;
866 } XferDestWritefdClass;
/* Worker thread for XferDestWritefd: reads TEST_XFER_SIZE bytes from
 * self->read_fd, checks that the stream then ends, verifies the bytes
 * against the element's PRNG, and queues an XMSG_DONE message. */
872 XferDestWritefd *self = XFER_DEST_WRITEFD(data);
/* NOTE(review): the whole transfer (about 320 KiB) lives on this thread's stack */
873 char buf[TEST_XFER_SIZE];
/* read end of the pipe created in dest_writefd_setup_impl */
875 int fd = self->read_fd;
877 remaining = sizeof(buf);
/* despite its name, nwrite holds the result of read().
 * NOTE(review): a return of 0 (premature EOF) takes the same error() path,
 * where errno has not been set by read() */
880 if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
881 error("error in read(): %s", strerror(errno));
886 /* we should be at EOF here */
/* this probe read overwrites the first bytes of buf if data does arrive */
887 if (read(fd, buf, 10) != 0)
888 g_critical("too much data entering XferDestWritefd");
890 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
891 g_critical("data entering XferDestWritefd does not match");
/* tell the transfer that this element has finished */
895 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
901 dest_writefd_setup_impl(
904 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
907 simpleprng_seed(&self->prng, RANDOM_SEED);
910 g_critical("Error from pipe(): %s", strerror(errno));
912 self->read_fd = p[0];
913 g_assert(xfer_element_swap_input_fd(elt, p[1]) == -1);
/* Start hook for XferDestWritefd: spawns the worker thread that runs
 * dest_writefd_thread, passing the element itself as the thread argument. */
919 dest_writefd_start_impl(
922 XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
923 self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestWritefd: installs the setup and start hooks
 * and the table of supported mechanism pairs on the XferElementClass. */
929 dest_writefd_class_init(
930 XferDestWritefdClass * klass)
932 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
933 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: WRITEFD paired with NONE */
934 { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
935 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
938 xec->setup = dest_writefd_setup_impl;
939 xec->start = dest_writefd_start_impl;
940 xec->mech_pairs = mech_pairs;
/* Type getter for XferDestWritefd: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
944 xfer_dest_writefd_get_type (void)
946 static GType type = 0;
948 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
949 static const GTypeInfo info = {
950 sizeof (XferDestWritefdClass),
951 (GBaseInitFunc) NULL,
952 (GBaseFinalizeFunc) NULL,
953 (GClassInitFunc) dest_writefd_class_init,
954 (GClassFinalizeFunc) NULL,
955 NULL /* class_data */,
956 sizeof (XferDestWritefd),
958 (GInstanceInitFunc) NULL,
962 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
/* GObject boilerplate for XferDestPush: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_dest_push_get_type(). */
970 static GType xfer_dest_push_get_type(void);
971 #define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
972 #define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
973 #define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
974 #define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
975 #define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
976 #define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
/* Instance structure; the parent XferElement comes first. */
978 typedef struct XferDestPush {
979 XferElement __parent__;
/* PRNG state used to verify the bytes this destination receives */
985 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
989 XferElementClass __parent__;
/* push_buffer implementation for XferDestPush: appends each incoming buffer
 * to self->buf at offset self->bufpos, and at EOF checks that exactly
 * TEST_XFER_SIZE bytes arrived and that they match the element's PRNG. */
993 dest_push_push_buffer_impl(
998 XferDestPush *self = XFER_DEST_PUSH(elt);
1001 /* if we're at EOF, verify we got the right bytes */
1002 g_assert(self->bufpos == TEST_XFER_SIZE);
1003 if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
1004 g_critical("data entering XferDestPush does not match");
/* guard against overrunning self->buf, which holds TEST_XFER_SIZE bytes */
1009 g_assert(self->bufpos + size <= TEST_XFER_SIZE);
1010 memcpy(self->buf + self->bufpos, buf, size);
1011 self->bufpos += size;
/* Setup hook for XferDestPush: allocates the TEST_XFER_SIZE accumulation
 * buffer and seeds the element's PRNG with RANDOM_SEED. */
1015 dest_push_setup_impl(
1018 XferDestPush *self = XFER_DEST_PUSH(elt);
/* filled incrementally by dest_push_push_buffer_impl */
1020 self->buf = g_malloc(TEST_XFER_SIZE);
1021 simpleprng_seed(&self->prng, RANDOM_SEED);
/* Class initializer for XferDestPush: installs the push_buffer and setup
 * hooks and the table of supported mechanism pairs on the XferElementClass.
 * No start hook is assigned here. */
1027 dest_push_class_init(
1028 XferDestPushClass * klass)
1030 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1031 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: PUSH_BUFFER paired with NONE */
1032 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
1033 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1036 xec->push_buffer = dest_push_push_buffer_impl;
1037 xec->setup = dest_push_setup_impl;
1038 xec->mech_pairs = mech_pairs;
/* Type getter for XferDestPush: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
1042 xfer_dest_push_get_type (void)
1044 static GType type = 0;
1046 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
1047 static const GTypeInfo info = {
1048 sizeof (XferDestPushClass),
1049 (GBaseInitFunc) NULL,
1050 (GBaseFinalizeFunc) NULL,
1051 (GClassInitFunc) dest_push_class_init,
1052 (GClassFinalizeFunc) NULL,
1053 NULL /* class_data */,
1054 sizeof (XferDestPush),
1055 0 /* n_preallocs */,
1056 (GInstanceInitFunc) NULL,
1060 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
/* GObject boilerplate for XferDestPull: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_dest_pull_get_type(). */
1068 static GType xfer_dest_pull_get_type(void);
1069 #define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
1070 #define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
1071 #define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
1072 #define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
1073 #define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
1074 #define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
/* Instance structure; the parent XferElement comes first. */
1076 typedef struct XferDestPull {
1077 XferElement __parent__;
/* PRNG state used to verify the bytes this destination receives */
1080 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
1084 XferElementClass __parent__;
1085 } XferDestPullClass;
/* Worker thread for XferDestPull: pulls buffers from the upstream element
 * until a NULL buffer comes back, copying each into one contiguous buffer,
 * then checks that exactly TEST_XFER_SIZE bytes arrived, verifies them
 * against the element's PRNG, and queues an XMSG_DONE message. */
1091 XferDestPull *self = XFER_DEST_PULL(data);
/* NOTE(review): the whole transfer (about 320 KiB) lives on this thread's stack */
1092 char fullbuf[TEST_XFER_SIZE];
/* the loop ends when xfer_element_pull_buffer() returns NULL */
1097 while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
/* guard against overrunning fullbuf */
1098 g_assert(bufpos + size <= TEST_XFER_SIZE);
1099 memcpy(fullbuf + bufpos, buf, size);
1103 /* we're at EOF, so verify we got the right bytes */
1104 g_assert(bufpos == TEST_XFER_SIZE);
1105 if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
1106 g_critical("data entering XferDestPull does not match");
/* tell the transfer that this element has finished */
1108 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestPull: seeds the PRNG with RANDOM_SEED and spawns the
 * worker thread running dest_pull_thread. */
1114 dest_pull_start_impl(
1117 XferDestPull *self = XFER_DEST_PULL(elt);
1119 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
1121 self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestPull: installs the start hook and the table
 * of supported mechanism pairs on the XferElementClass. */
1127 dest_pull_class_init(
1128 XferDestPullClass * klass)
1130 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1131 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: PULL_BUFFER paired with NONE */
1132 { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
1133 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1136 xec->start = dest_pull_start_impl;
1137 xec->mech_pairs = mech_pairs;
/* Type getter for XferDestPull: on the first call (while the cached static
 * `type` is still 0) registers the type with GObject as a static subtype of
 * XFER_ELEMENT_TYPE. */
1141 xfer_dest_pull_get_type (void)
1143 static GType type = 0;
1145 if G_UNLIKELY(type == 0) {
/* only a class-init function is supplied; the other hooks are NULL */
1146 static const GTypeInfo info = {
1147 sizeof (XferDestPullClass),
1148 (GBaseInitFunc) NULL,
1149 (GBaseFinalizeFunc) NULL,
1150 (GClassInitFunc) dest_pull_class_init,
1151 (GClassFinalizeFunc) NULL,
1152 NULL /* class_data */,
1153 sizeof (XferDestPull),
1154 0 /* n_preallocs */,
1155 (GInstanceInitFunc) NULL,
1159 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
/* GObject boilerplate for XferDestListen: forward declaration of the type
 * getter followed by the cast and type-check convenience macros, all of
 * which resolve the type through xfer_dest_listen_get_type(). */
1167 static GType xfer_dest_listen_get_type(void);
1168 #define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
1169 #define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
1170 #define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
1171 #define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
1172 #define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
1173 #define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
/* Instance structure; the parent XferElement comes first. */
1175 typedef struct XferDestListen {
1176 XferElement __parent__;
/* PRNG state used to verify the bytes this destination receives */
1181 simpleprng_state_t prng;
/* Class structure; the only member visible is the parent class. */
1185 XferElementClass __parent__;
1186 } XferDestListenClass;
/* Worker thread for XferDestListen: accepts one connection on
 * self->listen_socket, closes the listening socket, reads up to
 * TEST_XFER_SIZE+1 bytes from the accepted socket, asserts that exactly
 * TEST_XFER_SIZE arrived, verifies them against the element's PRNG, and
 * queues an XMSG_DONE message on the transfer. */
1192 XferDestListen *self = XFER_DEST_LISTEN(data);
/* the listening socket is created in dest_listen_setup_impl */
1197 g_assert(self->listen_socket != -1);
/* on accept() failure the transfer is cancelled with an error and this
 * thread waits for the cancellation to take effect */
1199 if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
1200 xfer_cancel_with_error(XFER_ELEMENT(self),
1201 _("Error accepting incoming connection: %s"), strerror(errno));
1202 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1206 /* close the listening socket now, for good measure */
1207 close(self->listen_socket);
1208 self->listen_socket = -1;
1210 /* read from the socket until EOF or all of the data is read. We try to
1211 * read one extra byte - if we get it, then upstream sent too much data */
1212 buf = g_malloc(TEST_XFER_SIZE+1);
1213 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1214 g_assert(bytes == TEST_XFER_SIZE);
1217 /* we're at EOF, so verify we got the right bytes */
/* NOTE(review): this repeats the assertion made right after full_read() */
1218 g_assert(bytes == TEST_XFER_SIZE);
1219 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1220 g_critical("data entering XferDestListen does not match");
/* tell the transfer that this element has finished */
1222 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Setup hook for XferDestListen: creates an IPv4 TCP socket, puts it into the
 * listening state, reads back the local address with getsockname(), and
 * publishes 127.0.0.1 plus the socket's port as elt->input_listen_addrs. */
1228 dest_listen_setup_impl(
1231 XferDestListen *self = XFER_DEST_LISTEN(elt);
1232 sockaddr_union addr;
1233 DirectTCPAddr *addrs;
1237 /* set up self->listen_socket and set elt->input_listen_addrs */
1238 sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
1240 error("socket(): %s", strerror(errno));
/* NOTE(review): no bind() call is visible before listen(); presumably the
 * kernel assigns the port, which is why it is read back below -- confirm */
1242 if (listen(sock, 1) < 0)
1243 error("listen(): %s", strerror(errno));
1246 if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
1247 error("getsockname(): %s", strerror(errno));
1248 g_assert(SU_GET_FAMILY(&addr) == AF_INET);
/* two zero-filled entries are allocated and only the first is filled in;
 * the second presumably terminates the list -- TODO confirm */
1250 addrs = g_new0(DirectTCPAddr, 2);
/* the address is stored after ntohl(); the connecting side applies htonl()
 * before using it */
1251 addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
1252 addrs[0].port = SU_GET_PORT(&addr);
1253 elt->input_listen_addrs = addrs;
/* Start hook for XferDestListen: seeds the PRNG with RANDOM_SEED and spawns
 * the worker thread running dest_listen_thread. */
1259 dest_listen_start_impl(
1262 XferDestListen *self = XFER_DEST_LISTEN(elt);
1264 simpleprng_seed(&self->prng, RANDOM_SEED);
/* FALSE: the thread is not joinable; NULL: no GError is requested */
1266 self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestListen: installs the setup and start hooks
 * and the table of supported mechanism pairs on the XferElementClass. */
1272 dest_listen_class_init(
1273 XferDestListenClass * klass)
1275 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
1276 static xfer_element_mech_pair_t mech_pairs[] = {
/* the one supported pairing: DIRECTTCP_LISTEN paired with NONE */
1277 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
/* all-NONE entry; presumably the list terminator -- TODO confirm */
1278 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1281 xec->setup = dest_listen_setup_impl;
1282 xec->start = dest_listen_start_impl;
1283 xec->mech_pairs = mech_pairs;
/* GType accessor for XferDestListen.  On the first call (type == 0) the type
 * is registered with g_type_register_static() as a subtype of
 * XFER_ELEMENT_TYPE under the name "XferDestListen"; the resulting id is
 * cached in a function-local static. */
1287 xfer_dest_listen_get_type (void)
1289 static GType type = 0;
1291 if G_UNLIKELY(type == 0) {
/* only a class initializer is supplied; base init/finalize, class finalize
 * and instance init are all NULL */
1292 static const GTypeInfo info = {
1293 sizeof (XferDestListenClass),
1294 (GBaseInitFunc) NULL,
1295 (GBaseFinalizeFunc) NULL,
1296 (GClassInitFunc) dest_listen_class_init,
1297 (GClassFinalizeFunc) NULL,
1298 NULL /* class_data */,
1299 sizeof (XferDestListen),
1300 0 /* n_preallocs */,
1301 (GInstanceInitFunc) NULL,
1305 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
/* GObject boilerplate for XferDestConnect: forward declaration of the type
 * accessor, followed by the type, instance-cast, class-cast, type-check and
 * class-lookup macros that are all built on xfer_dest_connect_get_type(). */
1313 static GType xfer_dest_connect_get_type(void);
1314 #define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
1315 #define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
1316 #define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
1317 #define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
1318 #define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
1319 #define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
/* Instance and class structures for XferDestConnect, a destination element
 * derived from XferElement. */
1321 typedef struct XferDestConnect {
/* embedded parent instance */
1322 XferElement __parent__;
/* PRNG state: seeded in dest_connect_start_impl and consumed by
 * dest_connect_thread when verifying the received bytes */
1327 simpleprng_state_t prng;
/* embedded parent class (member of the class structure) */
1331 XferElementClass __parent__;
1332 } XferDestConnectClass;
/* Thread body for XferDestConnect, started by dest_connect_start_impl with
 * the element passed as `data`.  Connects over IPv4 TCP to the first address
 * in elt->upstream->output_listen_addrs, reads the transfer from the socket,
 * checks the byte count and the contents against the element's PRNG stream,
 * and finally queues an XMSG_DONE message on the xfer. */
1335 dest_connect_thread(
1338 XferDestConnect *self = XFER_DEST_CONNECT(data);
1339 XferElement *elt = XFER_ELEMENT(self);
1340 DirectTCPAddr *addrs;
1341 sockaddr_union addr;
1346 /* set up the sockaddr -- IPv4 only */
1347 SU_INIT(&addr, AF_INET);
/* the upstream element must already have published its listening addresses */
1348 addrs = elt->upstream->output_listen_addrs;
1349 g_assert(addrs != NULL);
/* only the first entry of the address list is used */
1350 SU_SET_PORT(&addr, addrs->port);
/* addrs->ipv4 is converted to network byte order for the sockaddr */
1351 ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
1353 tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
1354 sock = socket(AF_INET, SOCK_STREAM, 0);
1356 error("socket(): %s", strerror(errno));
1358 if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
1359 error("connect(): %s", strerror(errno));
1362 tu_dbg("connected to %s\n", str_sockaddr(&addr));
1364 /* read from the socket until EOF or all of the data is read. We try to
1365 * read one extra byte - if we get it, then upstream sent too much data */
/* NOTE(review): no g_free(buf) is visible in this function -- confirm whether
 * the buffer is released */
1366 buf = g_malloc(TEST_XFER_SIZE+1);
1367 bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
1368 g_assert(bytes == TEST_XFER_SIZE);
/* NOTE(review): the assertion below repeats the check made right after
 * full_read() */
1371 /* we're at EOF, so verify we got the right bytes */
1372 g_assert(bytes == TEST_XFER_SIZE);
/* a content mismatch is reported with g_critical() rather than an assertion */
1373 if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
1374 g_critical("data entering XferDestConnect does not match");
/* tell the xfer that this element has finished */
1376 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Start hook for XferDestConnect (installed as xec->start in
 * dest_connect_class_init).  Seeds the element's PRNG with RANDOM_SEED and
 * launches dest_connect_thread with the element as its argument. */
1382 dest_connect_start_impl(
1385 XferDestConnect *self = XFER_DEST_CONNECT(elt);
1387 simpleprng_seed(&self->prng, RANDOM_SEED);
/* third argument FALSE: the thread is created non-joinable; no GError is
 * requested (last argument NULL) */
1389 self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
/* Class initializer for XferDestConnect: installs the start hook and the
 * table of supported mechanism pairs on the XferElementClass.  Unlike
 * dest_listen_class_init, no setup hook is assigned here. */
1395 dest_connect_class_init(
1396 XferDestConnectClass * klass)
1398 XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
/* static storage: the class keeps a pointer to this table (see the
 * xec->mech_pairs assignment below) */
1399 static xfer_element_mech_pair_t mech_pairs[] = {
/* the single real entry pairs DIRECTTCP_CONNECT with NONE (presumably input
 * mechanism first, then output -- confirm against xfer_element_mech_pair_t) */
1400 { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
/* all-NONE row; presumably terminates the table -- TODO confirm */
1401 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1404 xec->start = dest_connect_start_impl;
1405 xec->mech_pairs = mech_pairs;
/* GType accessor for XferDestConnect.  On the first call (type == 0) the type
 * is registered with g_type_register_static() as a subtype of
 * XFER_ELEMENT_TYPE under the name "XferDestConnect"; the resulting id is
 * cached in a function-local static. */
1409 xfer_dest_connect_get_type (void)
1411 static GType type = 0;
1413 if G_UNLIKELY(type == 0) {
/* only a class initializer is supplied; base init/finalize, class finalize
 * and instance init are all NULL */
1414 static const GTypeInfo info = {
1415 sizeof (XferDestConnectClass),
1416 (GBaseInitFunc) NULL,
1417 (GBaseFinalizeFunc) NULL,
1418 (GClassInitFunc) dest_connect_class_init,
1419 (GClassFinalizeFunc) NULL,
1420 NULL /* class_data */,
1421 sizeof (XferDestConnect),
1422 0 /* n_preallocs */,
1423 (GInstanceInitFunc) NULL,
1427 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
/* Message callback shared by the tests in this file; each test attaches it to
 * the xfer's GSource with g_source_set_callback().  It logs every message,
 * dispatches on msg->type and, in the branch shown, quits the default main
 * loop once xfer->status is XFER_DONE.  The `data` argument is unused. */
1439 test_xfer_generic_callback(
1440 gpointer data G_GNUC_UNUSED,
1444 tu_dbg("Received message %s\n", xmsg_repr(msg));
1446 switch (msg->type) {
/* stopping the main loop returns control to the test's g_main_loop_run() */
1449 if (xfer->status == XFER_DONE) {
1450 tu_dbg("all elements are done!\n");
1451 g_main_loop_quit(default_main_loop());
1461 * Run a simple transfer with some xor filters
/* Test: build a four-element transfer (random source, two xor filters, null
 * destination), run the default main loop until the callback stops it, and
 * assert that the transfer ended with status XFER_DONE. */
1465 test_xfer_simple(void)
/* 100 KiB from a source seeded with RANDOM_SEED; both filters use the key 'd'
 * (presumably cancelling each other out so that the destination, given the
 * same seed, sees the source's stream -- confirm) */
1469 XferElement *elements[] = {
1470 xfer_source_random(100*1024, RANDOM_SEED),
1471 xfer_filter_xor('d'),
1472 xfer_filter_xor('d'),
1473 xfer_dest_null(RANDOM_SEED),
1476 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
/* route the xfer's messages to the shared callback via the default context */
1477 src = xfer_get_source(xfer);
1478 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1479 g_source_attach(src, NULL);
1480 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1482 /* unreference the elements */
/* after dropping the test's own reference exactly one must remain
 * (presumably the one held by the xfer -- confirm) */
1483 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1484 g_object_unref(elements[i]);
1485 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* blocks until test_xfer_generic_callback quits the loop */
1491 g_main_loop_run(default_main_loop());
1492 g_assert(xfer->status == XFER_DONE);
1500 * Run a transfer between two files, with or without filters
/* Test: copy this source file (__FILE__) to "xfer-test.tmp" through an xfer
 * made of an fd source, optional xor filters and an fd destination, then
 * assert that the transfer ended with status XFER_DONE and remove the output
 * file.
 *
 * add_filters: selects the variant with the two xor filters (see
 * test_xfer_files_simple / test_xfer_files_filter). */
1504 test_xfer_files(gboolean add_filters)
1509 char *in_filename = __FILE__;
1510 char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
/* room for the source, up to two filters, and the destination */
1513 XferElement *elements[4];
1515 rfd = open(in_filename, O_RDONLY, 0);
1517 g_critical("Could not open '%s': %s", in_filename, strerror(errno));
/* NOTE(review): O_CREAT without O_TRUNC -- an already existing output file
 * would not be truncated */
1519 wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
1521 g_critical("Could not open '%s': %s", out_filename, strerror(errno));
1524 elements[elts++] = xfer_source_fd(rfd);
/* two xor filters with the same key 0xab (presumably added only when
 * add_filters is TRUE -- confirm) */
1526 elements[elts++] = xfer_filter_xor(0xab);
1527 elements[elts++] = xfer_filter_xor(0xab);
1529 elements[elts++] = xfer_dest_fd(wfd);
1531 xfer = xfer_new(elements, elts);
/* route the xfer's messages to the shared callback via the default context */
1532 src = xfer_get_source(xfer);
1533 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1534 g_source_attach(src, NULL);
1535 tu_dbg("Transfer: %s\n", xfer_repr(xfer));
1537 /* unreference the elements */
/* after dropping the test's own reference exactly one must remain
 * (presumably the one held by the xfer -- confirm) */
1538 for (i = 0; i < elts; i++) {
1539 g_object_unref(elements[i]);
1540 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* blocks until test_xfer_generic_callback quits the loop */
1546 g_main_loop_run(default_main_loop());
1547 g_assert(xfer->status == XFER_DONE);
1551 unlink(out_filename); /* ignore any errors */
/* Test entry point: file-to-file transfer with add_filters = FALSE; returns
 * whatever test_xfer_files returns. */
1557 test_xfer_files_simple(void)
1559 return test_xfer_files(FALSE);
/* Test entry point: file-to-file transfer with add_filters = TRUE; returns
 * whatever test_xfer_files returns. */
1563 test_xfer_files_filter(void)
1565 return test_xfer_files(TRUE);
1569 * test each possible combination of source and destination mechanism
/* Helper behind every test_glue_* test (see make_test_glue): runs a transfer
 * consisting of just `source` and `dest`, waits in the default main loop
 * until the callback stops it, and asserts that the transfer ended with
 * status XFER_DONE.  The caller's reference to each element is dropped here. */
1574 XferElement *source,
1579 XferElement *elements[] = { source, dest };
1581 Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
/* route the xfer's messages to the shared callback via the default context */
1582 src = xfer_get_source(xfer);
1583 g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
1584 g_source_attach(src, NULL);
1586 /* unreference the elements */
/* after dropping the caller's reference exactly one must remain
 * (presumably the one held by the xfer -- confirm) */
1587 for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
1588 g_object_unref(elements[i]);
1589 g_assert(G_OBJECT(elements[i])->ref_count == 1);
/* blocks until test_xfer_generic_callback quits the loop */
1595 g_main_loop_run(default_main_loop());
1596 g_assert(xfer->status == XFER_DONE);
/* make_test_glue(n, s, d) defines a static int test function named n that
 * instantiates the GObject types s (source) and d (destination) with
 * g_object_new() and returns the result of test_glue_combo() for that pair.
 *
 * The instantiations that follow create one test for every pairing of the six
 * source classes with the six destination classes (READFD, WRITEFD, PUSH,
 * PULL, LISTEN, CONNECT), 36 tests in total, named
 * test_glue_<SOURCE>_<DEST>. */
1603 #define make_test_glue(n, s, d) static int n(void) \
1605 return test_glue_combo((XferElement *)g_object_new(s, NULL), \
1606 (XferElement *)g_object_new(d, NULL)); \
1608 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
1609 make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1610 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
1611 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
1612 make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
1613 make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
1614 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
1615 make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
1616 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
1617 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
1618 make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
1619 make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
1620 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
1621 make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
1622 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
1623 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
1624 make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
1625 make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
1626 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
1627 make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
1628 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
1629 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
1630 make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
1631 make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
1632 make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
1633 make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
1634 make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
1635 make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
1636 make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
1637 make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
1638 make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
1639 make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
1640 make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
1641 make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
1642 make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
1643 make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
/* Test driver: lists every test of this file in a static table and hands the
 * table, together with the command-line arguments, to testutils_run_tests(),
 * whose result becomes the return value. */
1650 main(int argc, char **argv)
/* NOTE(review): the second TU_TEST argument is 90 for every entry;
 * presumably a per-test timeout in seconds -- confirm against testutils.h */
1652 static TestUtilsTest tests[] = {
/* basic transfers: in-memory random data, then file-to-file with and without
 * filters */
1653 TU_TEST(test_xfer_simple, 90),
1654 TU_TEST(test_xfer_files_simple, 90),
1655 TU_TEST(test_xfer_files_filter, 90),
/* one entry for each test generated with make_test_glue */
1656 TU_TEST(test_glue_READFD_READFD, 90),
1657 TU_TEST(test_glue_READFD_WRITEFD, 90),
1658 TU_TEST(test_glue_READFD_PUSH, 90),
1659 TU_TEST(test_glue_READFD_PULL, 90),
1660 TU_TEST(test_glue_READFD_LISTEN, 90),
1661 TU_TEST(test_glue_READFD_CONNECT, 90),
1662 TU_TEST(test_glue_WRITEFD_READFD, 90),
1663 TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
1664 TU_TEST(test_glue_WRITEFD_PUSH, 90),
1665 TU_TEST(test_glue_WRITEFD_PULL, 90),
1666 TU_TEST(test_glue_WRITEFD_LISTEN, 90),
1667 TU_TEST(test_glue_WRITEFD_CONNECT, 90),
1668 TU_TEST(test_glue_PUSH_READFD, 90),
1669 TU_TEST(test_glue_PUSH_WRITEFD, 90),
1670 TU_TEST(test_glue_PUSH_PUSH, 90),
1671 TU_TEST(test_glue_PUSH_PULL, 90),
1672 TU_TEST(test_glue_PUSH_LISTEN, 90),
1673 TU_TEST(test_glue_PUSH_CONNECT, 90),
1674 TU_TEST(test_glue_PULL_READFD, 90),
1675 TU_TEST(test_glue_PULL_WRITEFD, 90),
1676 TU_TEST(test_glue_PULL_PUSH, 90),
1677 TU_TEST(test_glue_PULL_PULL, 90),
1678 TU_TEST(test_glue_PULL_LISTEN, 90),
1679 TU_TEST(test_glue_PULL_CONNECT, 90),
1680 TU_TEST(test_glue_LISTEN_READFD, 90),
1681 TU_TEST(test_glue_LISTEN_WRITEFD, 90),
1682 TU_TEST(test_glue_LISTEN_PUSH, 90),
1683 TU_TEST(test_glue_LISTEN_PULL, 90),
1684 TU_TEST(test_glue_LISTEN_LISTEN, 90),
1685 TU_TEST(test_glue_LISTEN_CONNECT, 90),
1686 TU_TEST(test_glue_CONNECT_READFD, 90),
1687 TU_TEST(test_glue_CONNECT_WRITEFD, 90),
1688 TU_TEST(test_glue_CONNECT_PUSH, 90),
1689 TU_TEST(test_glue_CONNECT_PULL, 90),
1690 TU_TEST(test_glue_CONNECT_LISTEN, 90),
1691 TU_TEST(test_glue_CONNECT_CONNECT, 90),
1697 return testutils_run_tests(argc, argv, tests);