X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Fxfer-test.c;h=30e69700e8afa30ce0e100f8c87ecc87eee34504;hb=fd48f3e498442f0cbff5f3606c7c403d0566150e;hp=f50cf132b52502ac794e6998c489a7a1de294d90;hpb=96f35b20267e8b1a1c846d476f27fcd330e0b018;p=debian%2Famanda diff --git a/xfer-src/xfer-test.c b/xfer-src/xfer-test.c index f50cf13..30e6970 100644 --- a/xfer-src/xfer-test.c +++ b/xfer-src/xfer-test.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008 Zmanda Inc. All Rights Reserved. + * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 as published @@ -14,7 +14,7 @@ * with this program; if not, write to the Free Software Foundation, Inc., * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * - * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300 + * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com * * Author: Dustin J. Mitchell @@ -26,6 +26,7 @@ #include "amanda.h" #include "event.h" #include "simpleprng.h" +#include "sockaddr-util.h" /* Having tests repeat exactly is an advantage, so we use a hard-coded * random seed. */ @@ -41,7 +42,7 @@ /* constants to determine the total amount of data to be transfered; EXTRA is * to test out partial-block handling; it should be prime. */ -#define TEST_BLOCK_SIZE 1024 +#define TEST_BLOCK_SIZE 32768 #define TEST_BLOCK_COUNT 10 #define TEST_BLOCK_EXTRA 97 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA) @@ -72,7 +73,7 @@ static gpointer source_readfd_thread( gpointer data) { - XferSourceReadfd *self = (XferSourceReadfd *)data; + XferSourceReadfd *self = XFER_SOURCE_READFD(data); char buf[TEST_XFER_SIZE]; int fd = self->write_fd; @@ -89,11 +90,11 @@ source_readfd_thread( return NULL; } -static void +static gboolean source_readfd_setup_impl( XferElement *elt) { - XferSourceReadfd *self = (XferSourceReadfd *)elt; + XferSourceReadfd *self = XFER_SOURCE_READFD(elt); int p[2]; simpleprng_seed(&self->prng, RANDOM_SEED); @@ -103,13 +104,15 @@ source_readfd_setup_impl( self->write_fd = p[1]; XFER_ELEMENT(self)->output_fd = p[0]; + + return TRUE; } static gboolean source_readfd_start_impl( XferElement *elt) { - XferSourceReadfd *self = (XferSourceReadfd *)elt; + XferSourceReadfd *self = XFER_SOURCE_READFD(elt); self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL); return TRUE; @@ -180,7 +183,7 @@ static gpointer source_writefd_thread( gpointer data) { - XferSourceWritefd *self = (XferSourceWritefd *)data; + XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data); char buf[TEST_XFER_SIZE]; int fd = XFER_ELEMENT(self)->downstream->input_fd; @@ -202,7 +205,7 @@ static gboolean source_writefd_start_impl( XferElement *elt) { - XferSourceWritefd *self = (XferSourceWritefd *)elt; + XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt); simpleprng_seed(&self->prng, RANDOM_SEED); @@ -275,7 +278,7 @@ static gpointer source_push_thread( gpointer data) { - XferSourcePush *self = (XferSourcePush *)data; + XferSourcePush *self = XFER_SOURCE_PUSH(data); char *buf; int i; @@ -304,7 +307,7 @@ static gboolean source_push_start_impl( XferElement *elt) { - XferSourcePush *self = (XferSourcePush *)elt; + XferSourcePush *self = XFER_SOURCE_PUSH(elt); simpleprng_seed(&self->prng, RANDOM_SEED); @@ -379,7 +382,7 @@ source_pull_pull_buffer_impl( XferElement *elt, size_t *size) { - XferSourcePull *self = (XferSourcePull *)elt; + XferSourcePull *self = XFER_SOURCE_PULL(elt); char *buf; size_t bufsiz; @@ -397,13 +400,15 @@ source_pull_pull_buffer_impl( return buf; } -static void +static gboolean source_pull_setup_impl( XferElement *elt) { - XferSourcePull *self = (XferSourcePull *)elt; + XferSourcePull *self = XFER_SOURCE_PULL(elt); simpleprng_seed(&self->prng, RANDOM_SEED); + + return TRUE; } static void @@ -446,6 +451,286 @@ xfer_source_pull_get_type (void) return type; } +/* LISTEN */ + +static GType xfer_source_listen_get_type(void); +#define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type()) +#define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen) +#define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const) +#define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass) +#define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ()) +#define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass) + +typedef struct XferSourceListen { + XferElement __parent__; + + GThread *thread; + simpleprng_state_t prng; +} XferSourceListen; + +typedef struct { + XferElementClass __parent__; +} XferSourceListenClass; + +static gpointer +source_listen_thread( + gpointer data) +{ + XferSourceListen *self = XFER_SOURCE_LISTEN(data); + XferElement *elt = XFER_ELEMENT(self); + DirectTCPAddr *addrs; + sockaddr_union addr; + int sock; + char *buf; + int i; + + /* set up the sockaddr -- IPv4 only */ + SU_INIT(&addr, AF_INET); + addrs = elt->downstream->input_listen_addrs; + g_assert(addrs != NULL); + SU_SET_PORT(&addr, addrs->port); + ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4); + + tu_dbg("making data connection to %s\n", str_sockaddr(&addr)); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + error("socket(): %s", strerror(errno)); + } + if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) { + error("connect(): %s", strerror(errno)); + } + + tu_dbg("connected to %s\n", str_sockaddr(&addr)); + + buf = g_malloc(TEST_BLOCK_SIZE); + for (i = 0; i < TEST_BLOCK_COUNT; i++) { + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE); + if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) { + error("error in full_write(): %s", strerror(errno)); + } + } + + /* send a smaller block */ + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA); + if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) { + error("error in full_write(): %s", strerror(errno)); + } + g_free(buf); + + /* send EOF by closing the socket */ + close(sock); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +source_listen_start_impl( + XferElement *elt) +{ + XferSourceListen *self = XFER_SOURCE_LISTEN(elt); + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +source_listen_class_init( + XferSourceListenClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = source_listen_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_listen_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourceListenClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_listen_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourceListen), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0); + } + + return type; +} + +/* CONNECT */ + +static GType xfer_source_connect_get_type(void); +#define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type()) +#define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect) +#define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const) +#define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass) +#define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ()) +#define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass) + +typedef struct XferSourceConnect { + XferElement __parent__; + + int listen_socket; + + GThread *thread; + simpleprng_state_t prng; +} XferSourceConnect; + +typedef struct { + XferElementClass __parent__; +} XferSourceConnectClass; + +static gpointer +source_connect_thread( + gpointer data) +{ + XferSourceConnect *self = XFER_SOURCE_CONNECT(data); + int sock; + char *buf; + int i; + + g_assert(self->listen_socket != -1); + + if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) { + xfer_cancel_with_error(XFER_ELEMENT(self), + _("Error accepting incoming connection: %s"), strerror(errno)); + wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); + return NULL; + } + + /* close the listening socket now, for good measure */ + close(self->listen_socket); + self->listen_socket = -1; + + tu_dbg("connection accepted\n"); + + buf = g_malloc(TEST_BLOCK_SIZE); + for (i = 0; i < TEST_BLOCK_COUNT; i++) { + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE); + if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) { + error("error in full_write(): %s", strerror(errno)); + } + } + + /* send a smaller block */ + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA); + if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) { + error("error in full_write(): %s", strerror(errno)); + } + g_free(buf); + + /* send EOF by closing the socket */ + close(sock); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +source_connect_setup_impl( + XferElement *elt) +{ + XferSourceConnect *self = XFER_SOURCE_CONNECT(elt); + sockaddr_union addr; + DirectTCPAddr *addrs; + socklen_t len; + int sock; + + /* set up self->listen_socket and set elt->output_listen_addrs */ + sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + error("socket(): %s", strerror(errno)); + + if (listen(sock, 1) < 0) + error("listen(): %s", strerror(errno)); + + len = sizeof(addr); + if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0) + error("getsockname(): %s", strerror(errno)); + g_assert(SU_GET_FAMILY(&addr) == AF_INET); + + addrs = g_new0(DirectTCPAddr, 2); + addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); + addrs[0].port = SU_GET_PORT(&addr); + elt->output_listen_addrs = addrs; + + return TRUE; +} + +static gboolean +source_connect_start_impl( + XferElement *elt) +{ + XferSourceConnect *self = XFER_SOURCE_CONNECT(elt); + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +source_connect_class_init( + XferSourceConnectClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->setup = source_connect_setup_impl; + xec->start = source_connect_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_connect_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourceConnectClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_connect_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourceConnect), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0); + } + + return type; +} + /* READFD */ static GType xfer_dest_readfd_get_type(void); @@ -471,7 +756,7 @@ static gpointer dest_readfd_thread( gpointer data) { - XferDestReadfd *self = (XferDestReadfd *)data; + XferDestReadfd *self = XFER_DEST_READFD(data); char buf[TEST_XFER_SIZE]; size_t remaining; int fd = XFER_ELEMENT(self)->upstream->output_fd; @@ -504,7 +789,7 @@ static gboolean dest_readfd_start_impl( XferElement *elt) { - XferDestReadfd *self = (XferDestReadfd *)elt; + XferDestReadfd *self = XFER_DEST_READFD(elt); simpleprng_seed(&self->prng, RANDOM_SEED); @@ -578,7 +863,7 @@ static gpointer dest_writefd_thread( gpointer data) { - XferDestWritefd *self = (XferDestWritefd *)data; + XferDestWritefd *self = XFER_DEST_WRITEFD(data); char buf[TEST_XFER_SIZE]; size_t remaining; int fd = self->read_fd; @@ -607,11 +892,11 @@ dest_writefd_thread( return NULL; } -static void +static gboolean dest_writefd_setup_impl( XferElement *elt) { - XferDestWritefd *self = (XferDestWritefd *)elt; + XferDestWritefd *self = XFER_DEST_WRITEFD(elt); int p[2]; simpleprng_seed(&self->prng, RANDOM_SEED); @@ -621,13 +906,15 @@ dest_writefd_setup_impl( self->read_fd = p[0]; XFER_ELEMENT(self)->input_fd = p[1]; + + return TRUE; } static gboolean dest_writefd_start_impl( XferElement *elt) { - XferDestWritefd *self = (XferDestWritefd *)elt; + XferDestWritefd *self = XFER_DEST_WRITEFD(elt); self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL); return TRUE; @@ -686,7 +973,7 @@ static GType xfer_dest_push_get_type(void); typedef struct XferDestPush { XferElement __parent__; - char buf[TEST_XFER_SIZE]; + char *buf; size_t bufpos; GThread *thread; @@ -703,13 +990,14 @@ dest_push_push_buffer_impl( gpointer buf, size_t size) { - XferDestPush *self = (XferDestPush *)elt; + XferDestPush *self = XFER_DEST_PUSH(elt); if (buf == NULL) { /* if we're at EOF, verify we got the right bytes */ g_assert(self->bufpos == TEST_XFER_SIZE); if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE)) g_critical("data entering XferDestPush does not match"); + g_free(self->buf); return; } @@ -718,13 +1006,16 @@ dest_push_push_buffer_impl( self->bufpos += size; } -static void +static gboolean dest_push_setup_impl( XferElement *elt) { - XferDestPush *self = (XferDestPush *)elt; + XferDestPush *self = XFER_DEST_PUSH(elt); + self->buf = g_malloc(TEST_XFER_SIZE); simpleprng_seed(&self->prng, RANDOM_SEED); + + return TRUE; } static void @@ -792,7 +1083,7 @@ static gpointer dest_pull_thread( gpointer data) { - XferDestPull *self = (XferDestPull *)data; + XferDestPull *self = XFER_DEST_PULL(data); char fullbuf[TEST_XFER_SIZE]; char *buf; size_t bufpos = 0; @@ -818,7 +1109,7 @@ static gboolean dest_pull_start_impl( XferElement *elt) { - XferDestPull *self = (XferDestPull *)elt; + XferDestPull *self = XFER_DEST_PULL(elt); simpleprng_seed(&self->prng, RANDOM_SEED); @@ -866,6 +1157,274 @@ xfer_dest_pull_get_type (void) return type; } +/* LISTEN */ + +static GType xfer_dest_listen_get_type(void); +#define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type()) +#define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen) +#define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const) +#define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass) +#define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ()) +#define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass) + +typedef struct XferDestListen { + XferElement __parent__; + + int listen_socket; + + GThread *thread; + simpleprng_state_t prng; +} XferDestListen; + +typedef struct { + XferElementClass __parent__; +} XferDestListenClass; + +static gpointer +dest_listen_thread( + gpointer data) +{ + XferDestListen *self = XFER_DEST_LISTEN(data); + char *buf; + size_t bytes = 0; + int sock; + + g_assert(self->listen_socket != -1); + + if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) { + xfer_cancel_with_error(XFER_ELEMENT(self), + _("Error accepting incoming connection: %s"), strerror(errno)); + wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); + return NULL; + } + + /* close the listening socket now, for good measure */ + close(self->listen_socket); + self->listen_socket = -1; + + /* read from the socket until EOF or all of the data is read. We try to + * read one extra byte - if we get it, then upstream sent too much data */ + buf = g_malloc(TEST_XFER_SIZE+1); + bytes = full_read(sock, buf, TEST_XFER_SIZE+1); + g_assert(bytes == TEST_XFER_SIZE); + close(sock); + + /* we're at EOF, so verify we got the right bytes */ + g_assert(bytes == TEST_XFER_SIZE); + if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE)) + g_critical("data entering XferDestListen does not match"); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +dest_listen_setup_impl( + XferElement *elt) +{ + XferDestListen *self = XFER_DEST_LISTEN(elt); + sockaddr_union addr; + DirectTCPAddr *addrs; + socklen_t len; + int sock; + + /* set up self->listen_socket and set elt->input_listen_addrs */ + sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) + error("socket(): %s", strerror(errno)); + + if (listen(sock, 1) < 0) + error("listen(): %s", strerror(errno)); + + len = sizeof(addr); + if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0) + error("getsockname(): %s", strerror(errno)); + g_assert(SU_GET_FAMILY(&addr) == AF_INET); + + addrs = g_new0(DirectTCPAddr, 2); + addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); + addrs[0].port = SU_GET_PORT(&addr); + elt->input_listen_addrs = addrs; + + return TRUE; +} + +static gboolean +dest_listen_start_impl( + XferElement *elt) +{ + XferDestListen *self = XFER_DEST_LISTEN(elt); + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +dest_listen_class_init( + XferDestListenClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->setup = dest_listen_setup_impl; + xec->start = dest_listen_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_listen_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestListenClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_listen_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestListen), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0); + } + + return type; +} + +/* CONNET */ + +static GType xfer_dest_connect_get_type(void); +#define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type()) +#define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect) +#define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const) +#define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass) +#define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ()) +#define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass) + +typedef struct XferDestConnect { + XferElement __parent__; + + int connect_socket; + + GThread *thread; + simpleprng_state_t prng; +} XferDestConnect; + +typedef struct { + XferElementClass __parent__; +} XferDestConnectClass; + +static gpointer +dest_connect_thread( + gpointer data) +{ + XferDestConnect *self = XFER_DEST_CONNECT(data); + XferElement *elt = XFER_ELEMENT(self); + DirectTCPAddr *addrs; + sockaddr_union addr; + char *buf; + size_t bytes = 0; + int sock; + + /* set up the sockaddr -- IPv4 only */ + SU_INIT(&addr, AF_INET); + addrs = elt->upstream->output_listen_addrs; + g_assert(addrs != NULL); + SU_SET_PORT(&addr, addrs->port); + ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4); + + tu_dbg("making data connection to %s\n", str_sockaddr(&addr)); + sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock < 0) { + error("socket(): %s", strerror(errno)); + } + if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) { + error("connect(): %s", strerror(errno)); + } + + tu_dbg("connected to %s\n", str_sockaddr(&addr)); + + /* read from the socket until EOF or all of the data is read. We try to + * read one extra byte - if we get it, then upstream sent too much data */ + buf = g_malloc(TEST_XFER_SIZE+1); + bytes = full_read(sock, buf, TEST_XFER_SIZE+1); + g_assert(bytes == TEST_XFER_SIZE); + close(sock); + + /* we're at EOF, so verify we got the right bytes */ + g_assert(bytes == TEST_XFER_SIZE); + if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE)) + g_critical("data entering XferDestConnect does not match"); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +dest_connect_start_impl( + XferElement *elt) +{ + XferDestConnect *self = XFER_DEST_CONNECT(elt); + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +dest_connect_class_init( + XferDestConnectClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = dest_connect_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_connect_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestConnectClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_connect_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestConnect), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0); + } + + return type; +} + /* * Tests @@ -1042,21 +1601,41 @@ test_glue_combo( (XferElement *)g_object_new(d, NULL)); \ } make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE) -make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE) make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE) make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE) make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE) -make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE) make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE) make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE) make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE) -make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE) make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE) make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE) make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE) -make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE) make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE) make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE) +make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE) +make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE) +make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE) /* * Main driver @@ -1066,25 +1645,45 @@ int main(int argc, char **argv) { static TestUtilsTest tests[] = { - TU_TEST(test_xfer_simple, 10), - TU_TEST(test_xfer_files_simple, 10), - TU_TEST(test_xfer_files_filter, 10), - TU_TEST(test_glue_READFD_READFD, 5), - TU_TEST(test_glue_READFD_WRITE, 5), - TU_TEST(test_glue_READFD_PUSH, 5), - TU_TEST(test_glue_READFD_PULL, 5), - TU_TEST(test_glue_WRITEFD_READFD, 5), - TU_TEST(test_glue_WRITEFD_WRITE, 5), - TU_TEST(test_glue_WRITEFD_PUSH, 5), - TU_TEST(test_glue_WRITEFD_PULL, 5), - TU_TEST(test_glue_PUSH_READFD, 5), - TU_TEST(test_glue_PUSH_WRITE, 5), - TU_TEST(test_glue_PUSH_PUSH, 5), - TU_TEST(test_glue_PUSH_PULL, 5), - TU_TEST(test_glue_PULL_READFD, 5), - TU_TEST(test_glue_PULL_WRITE, 5), - TU_TEST(test_glue_PULL_PUSH, 5), - TU_TEST(test_glue_PULL_PULL, 5), + TU_TEST(test_xfer_simple, 90), + TU_TEST(test_xfer_files_simple, 90), + TU_TEST(test_xfer_files_filter, 90), + TU_TEST(test_glue_READFD_READFD, 90), + TU_TEST(test_glue_READFD_WRITEFD, 90), + TU_TEST(test_glue_READFD_PUSH, 90), + TU_TEST(test_glue_READFD_PULL, 90), + TU_TEST(test_glue_READFD_LISTEN, 90), + TU_TEST(test_glue_READFD_CONNECT, 90), + TU_TEST(test_glue_WRITEFD_READFD, 90), + TU_TEST(test_glue_WRITEFD_WRITEFD, 90), + TU_TEST(test_glue_WRITEFD_PUSH, 90), + TU_TEST(test_glue_WRITEFD_PULL, 90), + TU_TEST(test_glue_WRITEFD_LISTEN, 90), + TU_TEST(test_glue_WRITEFD_CONNECT, 90), + TU_TEST(test_glue_PUSH_READFD, 90), + TU_TEST(test_glue_PUSH_WRITEFD, 90), + TU_TEST(test_glue_PUSH_PUSH, 90), + TU_TEST(test_glue_PUSH_PULL, 90), + TU_TEST(test_glue_PUSH_LISTEN, 90), + TU_TEST(test_glue_PUSH_CONNECT, 90), + TU_TEST(test_glue_PULL_READFD, 90), + TU_TEST(test_glue_PULL_WRITEFD, 90), + TU_TEST(test_glue_PULL_PUSH, 90), + TU_TEST(test_glue_PULL_PULL, 90), + TU_TEST(test_glue_PULL_LISTEN, 90), + TU_TEST(test_glue_PULL_CONNECT, 90), + TU_TEST(test_glue_LISTEN_READFD, 90), + TU_TEST(test_glue_LISTEN_WRITEFD, 90), + TU_TEST(test_glue_LISTEN_PUSH, 90), + TU_TEST(test_glue_LISTEN_PULL, 90), + TU_TEST(test_glue_LISTEN_LISTEN, 90), + TU_TEST(test_glue_LISTEN_CONNECT, 90), + TU_TEST(test_glue_CONNECT_READFD, 90), + TU_TEST(test_glue_CONNECT_WRITEFD, 90), + TU_TEST(test_glue_CONNECT_PUSH, 90), + TU_TEST(test_glue_CONNECT_PULL, 90), + TU_TEST(test_glue_CONNECT_LISTEN, 90), + TU_TEST(test_glue_CONNECT_CONNECT, 90), TU_END() };