Imported Upstream version 3.1.0
[debian/amanda] / xfer-src / xfer-test.c
index f50cf132b52502ac794e6998c489a7a1de294d90..30e69700e8afa30ce0e100f8c87ecc87eee34504 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008 Zmanda Inc.  All Rights Reserved.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
  * 
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -14,7 +14,7 @@
  * with this program; if not, write to the Free Software Foundation, Inc.,
  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
  * 
- * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300
+ * Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  *
  * Author: Dustin J. Mitchell <dustin@zmanda.com>
@@ -26,6 +26,7 @@
 #include "amanda.h"
 #include "event.h"
 #include "simpleprng.h"
+#include "sockaddr-util.h"
 
 /* Having tests repeat exactly is an advantage, so we use a hard-coded
  * random seed. */
@@ -41,7 +42,7 @@
 
 /* constants to determine the total amount of data to be transfered; EXTRA is
  * to test out partial-block handling; it should be prime. */
-#define TEST_BLOCK_SIZE 1024
+#define TEST_BLOCK_SIZE 32768
 #define TEST_BLOCK_COUNT 10
 #define TEST_BLOCK_EXTRA 97
 #define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
@@ -72,7 +73,7 @@ static gpointer
 source_readfd_thread(
     gpointer data)
 {
-    XferSourceReadfd *self = (XferSourceReadfd *)data;
+    XferSourceReadfd *self = XFER_SOURCE_READFD(data);
     char buf[TEST_XFER_SIZE];
     int fd = self->write_fd;
 
@@ -89,11 +90,11 @@ source_readfd_thread(
     return NULL;
 }
 
-static void
+static gboolean
 source_readfd_setup_impl(
     XferElement *elt)
 {
-    XferSourceReadfd *self = (XferSourceReadfd *)elt;
+    XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
     int p[2];
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
@@ -103,13 +104,15 @@ source_readfd_setup_impl(
 
     self->write_fd = p[1];
     XFER_ELEMENT(self)->output_fd = p[0];
+
+    return TRUE;
 }
 
 static gboolean
 source_readfd_start_impl(
     XferElement *elt)
 {
-    XferSourceReadfd *self = (XferSourceReadfd *)elt;
+    XferSourceReadfd *self = XFER_SOURCE_READFD(elt);
     self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
 
     return TRUE;
@@ -180,7 +183,7 @@ static gpointer
 source_writefd_thread(
     gpointer data)
 {
-    XferSourceWritefd *self = (XferSourceWritefd *)data;
+    XferSourceWritefd *self = XFER_SOURCE_WRITEFD(data);
     char buf[TEST_XFER_SIZE];
     int fd = XFER_ELEMENT(self)->downstream->input_fd;
 
@@ -202,7 +205,7 @@ static gboolean
 source_writefd_start_impl(
     XferElement *elt)
 {
-    XferSourceWritefd *self = (XferSourceWritefd *)elt;
+    XferSourceWritefd *self = XFER_SOURCE_WRITEFD(elt);
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
 
@@ -275,7 +278,7 @@ static gpointer
 source_push_thread(
     gpointer data)
 {
-    XferSourcePush *self = (XferSourcePush *)data;
+    XferSourcePush *self = XFER_SOURCE_PUSH(data);
     char *buf;
     int i;
 
@@ -304,7 +307,7 @@ static gboolean
 source_push_start_impl(
     XferElement *elt)
 {
-    XferSourcePush *self = (XferSourcePush *)elt;
+    XferSourcePush *self = XFER_SOURCE_PUSH(elt);
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
 
@@ -379,7 +382,7 @@ source_pull_pull_buffer_impl(
     XferElement *elt,
     size_t *size)
 {
-    XferSourcePull *self = (XferSourcePull *)elt;
+    XferSourcePull *self = XFER_SOURCE_PULL(elt);
     char *buf;
     size_t bufsiz;
 
@@ -397,13 +400,15 @@ source_pull_pull_buffer_impl(
     return buf;
 }
 
-static void
+static gboolean
 source_pull_setup_impl(
     XferElement *elt)
 {
-    XferSourcePull *self = (XferSourcePull *)elt;
+    XferSourcePull *self = XFER_SOURCE_PULL(elt);
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    return TRUE;
 }
 
 static void
@@ -446,6 +451,286 @@ xfer_source_pull_get_type (void)
     return type;
 }
 
+/* LISTEN */
+
+static GType xfer_source_listen_get_type(void);
+#define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
+#define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
+#define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
+#define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
+#define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
+#define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
+
+typedef struct XferSourceListen {
+    XferElement __parent__;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourceListen;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourceListenClass;
+
+static gpointer
+source_listen_thread(
+    gpointer data)
+{
+    XferSourceListen *self = XFER_SOURCE_LISTEN(data);
+    XferElement *elt = XFER_ELEMENT(self);
+    DirectTCPAddr *addrs;
+    sockaddr_union addr;
+    int sock;
+    char *buf;
+    int i;
+
+    /* set up the sockaddr -- IPv4 only */
+    SU_INIT(&addr, AF_INET);
+    addrs = elt->downstream->input_listen_addrs;
+    g_assert(addrs != NULL);
+    SU_SET_PORT(&addr, addrs->port);
+    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+    tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
+    sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0) {
+       error("socket(): %s", strerror(errno));
+    }
+    if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+       error("connect(): %s", strerror(errno));
+    }
+
+    tu_dbg("connected to %s\n", str_sockaddr(&addr));
+
+    buf = g_malloc(TEST_BLOCK_SIZE);
+    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+       simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+       if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+           error("error in full_write(): %s", strerror(errno));
+       }
+    }
+
+    /* send a smaller block */
+    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+    if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+       error("error in full_write(): %s", strerror(errno));
+    }
+    g_free(buf);
+
+    /* send EOF by closing the socket */
+    close(sock);
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+source_listen_start_impl(
+    XferElement *elt)
+{
+    XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+source_listen_class_init(
+    XferSourceListenClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = source_listen_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_listen_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourceListenClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_listen_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourceListen),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
+    }
+
+    return type;
+}
+
+/* CONNECT */
+
+static GType xfer_source_connect_get_type(void);
+#define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
+#define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
+#define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
+#define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
+#define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
+#define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
+
+typedef struct XferSourceConnect {
+    XferElement __parent__;
+
+    int listen_socket;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourceConnect;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourceConnectClass;
+
+static gpointer
+source_connect_thread(
+    gpointer data)
+{
+    XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
+    int sock;
+    char *buf;
+    int i;
+
+    g_assert(self->listen_socket != -1);
+
+    if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
+       xfer_cancel_with_error(XFER_ELEMENT(self),
+           _("Error accepting incoming connection: %s"), strerror(errno));
+       wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+       return NULL;
+    }
+
+    /* close the listening socket now, for good measure */
+    close(self->listen_socket);
+    self->listen_socket = -1;
+
+    tu_dbg("connection accepted\n");
+
+    buf = g_malloc(TEST_BLOCK_SIZE);
+    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+       simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+       if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+           error("error in full_write(): %s", strerror(errno));
+       }
+    }
+
+    /* send a smaller block */
+    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+    if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+       error("error in full_write(): %s", strerror(errno));
+    }
+    g_free(buf);
+
+    /* send EOF by closing the socket */
+    close(sock);
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+source_connect_setup_impl(
+    XferElement *elt)
+{
+    XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+    sockaddr_union addr;
+    DirectTCPAddr *addrs;
+    socklen_t len;
+    int sock;
+
+    /* set up self->listen_socket and set elt->output_listen_addrs */
+    sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0)
+       error("socket(): %s", strerror(errno));
+
+    if (listen(sock, 1) < 0)
+       error("listen(): %s", strerror(errno));
+
+    len = sizeof(addr);
+    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+       error("getsockname(): %s", strerror(errno));
+    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+    addrs = g_new0(DirectTCPAddr, 2);
+    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
+    addrs[0].port = SU_GET_PORT(&addr);
+    elt->output_listen_addrs = addrs;
+
+    return TRUE;
+}
+
+static gboolean
+source_connect_start_impl(
+    XferElement *elt)
+{
+    XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+source_connect_class_init(
+    XferSourceConnectClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->setup = source_connect_setup_impl;
+    xec->start = source_connect_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_connect_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourceConnectClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_connect_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourceConnect),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
+    }
+
+    return type;
+}
+
 /* READFD */
 
 static GType xfer_dest_readfd_get_type(void);
@@ -471,7 +756,7 @@ static gpointer
 dest_readfd_thread(
     gpointer data)
 {
-    XferDestReadfd *self = (XferDestReadfd *)data;
+    XferDestReadfd *self = XFER_DEST_READFD(data);
     char buf[TEST_XFER_SIZE];
     size_t remaining;
     int fd = XFER_ELEMENT(self)->upstream->output_fd;
@@ -504,7 +789,7 @@ static gboolean
 dest_readfd_start_impl(
     XferElement *elt)
 {
-    XferDestReadfd *self = (XferDestReadfd *)elt;
+    XferDestReadfd *self = XFER_DEST_READFD(elt);
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
 
@@ -578,7 +863,7 @@ static gpointer
 dest_writefd_thread(
     gpointer data)
 {
-    XferDestWritefd *self = (XferDestWritefd *)data;
+    XferDestWritefd *self = XFER_DEST_WRITEFD(data);
     char buf[TEST_XFER_SIZE];
     size_t remaining;
     int fd = self->read_fd;
@@ -607,11 +892,11 @@ dest_writefd_thread(
     return NULL;
 }
 
-static void
+static gboolean
 dest_writefd_setup_impl(
     XferElement *elt)
 {
-    XferDestWritefd *self = (XferDestWritefd *)elt;
+    XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
     int p[2];
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
@@ -621,13 +906,15 @@ dest_writefd_setup_impl(
 
     self->read_fd = p[0];
     XFER_ELEMENT(self)->input_fd = p[1];
+
+    return TRUE;
 }
 
 static gboolean
 dest_writefd_start_impl(
     XferElement *elt)
 {
-    XferDestWritefd *self = (XferDestWritefd *)elt;
+    XferDestWritefd *self = XFER_DEST_WRITEFD(elt);
     self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
 
     return TRUE;
@@ -686,7 +973,7 @@ static GType xfer_dest_push_get_type(void);
 typedef struct XferDestPush {
     XferElement __parent__;
 
-    char buf[TEST_XFER_SIZE];
+    char *buf;
     size_t bufpos;
 
     GThread *thread;
@@ -703,13 +990,14 @@ dest_push_push_buffer_impl(
     gpointer buf,
     size_t size)
 {
-    XferDestPush *self = (XferDestPush *)elt;
+    XferDestPush *self = XFER_DEST_PUSH(elt);
 
     if (buf == NULL) {
        /* if we're at EOF, verify we got the right bytes */
        g_assert(self->bufpos == TEST_XFER_SIZE);
        if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
            g_critical("data entering XferDestPush does not match");
+       g_free(self->buf);
        return;
     }
 
@@ -718,13 +1006,16 @@ dest_push_push_buffer_impl(
     self->bufpos += size;
 }
 
-static void
+static gboolean
 dest_push_setup_impl(
     XferElement *elt)
 {
-    XferDestPush *self = (XferDestPush *)elt;
+    XferDestPush *self = XFER_DEST_PUSH(elt);
 
+    self->buf = g_malloc(TEST_XFER_SIZE);
     simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    return TRUE;
 }
 
 static void
@@ -792,7 +1083,7 @@ static gpointer
 dest_pull_thread(
     gpointer data)
 {
-    XferDestPull *self = (XferDestPull *)data;
+    XferDestPull *self = XFER_DEST_PULL(data);
     char fullbuf[TEST_XFER_SIZE];
     char *buf;
     size_t bufpos = 0;
@@ -818,7 +1109,7 @@ static gboolean
 dest_pull_start_impl(
     XferElement *elt)
 {
-    XferDestPull *self = (XferDestPull *)elt;
+    XferDestPull *self = XFER_DEST_PULL(elt);
 
     simpleprng_seed(&self->prng, RANDOM_SEED);
 
@@ -866,6 +1157,274 @@ xfer_dest_pull_get_type (void)
     return type;
 }
 
+/* LISTEN */
+
+static GType xfer_dest_listen_get_type(void);
+#define XFER_DEST_LISTEN_TYPE (xfer_dest_listen_get_type())
+#define XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen)
+#define XFER_DEST_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_listen_get_type(), XferDestListen const)
+#define XFER_DEST_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_listen_get_type(), XferDestListenClass)
+#define IS_XFER_DEST_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_listen_get_type ())
+#define XFER_DEST_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_listen_get_type(), XferDestListenClass)
+
+typedef struct XferDestListen {
+    XferElement __parent__;
+
+    int listen_socket;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestListen;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestListenClass;
+
+static gpointer
+dest_listen_thread(
+    gpointer data)
+{
+    XferDestListen *self = XFER_DEST_LISTEN(data);
+    char *buf;
+    size_t bytes = 0;
+    int sock;
+
+    g_assert(self->listen_socket != -1);
+
+    if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
+       xfer_cancel_with_error(XFER_ELEMENT(self),
+           _("Error accepting incoming connection: %s"), strerror(errno));
+       wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+       return NULL;
+    }
+
+    /* close the listening socket now, for good measure */
+    close(self->listen_socket);
+    self->listen_socket = -1;
+
+    /* read from the socket until EOF or all of the data is read.  We try to
+     * read one extra byte - if we get it, then upstream sent too much data */
+    buf = g_malloc(TEST_XFER_SIZE+1);
+    bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
+    g_assert(bytes == TEST_XFER_SIZE);
+    close(sock);
+
+    /* we're at EOF, so verify we got the right bytes */
+    g_assert(bytes == TEST_XFER_SIZE);
+    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+       g_critical("data entering XferDestListen does not match");
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+dest_listen_setup_impl(
+    XferElement *elt)
+{
+    XferDestListen *self = XFER_DEST_LISTEN(elt);
+    sockaddr_union addr;
+    DirectTCPAddr *addrs;
+    socklen_t len;
+    int sock;
+
+    /* set up self->listen_socket and set elt->input_listen_addrs */
+    sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0)
+       error("socket(): %s", strerror(errno));
+
+    if (listen(sock, 1) < 0)
+       error("listen(): %s", strerror(errno));
+
+    len = sizeof(addr);
+    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+       error("getsockname(): %s", strerror(errno));
+    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+    addrs = g_new0(DirectTCPAddr, 2);
+    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
+    addrs[0].port = SU_GET_PORT(&addr);
+    elt->input_listen_addrs = addrs;
+
+    return TRUE;
+}
+
+static gboolean
+dest_listen_start_impl(
+    XferElement *elt)
+{
+    XferDestListen *self = XFER_DEST_LISTEN(elt);
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(dest_listen_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+dest_listen_class_init(
+    XferDestListenClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->setup = dest_listen_setup_impl;
+    xec->start = dest_listen_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_listen_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestListenClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_listen_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestListen),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestListen", &info, 0);
+    }
+
+    return type;
+}
+
+/* CONNET */
+
+static GType xfer_dest_connect_get_type(void);
+#define XFER_DEST_CONNECT_TYPE (xfer_dest_connect_get_type())
+#define XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect)
+#define XFER_DEST_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_connect_get_type(), XferDestConnect const)
+#define XFER_DEST_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_connect_get_type(), XferDestConnectClass)
+#define IS_XFER_DEST_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_connect_get_type ())
+#define XFER_DEST_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_connect_get_type(), XferDestConnectClass)
+
+typedef struct XferDestConnect {
+    XferElement __parent__;
+
+    int connect_socket;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestConnect;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestConnectClass;
+
+static gpointer
+dest_connect_thread(
+    gpointer data)
+{
+    XferDestConnect *self = XFER_DEST_CONNECT(data);
+    XferElement *elt = XFER_ELEMENT(self);
+    DirectTCPAddr *addrs;
+    sockaddr_union addr;
+    char *buf;
+    size_t bytes = 0;
+    int sock;
+
+    /* set up the sockaddr -- IPv4 only */
+    SU_INIT(&addr, AF_INET);
+    addrs = elt->upstream->output_listen_addrs;
+    g_assert(addrs != NULL);
+    SU_SET_PORT(&addr, addrs->port);
+    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+    tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
+    sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0) {
+       error("socket(): %s", strerror(errno));
+    }
+    if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+       error("connect(): %s", strerror(errno));
+    }
+
+    tu_dbg("connected to %s\n", str_sockaddr(&addr));
+
+    /* read from the socket until EOF or all of the data is read.  We try to
+     * read one extra byte - if we get it, then upstream sent too much data */
+    buf = g_malloc(TEST_XFER_SIZE+1);
+    bytes = full_read(sock, buf, TEST_XFER_SIZE+1);
+    g_assert(bytes == TEST_XFER_SIZE);
+    close(sock);
+
+    /* we're at EOF, so verify we got the right bytes */
+    g_assert(bytes == TEST_XFER_SIZE);
+    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+       g_critical("data entering XferDestConnect does not match");
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+dest_connect_start_impl(
+    XferElement *elt)
+{
+    XferDestConnect *self = XFER_DEST_CONNECT(elt);
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(dest_connect_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+dest_connect_class_init(
+    XferDestConnectClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = dest_connect_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_connect_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestConnectClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_connect_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestConnect),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestConnect", &info, 0);
+    }
+
+    return type;
+}
+
 
 /*
  * Tests
@@ -1042,21 +1601,41 @@ test_glue_combo(
                           (XferElement *)g_object_new(d, NULL)); \
 }
 make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_READFD_WRITEFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
 make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
 make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_READFD_LISTEN, XFER_SOURCE_READFD_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_READFD_CONNECT, XFER_SOURCE_READFD_TYPE, XFER_DEST_CONNECT_TYPE)
 make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_WRITEFD_WRITEFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
 make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
 make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_WRITEFD_LISTEN, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_WRITEFD_CONNECT, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_CONNECT_TYPE)
 make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PUSH_WRITEFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
 make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
 make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PUSH_LISTEN, XFER_SOURCE_PUSH_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_PUSH_CONNECT, XFER_SOURCE_PUSH_TYPE, XFER_DEST_CONNECT_TYPE)
 make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
-make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PULL_WRITEFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
 make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
 make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PULL_LISTEN, XFER_SOURCE_PULL_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_PULL_CONNECT, XFER_SOURCE_PULL_TYPE, XFER_DEST_CONNECT_TYPE)
+make_test_glue(test_glue_LISTEN_READFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_LISTEN_WRITEFD, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_LISTEN_PUSH, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_LISTEN_PULL, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_LISTEN_LISTEN, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_LISTEN_CONNECT, XFER_SOURCE_LISTEN_TYPE, XFER_DEST_CONNECT_TYPE)
+make_test_glue(test_glue_CONNECT_READFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_CONNECT_WRITEFD, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_CONNECT_PUSH, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_CONNECT_PULL, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_CONNECT_LISTEN, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_LISTEN_TYPE)
+make_test_glue(test_glue_CONNECT_CONNECT, XFER_SOURCE_CONNECT_TYPE, XFER_DEST_CONNECT_TYPE)
 
 /*
  * Main driver
@@ -1066,25 +1645,45 @@ int
 main(int argc, char **argv)
 {
     static TestUtilsTest tests[] = {
-       TU_TEST(test_xfer_simple, 10),
-       TU_TEST(test_xfer_files_simple, 10),
-       TU_TEST(test_xfer_files_filter, 10),
-        TU_TEST(test_glue_READFD_READFD, 5),
-        TU_TEST(test_glue_READFD_WRITE, 5),
-        TU_TEST(test_glue_READFD_PUSH, 5),
-        TU_TEST(test_glue_READFD_PULL, 5),
-        TU_TEST(test_glue_WRITEFD_READFD, 5),
-        TU_TEST(test_glue_WRITEFD_WRITE, 5),
-        TU_TEST(test_glue_WRITEFD_PUSH, 5),
-        TU_TEST(test_glue_WRITEFD_PULL, 5),
-        TU_TEST(test_glue_PUSH_READFD, 5),
-        TU_TEST(test_glue_PUSH_WRITE, 5),
-        TU_TEST(test_glue_PUSH_PUSH, 5),
-        TU_TEST(test_glue_PUSH_PULL, 5),
-        TU_TEST(test_glue_PULL_READFD, 5),
-        TU_TEST(test_glue_PULL_WRITE, 5),
-        TU_TEST(test_glue_PULL_PUSH, 5),
-        TU_TEST(test_glue_PULL_PULL, 5),
+       TU_TEST(test_xfer_simple, 90),
+       TU_TEST(test_xfer_files_simple, 90),
+       TU_TEST(test_xfer_files_filter, 90),
+        TU_TEST(test_glue_READFD_READFD, 90),
+        TU_TEST(test_glue_READFD_WRITEFD, 90),
+        TU_TEST(test_glue_READFD_PUSH, 90),
+        TU_TEST(test_glue_READFD_PULL, 90),
+        TU_TEST(test_glue_READFD_LISTEN, 90),
+        TU_TEST(test_glue_READFD_CONNECT, 90),
+        TU_TEST(test_glue_WRITEFD_READFD, 90),
+        TU_TEST(test_glue_WRITEFD_WRITEFD, 90),
+        TU_TEST(test_glue_WRITEFD_PUSH, 90),
+        TU_TEST(test_glue_WRITEFD_PULL, 90),
+        TU_TEST(test_glue_WRITEFD_LISTEN, 90),
+        TU_TEST(test_glue_WRITEFD_CONNECT, 90),
+        TU_TEST(test_glue_PUSH_READFD, 90),
+        TU_TEST(test_glue_PUSH_WRITEFD, 90),
+        TU_TEST(test_glue_PUSH_PUSH, 90),
+        TU_TEST(test_glue_PUSH_PULL, 90),
+        TU_TEST(test_glue_PUSH_LISTEN, 90),
+        TU_TEST(test_glue_PUSH_CONNECT, 90),
+        TU_TEST(test_glue_PULL_READFD, 90),
+        TU_TEST(test_glue_PULL_WRITEFD, 90),
+        TU_TEST(test_glue_PULL_PUSH, 90),
+        TU_TEST(test_glue_PULL_PULL, 90),
+        TU_TEST(test_glue_PULL_LISTEN, 90),
+        TU_TEST(test_glue_PULL_CONNECT, 90),
+        TU_TEST(test_glue_LISTEN_READFD, 90),
+        TU_TEST(test_glue_LISTEN_WRITEFD, 90),
+        TU_TEST(test_glue_LISTEN_PUSH, 90),
+        TU_TEST(test_glue_LISTEN_PULL, 90),
+        TU_TEST(test_glue_LISTEN_LISTEN, 90),
+        TU_TEST(test_glue_LISTEN_CONNECT, 90),
+        TU_TEST(test_glue_CONNECT_READFD, 90),
+        TU_TEST(test_glue_CONNECT_WRITEFD, 90),
+        TU_TEST(test_glue_CONNECT_PUSH, 90),
+        TU_TEST(test_glue_CONNECT_PULL, 90),
+        TU_TEST(test_glue_CONNECT_LISTEN, 90),
+        TU_TEST(test_glue_CONNECT_CONNECT, 90),
        TU_END()
     };