Imported Upstream version 2.6.1
[debian/amanda] / xfer-src / xfer-test.c
diff --git a/xfer-src/xfer-test.c b/xfer-src/xfer-test.c
new file mode 100644 (file)
index 0000000..f50cf13
--- /dev/null
@@ -0,0 +1,1094 @@
+/*
+ * Copyright (c) 2008 Zmanda Inc.  All Rights Reserved.
+ * 
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
+ * 
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ * 
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+ * 
+ * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
+ *
+ * Author: Dustin J. Mitchell <dustin@zmanda.com>
+ */
+
+#include "amxfer.h"
+#include "glib-util.h"
+#include "testutils.h"
+#include "amanda.h"
+#include "event.h"
+#include "simpleprng.h"
+
+/* Having tests repeat exactly is an advantage, so we use a hard-coded
+ * random seed. */
+#define RANDOM_SEED 0xf00d
+
+/*
+ * XferElement subclasses
+ *
+ * This file defines a few "private" element classes that each have only one
+ * mechanism pair.  These classes are then used to test all of the possible
+ * combinations of glue.
+ */
+
+/* constants to determine the total amount of data to be transfered; EXTRA is
+ * to test out partial-block handling; it should be prime. */
+#define TEST_BLOCK_SIZE 1024
+#define TEST_BLOCK_COUNT 10
+#define TEST_BLOCK_EXTRA 97
+#define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA)
+
+/* READFD */
+
+static GType xfer_source_readfd_get_type(void);
+#define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type())
+#define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd)
+#define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const)
+#define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass)
+#define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ())
+#define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass)
+
+typedef struct XferSourceReadfd {
+    XferElement __parent__;
+
+    int write_fd;
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourceReadfd;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourceReadfdClass;
+
+static gpointer
+source_readfd_thread(
+    gpointer data)
+{
+    XferSourceReadfd *self = (XferSourceReadfd *)data;
+    char buf[TEST_XFER_SIZE];
+    int fd = self->write_fd;
+
+    simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
+
+    if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
+       error("error in full_write(): %s", strerror(errno));
+    }
+
+    close(fd);
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static void
+source_readfd_setup_impl(
+    XferElement *elt)
+{
+    XferSourceReadfd *self = (XferSourceReadfd *)elt;
+    int p[2];
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    if (pipe(p) < 0)
+       g_critical("Error from pipe(): %s", strerror(errno));
+
+    self->write_fd = p[1];
+    XFER_ELEMENT(self)->output_fd = p[0];
+}
+
+static gboolean
+source_readfd_start_impl(
+    XferElement *elt)
+{
+    XferSourceReadfd *self = (XferSourceReadfd *)elt;
+    self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+source_readfd_class_init(
+    XferSourceReadfdClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->setup = source_readfd_setup_impl;
+    xec->start = source_readfd_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_readfd_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourceReadfdClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_readfd_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourceReadfd),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0);
+    }
+
+    return type;
+}
+
+/* WRITEFD */
+
+static GType xfer_source_writefd_get_type(void);
+#define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type())
+#define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd)
+#define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const)
+#define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass)
+#define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ())
+#define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass)
+
+typedef struct XferSourceWritefd {
+    XferElement __parent__;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourceWritefd;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourceWritefdClass;
+
+static gpointer
+source_writefd_thread(
+    gpointer data)
+{
+    XferSourceWritefd *self = (XferSourceWritefd *)data;
+    char buf[TEST_XFER_SIZE];
+    int fd = XFER_ELEMENT(self)->downstream->input_fd;
+
+    simpleprng_fill_buffer(&self->prng, buf, sizeof(buf));
+
+    if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) {
+       error("error in full_write(): %s", strerror(errno));
+    }
+
+    close(fd);
+    XFER_ELEMENT(self)->downstream->input_fd = -1;
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+source_writefd_start_impl(
+    XferElement *elt)
+{
+    XferSourceWritefd *self = (XferSourceWritefd *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+source_writefd_class_init(
+    XferSourceWritefdClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = source_writefd_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_writefd_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourceWritefdClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_writefd_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourceWritefd),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0);
+    }
+
+    return type;
+}
+
+/* PUSH_BUFFER */
+
+static GType xfer_source_push_get_type(void);
+#define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type())
+#define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush)
+#define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const)
+#define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass)
+#define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ())
+#define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass)
+
+typedef struct XferSourcePush {
+    XferElement __parent__;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourcePush;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourcePushClass;
+
+static gpointer
+source_push_thread(
+    gpointer data)
+{
+    XferSourcePush *self = (XferSourcePush *)data;
+    char *buf;
+    int i;
+
+    for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+       buf = g_malloc(TEST_BLOCK_SIZE);
+       simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+       xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE);
+       buf = NULL;
+    }
+
+    /* send a smaller block */
+    buf = g_malloc(TEST_BLOCK_EXTRA);
+    simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+    xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA);
+    buf = NULL;
+
+    /* send EOF */
+    xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0);
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+source_push_start_impl(
+    XferElement *elt)
+{
+    XferSourcePush *self = (XferSourcePush *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+source_push_class_init(
+    XferSourcePushClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = source_push_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_push_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourcePushClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_push_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourcePush),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0);
+    }
+
+    return type;
+}
+
+/* PULL_BUFFER */
+
+static GType xfer_source_pull_get_type(void);
+#define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type())
+#define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull)
+#define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const)
+#define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass)
+#define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ())
+#define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass)
+
+typedef struct XferSourcePull {
+    XferElement __parent__;
+
+    gint nbuffers;
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferSourcePull;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferSourcePullClass;
+
+static gpointer
+source_pull_pull_buffer_impl(
+    XferElement *elt,
+    size_t *size)
+{
+    XferSourcePull *self = (XferSourcePull *)elt;
+    char *buf;
+    size_t bufsiz;
+
+    if (self->nbuffers > TEST_BLOCK_COUNT) {
+       *size = 0;
+       return NULL;
+    }
+    bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA;
+
+    self->nbuffers++;
+
+    buf = g_malloc(bufsiz);
+    simpleprng_fill_buffer(&self->prng, buf, bufsiz);
+    *size = bufsiz;
+    return buf;
+}
+
+static void
+source_pull_setup_impl(
+    XferElement *elt)
+{
+    XferSourcePull *self = (XferSourcePull *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+}
+
+static void
+source_pull_class_init(
+    XferSourcePullClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->pull_buffer = source_pull_pull_buffer_impl;
+    xec->setup = source_pull_setup_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_pull_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferSourcePullClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) source_pull_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferSourcePull),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0);
+    }
+
+    return type;
+}
+
+/* READFD */
+
+static GType xfer_dest_readfd_get_type(void);
+#define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type())
+#define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd)
+#define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const)
+#define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass)
+#define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ())
+#define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass)
+
+typedef struct XferDestReadfd {
+    XferElement __parent__;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestReadfd;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestReadfdClass;
+
+static gpointer
+dest_readfd_thread(
+    gpointer data)
+{
+    XferDestReadfd *self = (XferDestReadfd *)data;
+    char buf[TEST_XFER_SIZE];
+    size_t remaining;
+    int fd = XFER_ELEMENT(self)->upstream->output_fd;
+
+    remaining = sizeof(buf);
+    while (remaining) {
+       ssize_t nread;
+       if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
+           error("error in read(): %s", strerror(errno));
+       }
+       remaining -= nread;
+    }
+
+    /* we should be at EOF here */
+    if (read(fd, buf, 10) != 0)
+       g_critical("too much data entering XferDestReadfd");
+
+    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+       g_critical("data entering XferDestReadfd does not match");
+
+    close(fd);
+    XFER_ELEMENT(self)->upstream->output_fd = -1;
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+dest_readfd_start_impl(
+    XferElement *elt)
+{
+    XferDestReadfd *self = (XferDestReadfd *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+dest_readfd_class_init(
+    XferDestReadfdClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = dest_readfd_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_readfd_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestReadfdClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_readfd_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestReadfd),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0);
+    }
+
+    return type;
+}
+
+/* WRITEFD */
+
+static GType xfer_dest_writefd_get_type(void);
+#define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type())
+#define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd)
+#define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const)
+#define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass)
+#define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ())
+#define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass)
+
+typedef struct XferDestWritefd {
+    XferElement __parent__;
+
+    int read_fd;
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestWritefd;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestWritefdClass;
+
+static gpointer
+dest_writefd_thread(
+    gpointer data)
+{
+    XferDestWritefd *self = (XferDestWritefd *)data;
+    char buf[TEST_XFER_SIZE];
+    size_t remaining;
+    int fd = self->read_fd;
+
+    remaining = sizeof(buf);
+    while (remaining) {
+       ssize_t nwrite;
+       if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) {
+           error("error in read(): %s", strerror(errno));
+       }
+       remaining -= nwrite;
+    }
+
+    /* we should be at EOF here */
+    if (read(fd, buf, 10) != 0)
+       g_critical("too much data entering XferDestWritefd");
+
+    if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE))
+       g_critical("data entering XferDestWritefd does not match");
+
+    close(fd);
+    XFER_ELEMENT(self)->upstream->output_fd = -1;
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static void
+dest_writefd_setup_impl(
+    XferElement *elt)
+{
+    XferDestWritefd *self = (XferDestWritefd *)elt;
+    int p[2];
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    if (pipe(p) < 0)
+       g_critical("Error from pipe(): %s", strerror(errno));
+
+    self->read_fd = p[0];
+    XFER_ELEMENT(self)->input_fd = p[1];
+}
+
+static gboolean
+dest_writefd_start_impl(
+    XferElement *elt)
+{
+    XferDestWritefd *self = (XferDestWritefd *)elt;
+    self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+dest_writefd_class_init(
+    XferDestWritefdClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->setup = dest_writefd_setup_impl;
+    xec->start = dest_writefd_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_writefd_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestWritefdClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_writefd_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestWritefd),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0);
+    }
+
+    return type;
+}
+
+/* PUSH_BUFFER */
+
+static GType xfer_dest_push_get_type(void);
+#define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type())
+#define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush)
+#define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const)
+#define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass)
+#define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ())
+#define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass)
+
+typedef struct XferDestPush {
+    XferElement __parent__;
+
+    char buf[TEST_XFER_SIZE];
+    size_t bufpos;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestPush;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestPushClass;
+
+static void
+dest_push_push_buffer_impl(
+    XferElement *elt,
+    gpointer buf,
+    size_t size)
+{
+    XferDestPush *self = (XferDestPush *)elt;
+
+    if (buf == NULL) {
+       /* if we're at EOF, verify we got the right bytes */
+       g_assert(self->bufpos == TEST_XFER_SIZE);
+       if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE))
+           g_critical("data entering XferDestPush does not match");
+       return;
+    }
+
+    g_assert(self->bufpos + size <= TEST_XFER_SIZE);
+    memcpy(self->buf + self->bufpos, buf, size);
+    self->bufpos += size;
+}
+
+static void
+dest_push_setup_impl(
+    XferElement *elt)
+{
+    XferDestPush *self = (XferDestPush *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+}
+
+static void
+dest_push_class_init(
+    XferDestPushClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->push_buffer = dest_push_push_buffer_impl;
+    xec->setup = dest_push_setup_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_push_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestPushClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_push_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestPush),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0);
+    }
+
+    return type;
+}
+
+/* PULL_BUFFER */
+
+static GType xfer_dest_pull_get_type(void);
+#define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type())
+#define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull)
+#define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const)
+#define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass)
+#define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ())
+#define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass)
+
+typedef struct XferDestPull {
+    XferElement __parent__;
+
+    GThread *thread;
+    simpleprng_state_t prng;
+} XferDestPull;
+
+typedef struct {
+    XferElementClass __parent__;
+} XferDestPullClass;
+
+static gpointer
+dest_pull_thread(
+    gpointer data)
+{
+    XferDestPull *self = (XferDestPull *)data;
+    char fullbuf[TEST_XFER_SIZE];
+    char *buf;
+    size_t bufpos = 0;
+    size_t size;
+
+    while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) {
+       g_assert(bufpos + size <= TEST_XFER_SIZE);
+       memcpy(fullbuf + bufpos, buf, size);
+       bufpos += size;
+    }
+
+    /* we're at EOF, so verify we got the right bytes */
+    g_assert(bufpos == TEST_XFER_SIZE);
+    if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE))
+       g_critical("data entering XferDestPull does not match");
+
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gboolean
+dest_pull_start_impl(
+    XferElement *elt)
+{
+    XferDestPull *self = (XferDestPull *)elt;
+
+    simpleprng_seed(&self->prng, RANDOM_SEED);
+
+    self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL);
+
+    return TRUE;
+}
+
+static void
+dest_pull_class_init(
+    XferDestPullClass * klass)
+{
+    XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    xec->start = dest_pull_start_impl;
+    xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_dest_pull_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestPullClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) dest_pull_class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestPull),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) NULL,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0);
+    }
+
+    return type;
+}
+
+
+/*
+ * Tests
+ */
+
+static void
+test_xfer_generic_callback(
+    gpointer data G_GNUC_UNUSED,
+    XMsg *msg,
+    Xfer *xfer)
+{
+    tu_dbg("Received message %s\n", xmsg_repr(msg));
+
+    switch (msg->type) {
+       case XMSG_DONE:
+           /* are we done? */
+           if (xfer->status == XFER_DONE) {
+               tu_dbg("all elements are done!\n");
+               g_main_loop_quit(default_main_loop());
+           }
+           break;
+
+       default:
+           break;
+    }
+}
+
+/****
+ * Run a simple transfer with some xor filters
+ */
+
+static int
+test_xfer_simple(void)
+{
+    unsigned int i;
+    GSource *src;
+    XferElement *elements[] = {
+       xfer_source_random(100*1024, RANDOM_SEED),
+       xfer_filter_xor('d'),
+       xfer_filter_xor('d'),
+       xfer_dest_null(RANDOM_SEED),
+    };
+
+    Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
+    src = xfer_get_source(xfer);
+    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
+    g_source_attach(src, NULL);
+    tu_dbg("Transfer: %s\n", xfer_repr(xfer));
+
+    /* unreference the elements */
+    for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
+       g_object_unref(elements[i]);
+       g_assert(G_OBJECT(elements[i])->ref_count == 1);
+       elements[i] = NULL;
+    }
+
+    xfer_start(xfer);
+
+    g_main_loop_run(default_main_loop());
+    g_assert(xfer->status == XFER_DONE);
+
+    xfer_unref(xfer);
+
+    return 1;
+}
+
+/****
+ * Run a transfer between two files, with or without filters
+ */
+
+static int
+test_xfer_files(gboolean add_filters)
+{
+    unsigned int i;
+    unsigned int elts;
+    GSource *src;
+    char *in_filename = __FILE__;
+    char *out_filename = "xfer-test.tmp"; /* current directory is writeable */
+    int rfd, wfd;
+    Xfer *xfer;
+    XferElement *elements[4];
+
+    rfd = open(in_filename, O_RDONLY, 0);
+    if (rfd < 0)
+       g_critical("Could not open '%s': %s", in_filename, strerror(errno));
+
+    wfd = open(out_filename, O_WRONLY|O_CREAT, 0777);
+    if (wfd < 0)
+       g_critical("Could not open '%s': %s", out_filename, strerror(errno));
+
+    elts = 0;
+    elements[elts++] = xfer_source_fd(rfd);
+    if (add_filters) {
+       elements[elts++] = xfer_filter_xor(0xab);
+       elements[elts++] = xfer_filter_xor(0xab);
+    }
+    elements[elts++] = xfer_dest_fd(wfd);
+
+    xfer = xfer_new(elements, elts);
+    src = xfer_get_source(xfer);
+    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
+    g_source_attach(src, NULL);
+    tu_dbg("Transfer: %s\n", xfer_repr(xfer));
+
+    /* unreference the elements */
+    for (i = 0; i < elts; i++) {
+       g_object_unref(elements[i]);
+       g_assert(G_OBJECT(elements[i])->ref_count == 1);
+       elements[i] = NULL;
+    }
+
+    xfer_start(xfer);
+
+    g_main_loop_run(default_main_loop());
+    g_assert(xfer->status == XFER_DONE);
+
+    xfer_unref(xfer);
+
+    unlink(out_filename); /* ignore any errors */
+
+    return 1;
+}
+
+static int
+test_xfer_files_simple(void)
+{
+    return test_xfer_files(FALSE);
+}
+
+static int
+test_xfer_files_filter(void)
+{
+    return test_xfer_files(TRUE);
+}
+
+/*****
+ * test each possible combination of source and destination mechansim
+ */
+
+static int
+test_glue_combo(
+    XferElement *source,
+    XferElement *dest)
+{
+    unsigned int i;
+    GSource *src;
+    XferElement *elements[] = { source, dest };
+
+    Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements));
+    src = xfer_get_source(xfer);
+    g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL);
+    g_source_attach(src, NULL);
+
+    /* unreference the elements */
+    for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) {
+       g_object_unref(elements[i]);
+       g_assert(G_OBJECT(elements[i])->ref_count == 1);
+       elements[i] = NULL;
+    }
+
+    xfer_start(xfer);
+
+    g_main_loop_run(default_main_loop());
+    g_assert(xfer->status == XFER_DONE);
+
+    xfer_unref(xfer);
+
+    return 1;
+}
+
+#define make_test_glue(n, s, d) static int n(void) \
+{\
+    return test_glue_combo((XferElement *)g_object_new(s, NULL), \
+                          (XferElement *)g_object_new(d, NULL)); \
+}
+make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE)
+make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE)
+make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE)
+make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE)
+make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE)
+
+/*
+ * Main driver
+ */
+
+int
+main(int argc, char **argv)
+{
+    static TestUtilsTest tests[] = {
+       TU_TEST(test_xfer_simple, 10),
+       TU_TEST(test_xfer_files_simple, 10),
+       TU_TEST(test_xfer_files_filter, 10),
+        TU_TEST(test_glue_READFD_READFD, 5),
+        TU_TEST(test_glue_READFD_WRITE, 5),
+        TU_TEST(test_glue_READFD_PUSH, 5),
+        TU_TEST(test_glue_READFD_PULL, 5),
+        TU_TEST(test_glue_WRITEFD_READFD, 5),
+        TU_TEST(test_glue_WRITEFD_WRITE, 5),
+        TU_TEST(test_glue_WRITEFD_PUSH, 5),
+        TU_TEST(test_glue_WRITEFD_PULL, 5),
+        TU_TEST(test_glue_PUSH_READFD, 5),
+        TU_TEST(test_glue_PUSH_WRITE, 5),
+        TU_TEST(test_glue_PUSH_PUSH, 5),
+        TU_TEST(test_glue_PUSH_PULL, 5),
+        TU_TEST(test_glue_PULL_READFD, 5),
+        TU_TEST(test_glue_PULL_WRITE, 5),
+        TU_TEST(test_glue_PULL_PUSH, 5),
+        TU_TEST(test_glue_PULL_PULL, 5),
+       TU_END()
+    };
+
+    glib_init();
+
+    return testutils_run_tests(argc, argv, tests);
+}