/*
- * Copyright (c) 2008 Zmanda Inc. All Rights Reserved.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* with this program; if not, write to the Free Software Foundation, Inc.,
* 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
- * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300
+ * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*
* Author: Dustin J. Mitchell <dustin@zmanda.com>
#include "amanda.h"
#include "event.h"
#include "simpleprng.h"
+#include "sockaddr-util.h"
/* Having tests repeat exactly is an advantage, so we use a hard-coded
* random seed. */
/* constants to determine the total amount of data to be transfered; EXTRA is
* to test out partial-block handling; it should be prime. */
-#define TEST_BLOCK_SIZE 1024
+#define TEST_BLOCK_SIZE 32768
#define TEST_BLOCK_COUNT 10
#define TEST_BLOCK_EXTRA 97
#define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
source_readfd_thread(
gpointer data)
{
- XferSourceReadfd *self = (XferSourceReadfd *)data;
+ XferSourceReadfd *self = XFER_SOURCE_READFD(data);
char buf[TEST_XFER_SIZE];
int fd = self->write_fd;
return NULL;
}
-static void
+static gboolean
source_readfd_setup_impl(
XferElement *elt)
{
- XferSourceReadfd *self = (XferSourceReadfd *)elt;
+ XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
int p[2];
simpleprng_seed(&self->prng, RANDOM_SEED);
self->write_fd = p[1];
XFER_ELEMENT(self)->output_fd = p[0];
+
+ return TRUE;
}
static gboolean
source_readfd_start_impl(
XferElement *elt)
{
- XferSourceReadfd *self = (XferSourceReadfd *)elt;
+ XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
return TRUE;
source_writefd_thread(
gpointer data)
{
- XferSourceWritefd *self = (XferSourceWritefd *)data;
+ XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
char buf[TEST_XFER_SIZE];
int fd = XFER_ELEMENT(self)->downstream->input_fd;
source_writefd_start_impl(
XferElement *elt)
{
- XferSourceWritefd *self = (XferSourceWritefd *)elt;
+ XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
simpleprng_seed(&self->prng, RANDOM_SEED);
source_push_thread(
gpointer data)
{
- XferSourcePush *self = (XferSourcePush *)data;
+ XferSourcePush *self = XFER_SOURCE_PUSH(data);
char *buf;
int i;
source_push_start_impl(
XferElement *elt)
{
- XferSourcePush *self = (XferSourcePush *)elt;
+ XferSourcePush *self = XFER_SOURCE_PUSH(elt);
simpleprng_seed(&self->prng, RANDOM_SEED);
XferElement *elt,
size_t *size)
{
- XferSourcePull *self = (XferSourcePull *)elt;
+ XferSourcePull *self = XFER_SOURCE_PULL(elt);
char *buf;
size_t bufsiz;
return buf;
}
-static void
+static gboolean
source_pull_setup_impl(
XferElement *elt)
{
- XferSourcePull *self = (XferSourcePull *)elt;
+ XferSourcePull *self = XFER_SOURCE_PULL(elt);
simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ return TRUE;
}
static void
return type;
}
+/* LISTEN */
+
+static GType xfer_source_listen_get_type(void);
+#define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
+#define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
+#define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
+#define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
+#define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
+#define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
+
+typedef struct XferSourceListen {
+ XferElement __parent__;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferSourceListen;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferSourceListenClass;
+
+static gpointer
+source_listen_thread(
+ gpointer data)
+{
+ XferSourceListen *self = XFER_SOURCE_LISTEN(data);
+ XferElement *elt = XFER_ELEMENT(self);
+ DirectTCPAddr *addrs;
+ sockaddr_union addr;
+ int sock;
+ char *buf;
+ int i;
+
+ /* set up the sockaddr -- IPv4 only */
+ SU_INIT(&addr, AF_INET);
+ addrs = elt->downstream->input_listen_addrs;
+ g_assert(addrs != NULL);
+ SU_SET_PORT(&addr, addrs->port);
+ ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+ tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ error("socket(): %s", strerror(errno));
+ }
+ if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+ error("connect(): %s", strerror(errno));
+ }
+
+ tu_dbg("connected to %s\n", str_sockaddr(&addr));
+
+ buf = g_malloc(TEST_BLOCK_SIZE);
+ for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+ if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ }
+
+ /* send a smaller block */
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+ if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ g_free(buf);
+
+ /* send EOF by closing the socket */
+ close(sock);
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+source_listen_start_impl(
+ XferElement *elt)
+{
+ XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+source_listen_class_init(
+ XferSourceListenClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->start = source_listen_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_listen_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferSourceListenClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) source_listen_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferSourceListen),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
+ }
+
+ return type;
+}
+
+/* CONNECT */
+
+static GType xfer_source_connect_get_type(void);
+#define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
+#define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
+#define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
+#define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
+#define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
+#define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
+
+typedef struct XferSourceConnect {
+ XferElement __parent__;
+
+ int listen_socket;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferSourceConnect;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferSourceConnectClass;
+
+static gpointer
+source_connect_thread(
+ gpointer data)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
+ int sock;
+ char *buf;
+ int i;
+
+ g_assert(self->listen_socket != -1);
+
+ if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
+ xfer_cancel_with_error(XFER_ELEMENT(self),
+ _("Error accepting incoming connection: %s"), strerror(errno));
+ wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+ return NULL;
+ }
+
+ /* close the listening socket now, for good measure */
+ close(self->listen_socket);
+ self->listen_socket = -1;
+
+ tu_dbg("connection accepted\n");
+
+ buf = g_malloc(TEST_BLOCK_SIZE);
+ for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+ if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ }
+
+ /* send a smaller block */
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+ if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ g_free(buf);
+
+ /* send EOF by closing the socket */
+ close(sock);
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+source_connect_setup_impl(
+ XferElement *elt)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+ sockaddr_union addr;
+ DirectTCPAddr *addrs;
+ socklen_t len;
+ int sock;
+
+ /* set up self->listen_socket and set elt->output_listen_addrs */
+ sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0)
+ error("socket(): %s", strerror(errno));
+
+ if (listen(sock, 1) < 0)
+ error("listen(): %s", strerror(errno));
+
+ len = sizeof(addr);
+ if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ error("getsockname(): %s", strerror(errno));
+ g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+ addrs = g_new0(DirectTCPAddr, 2);
+ addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
+ addrs[0].port = SU_GET_PORT(&addr);
+ elt->output_listen_addrs = addrs;
+
+ return TRUE;
+}
+
+static gboolean
+source_connect_start_impl(
+ XferElement *elt)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+source_connect_class_init(
+ XferSourceConnectClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->setup = source_connect_setup_impl;
+ xec->start = source_connect_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_connect_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferSourceConnectClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) source_connect_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferSourceConnect),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
+ }
+
+ return type;
+}
+
/* READFD */
static GType xfer_dest_readfd_get_type(void);
dest_readfd_thread(
gpointer data)
{
- XferDestReadfd *self = (XferDestReadfd *)data;
+ XferDestReadfd *self = XFER_DEST_READFD(data);
char buf[TEST_XFER_SIZE];
size_t remaining;
int fd = XFER_ELEMENT(self)->upstream->output_fd;
dest_readfd_start_impl(
XferElement *elt)
{
- XferDestReadfd *self = (XferDestReadfd *)elt;
+ XferDestReadfd *self = XFER_DEST_READFD(elt);
simpleprng_seed(&self->prng, RANDOM_SEED);
dest_writefd_thread(
gpointer data)
{
- XferDestWritefd *self = (XferDestWritefd *)data;
+ XferDestWritefd *self = XFER_DEST_WRITEFD(data);
char buf[TEST_XFER_SIZE];
size_t remaining;
int fd = self->read_fd;
return NULL;
}
-static void
+static gboolean
dest_writefd_setup_impl(
XferElement *elt)
{
- XferDestWritefd *self = (XferDestWritefd *)elt;
+ XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
int p[2];
simpleprng_seed(&self->prng, RANDOM_SEED);
self->read_fd = p[0];
XFER_ELEMENT(self)->input_fd = p[1];
+
+ return TRUE;
}
static gboolean
dest_writefd_start_impl(
XferElement *elt)
{
- XferDestWritefd *self = (XferDestWritefd *)elt;
+ XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
return TRUE;
typedef struct XferDestPush {
XferElement __parent__;
- char buf[TEST_XFER_SIZE];
+ char *buf;
size_t bufpos;
GThread *thread;
gpointer buf,
size_t size)
{
- XferDestPush *self = (XferDestPush *)elt;
+ XferDestPush *self = XFER_DEST_PUSH(elt);
if (buf == NULL) {
/* if we're at EOF, verify we got the right bytes */
g_assert(self->bufpos == TEST_XFER_SIZE);
if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
g_critical("data entering XferDestPush does not match");
+ g_free(self->buf);
return;
}
self->bufpos += size;
}
-static void
+static gboolean
dest_push_setup_impl(
XferElement *elt)
{
- XferDestPush *self = (XferDestPush *)elt;
+ XferDestPush *self = XFER_DEST_PUSH(elt);
+ self->buf = g_malloc(TEST_XFER_SIZE);
simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ return TRUE;
}
static void
dest_pull_thread(
gpointer data)
{
- XferDestPull *self = (XferDestPull *)data;
+ XferDestPull *self = XFER_DEST_PULL(data);
char fullbuf[TEST_XFER_SIZE];
char *buf;
size_t bufpos = 0;
dest_pull_start_impl(
XferElement *elt)
{
- XferDestPull *self = (XferDestPull *)elt;
+ XferDestPull *self = XFER_DEST_PULL(elt);
simpleprng_seed(&self->prng, RANDOM_SEED);
return type;
}
+/* LISTEN */
+
+static GType xfer_dest_listen_get_type(void);
+#define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
+#define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
+#define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
+#define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
+#define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
+#define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
+
+typedef struct XferDestListen {
+ XferElement __parent__;
+
+ int listen_socket;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferDestListen;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferDestListenClass;
+
+static gpointer
+dest_listen_thread(
+ gpointer data)
+{
+ XferDestListen *self = XFER_DEST_LISTEN(data);
+ char *buf;
+ size_t bytes = 0;
+ int sock;
+
+ g_assert(self->listen_socket != -1);
+
+ if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
+ xfer_cancel_with_error(XFER_ELEMENT(self),
+ _("Error accepting incoming connection: %s"), strerror(errno));
+ wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+ return NULL;
+ }
+
+ /* close the listening socket now, for good measure */
+ close(self->listen_socket);
+ self->listen_socket = -1;
+
+ /* read from the socket until EOF or all of the data is read. We try to
+ * read one extra byte - if we get it, then upstream sent too much data */
+ buf = g_malloc(TEST_XFER_SIZE+1);
+ bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
+ g_assert(bytes == TEST_XFER_SIZE);
+ close(sock);
+
+ /* we're at EOF, so verify we got the right bytes */
+ g_assert(bytes == TEST_XFER_SIZE);
+ if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+ g_critical("data entering XferDestListen does not match");
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+dest_listen_setup_impl(
+ XferElement *elt)
+{
+ XferDestListen *self = XFER_DEST_LISTEN(elt);
+ sockaddr_union addr;
+ DirectTCPAddr *addrs;
+ socklen_t len;
+ int sock;
+
+ /* set up self->listen_socket and set elt->input_listen_addrs */
+ sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0)
+ error("socket(): %s", strerror(errno));
+
+ if (listen(sock, 1) < 0)
+ error("listen(): %s", strerror(errno));
+
+ len = sizeof(addr);
+ if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ error("getsockname(): %s", strerror(errno));
+ g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+ addrs = g_new0(DirectTCPAddr, 2);
+ addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
+ addrs[0].port = SU_GET_PORT(&addr);
+ elt->input_listen_addrs = addrs;
+
+ return TRUE;
+}
+
+static gboolean
+dest_listen_start_impl(
+ XferElement *elt)
+{
+ XferDestListen *self = XFER_DEST_LISTEN(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+dest_listen_class_init(
+ XferDestListenClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->setup = dest_listen_setup_impl;
+ xec->start = dest_listen_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_listen_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferDestListenClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) dest_listen_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferDestListen),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
+ }
+
+ return type;
+}
+
+/* CONNET */
+
+static GType xfer_dest_connect_get_type(void);
+#define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
+#define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
+#define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
+#define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
+#define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
+#define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
+
+typedef struct XferDestConnect {
+ XferElement __parent__;
+
+ int connect_socket;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferDestConnect;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferDestConnectClass;
+
+static gpointer
+dest_connect_thread(
+ gpointer data)
+{
+ XferDestConnect *self = XFER_DEST_CONNECT(data);
+ XferElement *elt = XFER_ELEMENT(self);
+ DirectTCPAddr *addrs;
+ sockaddr_union addr;
+ char *buf;
+ size_t bytes = 0;
+ int sock;
+
+ /* set up the sockaddr -- IPv4 only */
+ SU_INIT(&addr, AF_INET);
+ addrs = elt->upstream->output_listen_addrs;
+ g_assert(addrs != NULL);
+ SU_SET_PORT(&addr, addrs->port);
+ ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+ tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ error("socket(): %s", strerror(errno));
+ }
+ if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+ error("connect(): %s", strerror(errno));
+ }
+
+ tu_dbg("connected to %s\n", str_sockaddr(&addr));
+
+ /* read from the socket until EOF or all of the data is read. We try to
+ * read one extra byte - if we get it, then upstream sent too much data */
+ buf = g_malloc(TEST_XFER_SIZE+1);
+ bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
+ g_assert(bytes == TEST_XFER_SIZE);
+ close(sock);
+
+ /* we're at EOF, so verify we got the right bytes */
+ g_assert(bytes == TEST_XFER_SIZE);
+ if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+ g_critical("data entering XferDestConnect does not match");
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+dest_connect_start_impl(
+ XferElement *elt)
+{
+ XferDestConnect *self = XFER_DEST_CONNECT(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+dest_connect_class_init(
+ XferDestConnectClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->start = dest_connect_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_connect_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferDestConnectClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) dest_connect_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferDestConnect),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
+ }
+
+ return type;
+}
+
/*
* Tests
(XferElement *)g_object_new(d, NULL)); \
}
make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
+make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
+make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
/*
* Main driver
main(int argc, char **argv)
{
static TestUtilsTest tests[] = {
- TU_TEST(test_xfer_simple, 10),
- TU_TEST(test_xfer_files_simple, 10),
- TU_TEST(test_xfer_files_filter, 10),
- TU_TEST(test_glue_READFD_READFD, 5),
- TU_TEST(test_glue_READFD_WRITE, 5),
- TU_TEST(test_glue_READFD_PUSH, 5),
- TU_TEST(test_glue_READFD_PULL, 5),
- TU_TEST(test_glue_WRITEFD_READFD, 5),
- TU_TEST(test_glue_WRITEFD_WRITE, 5),
- TU_TEST(test_glue_WRITEFD_PUSH, 5),
- TU_TEST(test_glue_WRITEFD_PULL, 5),
- TU_TEST(test_glue_PUSH_READFD, 5),
- TU_TEST(test_glue_PUSH_WRITE, 5),
- TU_TEST(test_glue_PUSH_PUSH, 5),
- TU_TEST(test_glue_PUSH_PULL, 5),
- TU_TEST(test_glue_PULL_READFD, 5),
- TU_TEST(test_glue_PULL_WRITE, 5),
- TU_TEST(test_glue_PULL_PUSH, 5),
- TU_TEST(test_glue_PULL_PULL, 5),
+ TU_TEST(test_xfer_simple, 90),
+ TU_TEST(test_xfer_files_simple, 90),
+ TU_TEST(test_xfer_files_filter, 90),
+ TU_TEST(test_glue_READFD_READFD, 90),
+ TU_TEST(test_glue_READFD_WRITEFD, 90),
+ TU_TEST(test_glue_READFD_PUSH, 90),
+ TU_TEST(test_glue_READFD_PULL, 90),
+ TU_TEST(test_glue_READFD_LISTEN, 90),
+ TU_TEST(test_glue_READFD_CONNECT, 90),
+ TU_TEST(test_glue_WRITEFD_READFD, 90),
+ TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
+ TU_TEST(test_glue_WRITEFD_PUSH, 90),
+ TU_TEST(test_glue_WRITEFD_PULL, 90),
+ TU_TEST(test_glue_WRITEFD_LISTEN, 90),
+ TU_TEST(test_glue_WRITEFD_CONNECT, 90),
+ TU_TEST(test_glue_PUSH_READFD, 90),
+ TU_TEST(test_glue_PUSH_WRITEFD, 90),
+ TU_TEST(test_glue_PUSH_PUSH, 90),
+ TU_TEST(test_glue_PUSH_PULL, 90),
+ TU_TEST(test_glue_PUSH_LISTEN, 90),
+ TU_TEST(test_glue_PUSH_CONNECT, 90),
+ TU_TEST(test_glue_PULL_READFD, 90),
+ TU_TEST(test_glue_PULL_WRITEFD, 90),
+ TU_TEST(test_glue_PULL_PUSH, 90),
+ TU_TEST(test_glue_PULL_PULL, 90),
+ TU_TEST(test_glue_PULL_LISTEN, 90),
+ TU_TEST(test_glue_PULL_CONNECT, 90),
+ TU_TEST(test_glue_LISTEN_READFD, 90),
+ TU_TEST(test_glue_LISTEN_WRITEFD, 90),
+ TU_TEST(test_glue_LISTEN_PUSH, 90),
+ TU_TEST(test_glue_LISTEN_PULL, 90),
+ TU_TEST(test_glue_LISTEN_LISTEN, 90),
+ TU_TEST(test_glue_LISTEN_CONNECT, 90),
+ TU_TEST(test_glue_CONNECT_READFD, 90),
+ TU_TEST(test_glue_CONNECT_WRITEFD, 90),
+ TU_TEST(test_glue_CONNECT_PUSH, 90),
+ TU_TEST(test_glue_CONNECT_PULL, 90),
+ TU_TEST(test_glue_CONNECT_LISTEN, 90),
+ TU_TEST(test_glue_CONNECT_CONNECT, 90),
TU_END()
};