X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Fxfer-test.c;fp=xfer-src%2Fxfer-test.c;h=f50cf132b52502ac794e6998c489a7a1de294d90;hb=2627875b7d18858bc1f9f7652811e4d8c15a23eb;hp=0000000000000000000000000000000000000000;hpb=fb2bd066c2f8b34addafe48d62550e3033a59431;p=debian%2Famanda diff --git a/xfer-src/xfer-test.c b/xfer-src/xfer-test.c new file mode 100644 index 0000000..f50cf13 --- /dev/null +++ b/xfer-src/xfer-test.c @@ -0,0 +1,1094 @@ +/* + * Copyright (c) 2008 Zmanda Inc. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Contact information: Zmanda Inc, 465 N Mathlida Ave, Suite 300 + * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com + * + * Author: Dustin J. Mitchell + */ + +#include "amxfer.h" +#include "glib-util.h" +#include "testutils.h" +#include "amanda.h" +#include "event.h" +#include "simpleprng.h" + +/* Having tests repeat exactly is an advantage, so we use a hard-coded + * random seed. */ +#define RANDOM_SEED 0xf00d + +/* + * XferElement subclasses + * + * This file defines a few "private" element classes that each have only one + * mechanism pair. These classes are then used to test all of the possible + * combinations of glue. + */ + +/* constants to determine the total amount of data to be transfered; EXTRA is + * to test out partial-block handling; it should be prime. */ +#define TEST_BLOCK_SIZE 1024 +#define TEST_BLOCK_COUNT 10 +#define TEST_BLOCK_EXTRA 97 +#define TEST_XFER_SIZE ((TEST_BLOCK_SIZE*TEST_BLOCK_COUNT)+TEST_BLOCK_EXTRA) + +/* READFD */ + +static GType xfer_source_readfd_get_type(void); +#define XFER_SOURCE_READFD_TYPE (xfer_source_readfd_get_type()) +#define XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd) +#define XFER_SOURCE_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_readfd_get_type(), XferSourceReadfd const) +#define XFER_SOURCE_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_readfd_get_type(), XferSourceReadfdClass) +#define IS_XFER_SOURCE_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_readfd_get_type ()) +#define XFER_SOURCE_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_readfd_get_type(), XferSourceReadfdClass) + +typedef struct XferSourceReadfd { + XferElement __parent__; + + int write_fd; + GThread *thread; + simpleprng_state_t prng; +} XferSourceReadfd; + +typedef struct { + XferElementClass __parent__; +} XferSourceReadfdClass; + +static gpointer +source_readfd_thread( + gpointer data) +{ + XferSourceReadfd *self = (XferSourceReadfd *)data; + char buf[TEST_XFER_SIZE]; + int fd = self->write_fd; + + simpleprng_fill_buffer(&self->prng, buf, sizeof(buf)); + + if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) { + error("error in full_write(): %s", strerror(errno)); + } + + close(fd); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static void +source_readfd_setup_impl( + XferElement *elt) +{ + XferSourceReadfd *self = (XferSourceReadfd *)elt; + int p[2]; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + if (pipe(p) < 0) + g_critical("Error from pipe(): %s", strerror(errno)); + + self->write_fd = p[1]; + XFER_ELEMENT(self)->output_fd = p[0]; +} + +static gboolean +source_readfd_start_impl( + XferElement *elt) +{ + XferSourceReadfd *self = (XferSourceReadfd *)elt; + self->thread = g_thread_create(source_readfd_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +source_readfd_class_init( + XferSourceReadfdClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_READFD, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->setup = source_readfd_setup_impl; + xec->start = source_readfd_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_readfd_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourceReadfdClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_readfd_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourceReadfd), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceReadfd", &info, 0); + } + + return type; +} + +/* WRITEFD */ + +static GType xfer_source_writefd_get_type(void); +#define XFER_SOURCE_WRITEFD_TYPE (xfer_source_writefd_get_type()) +#define XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd) +#define XFER_SOURCE_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_writefd_get_type(), XferSourceWritefd const) +#define XFER_SOURCE_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_writefd_get_type(), XferSourceWritefdClass) +#define IS_XFER_SOURCE_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_writefd_get_type ()) +#define XFER_SOURCE_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_writefd_get_type(), XferSourceWritefdClass) + +typedef struct XferSourceWritefd { + XferElement __parent__; + + GThread *thread; + simpleprng_state_t prng; +} XferSourceWritefd; + +typedef struct { + XferElementClass __parent__; +} XferSourceWritefdClass; + +static gpointer +source_writefd_thread( + gpointer data) +{ + XferSourceWritefd *self = (XferSourceWritefd *)data; + char buf[TEST_XFER_SIZE]; + int fd = XFER_ELEMENT(self)->downstream->input_fd; + + simpleprng_fill_buffer(&self->prng, buf, sizeof(buf)); + + if (full_write(fd, buf, sizeof(buf)) < sizeof(buf)) { + error("error in full_write(): %s", strerror(errno)); + } + + close(fd); + XFER_ELEMENT(self)->downstream->input_fd = -1; + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +source_writefd_start_impl( + XferElement *elt) +{ + XferSourceWritefd *self = (XferSourceWritefd *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(source_writefd_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +source_writefd_class_init( + XferSourceWritefdClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_WRITEFD, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = source_writefd_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_writefd_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourceWritefdClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_writefd_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourceWritefd), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceWritefd", &info, 0); + } + + return type; +} + +/* PUSH_BUFFER */ + +static GType xfer_source_push_get_type(void); +#define XFER_SOURCE_PUSH_TYPE (xfer_source_push_get_type()) +#define XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush) +#define XFER_SOURCE_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_push_get_type(), XferSourcePush const) +#define XFER_SOURCE_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_push_get_type(), XferSourcePushClass) +#define IS_XFER_SOURCE_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_push_get_type ()) +#define XFER_SOURCE_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_push_get_type(), XferSourcePushClass) + +typedef struct XferSourcePush { + XferElement __parent__; + + GThread *thread; + simpleprng_state_t prng; +} XferSourcePush; + +typedef struct { + XferElementClass __parent__; +} XferSourcePushClass; + +static gpointer +source_push_thread( + gpointer data) +{ + XferSourcePush *self = (XferSourcePush *)data; + char *buf; + int i; + + for (i = 0; i < TEST_BLOCK_COUNT; i++) { + buf = g_malloc(TEST_BLOCK_SIZE); + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE); + xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_SIZE); + buf = NULL; + } + + /* send a smaller block */ + buf = g_malloc(TEST_BLOCK_EXTRA); + simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA); + xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, buf, TEST_BLOCK_EXTRA); + buf = NULL; + + /* send EOF */ + xfer_element_push_buffer(XFER_ELEMENT(self)->downstream, NULL, 0); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +source_push_start_impl( + XferElement *elt) +{ + XferSourcePush *self = (XferSourcePush *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(source_push_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +source_push_class_init( + XferSourcePushClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_PUSH_BUFFER, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = source_push_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_push_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourcePushClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_push_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourcePush), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePush", &info, 0); + } + + return type; +} + +/* PULL_BUFFER */ + +static GType xfer_source_pull_get_type(void); +#define XFER_SOURCE_PULL_TYPE (xfer_source_pull_get_type()) +#define XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull) +#define XFER_SOURCE_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_pull_get_type(), XferSourcePull const) +#define XFER_SOURCE_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_pull_get_type(), XferSourcePullClass) +#define IS_XFER_SOURCE_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_pull_get_type ()) +#define XFER_SOURCE_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_pull_get_type(), XferSourcePullClass) + +typedef struct XferSourcePull { + XferElement __parent__; + + gint nbuffers; + GThread *thread; + simpleprng_state_t prng; +} XferSourcePull; + +typedef struct { + XferElementClass __parent__; +} XferSourcePullClass; + +static gpointer +source_pull_pull_buffer_impl( + XferElement *elt, + size_t *size) +{ + XferSourcePull *self = (XferSourcePull *)elt; + char *buf; + size_t bufsiz; + + if (self->nbuffers > TEST_BLOCK_COUNT) { + *size = 0; + return NULL; + } + bufsiz = (self->nbuffers != TEST_BLOCK_COUNT)? TEST_BLOCK_SIZE : TEST_BLOCK_EXTRA; + + self->nbuffers++; + + buf = g_malloc(bufsiz); + simpleprng_fill_buffer(&self->prng, buf, bufsiz); + *size = bufsiz; + return buf; +} + +static void +source_pull_setup_impl( + XferElement *elt) +{ + XferSourcePull *self = (XferSourcePull *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); +} + +static void +source_pull_class_init( + XferSourcePullClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->pull_buffer = source_pull_pull_buffer_impl; + xec->setup = source_pull_setup_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_source_pull_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferSourcePullClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) source_pull_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferSourcePull), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourcePull", &info, 0); + } + + return type; +} + +/* READFD */ + +static GType xfer_dest_readfd_get_type(void); +#define XFER_DEST_READFD_TYPE (xfer_dest_readfd_get_type()) +#define XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd) +#define XFER_DEST_READFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_readfd_get_type(), XferDestReadfd const) +#define XFER_DEST_READFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_readfd_get_type(), XferDestReadfdClass) +#define IS_XFER_DEST_READFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_readfd_get_type ()) +#define XFER_DEST_READFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_readfd_get_type(), XferDestReadfdClass) + +typedef struct XferDestReadfd { + XferElement __parent__; + + GThread *thread; + simpleprng_state_t prng; +} XferDestReadfd; + +typedef struct { + XferElementClass __parent__; +} XferDestReadfdClass; + +static gpointer +dest_readfd_thread( + gpointer data) +{ + XferDestReadfd *self = (XferDestReadfd *)data; + char buf[TEST_XFER_SIZE]; + size_t remaining; + int fd = XFER_ELEMENT(self)->upstream->output_fd; + + remaining = sizeof(buf); + while (remaining) { + ssize_t nread; + if ((nread = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) { + error("error in read(): %s", strerror(errno)); + } + remaining -= nread; + } + + /* we should be at EOF here */ + if (read(fd, buf, 10) != 0) + g_critical("too much data entering XferDestReadfd"); + + if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE)) + g_critical("data entering XferDestReadfd does not match"); + + close(fd); + XFER_ELEMENT(self)->upstream->output_fd = -1; + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +dest_readfd_start_impl( + XferElement *elt) +{ + XferDestReadfd *self = (XferDestReadfd *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(dest_readfd_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +dest_readfd_class_init( + XferDestReadfdClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_READFD, XFER_MECH_NONE, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = dest_readfd_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_readfd_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestReadfdClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_readfd_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestReadfd), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestReadfd", &info, 0); + } + + return type; +} + +/* WRITEFD */ + +static GType xfer_dest_writefd_get_type(void); +#define XFER_DEST_WRITEFD_TYPE (xfer_dest_writefd_get_type()) +#define XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd) +#define XFER_DEST_WRITEFD_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_writefd_get_type(), XferDestWritefd const) +#define XFER_DEST_WRITEFD_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_writefd_get_type(), XferDestWritefdClass) +#define IS_XFER_DEST_WRITEFD(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_writefd_get_type ()) +#define XFER_DEST_WRITEFD_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_writefd_get_type(), XferDestWritefdClass) + +typedef struct XferDestWritefd { + XferElement __parent__; + + int read_fd; + GThread *thread; + simpleprng_state_t prng; +} XferDestWritefd; + +typedef struct { + XferElementClass __parent__; +} XferDestWritefdClass; + +static gpointer +dest_writefd_thread( + gpointer data) +{ + XferDestWritefd *self = (XferDestWritefd *)data; + char buf[TEST_XFER_SIZE]; + size_t remaining; + int fd = self->read_fd; + + remaining = sizeof(buf); + while (remaining) { + ssize_t nwrite; + if ((nwrite = read(fd, buf+sizeof(buf)-remaining, remaining)) <= 0) { + error("error in read(): %s", strerror(errno)); + } + remaining -= nwrite; + } + + /* we should be at EOF here */ + if (read(fd, buf, 10) != 0) + g_critical("too much data entering XferDestWritefd"); + + if (!simpleprng_verify_buffer(&self->prng, buf, TEST_XFER_SIZE)) + g_critical("data entering XferDestWritefd does not match"); + + close(fd); + XFER_ELEMENT(self)->upstream->output_fd = -1; + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static void +dest_writefd_setup_impl( + XferElement *elt) +{ + XferDestWritefd *self = (XferDestWritefd *)elt; + int p[2]; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + if (pipe(p) < 0) + g_critical("Error from pipe(): %s", strerror(errno)); + + self->read_fd = p[0]; + XFER_ELEMENT(self)->input_fd = p[1]; +} + +static gboolean +dest_writefd_start_impl( + XferElement *elt) +{ + XferDestWritefd *self = (XferDestWritefd *)elt; + self->thread = g_thread_create(dest_writefd_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +dest_writefd_class_init( + XferDestWritefdClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_WRITEFD, XFER_MECH_NONE, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->setup = dest_writefd_setup_impl; + xec->start = dest_writefd_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_writefd_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestWritefdClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_writefd_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestWritefd), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestWritefd", &info, 0); + } + + return type; +} + +/* PUSH_BUFFER */ + +static GType xfer_dest_push_get_type(void); +#define XFER_DEST_PUSH_TYPE (xfer_dest_push_get_type()) +#define XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush) +#define XFER_DEST_PUSH_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_push_get_type(), XferDestPush const) +#define XFER_DEST_PUSH_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_push_get_type(), XferDestPushClass) +#define IS_XFER_DEST_PUSH(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_push_get_type ()) +#define XFER_DEST_PUSH_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_push_get_type(), XferDestPushClass) + +typedef struct XferDestPush { + XferElement __parent__; + + char buf[TEST_XFER_SIZE]; + size_t bufpos; + + GThread *thread; + simpleprng_state_t prng; +} XferDestPush; + +typedef struct { + XferElementClass __parent__; +} XferDestPushClass; + +static void +dest_push_push_buffer_impl( + XferElement *elt, + gpointer buf, + size_t size) +{ + XferDestPush *self = (XferDestPush *)elt; + + if (buf == NULL) { + /* if we're at EOF, verify we got the right bytes */ + g_assert(self->bufpos == TEST_XFER_SIZE); + if (!simpleprng_verify_buffer(&self->prng, self->buf, TEST_XFER_SIZE)) + g_critical("data entering XferDestPush does not match"); + return; + } + + g_assert(self->bufpos + size <= TEST_XFER_SIZE); + memcpy(self->buf + self->bufpos, buf, size); + self->bufpos += size; +} + +static void +dest_push_setup_impl( + XferElement *elt) +{ + XferDestPush *self = (XferDestPush *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); +} + +static void +dest_push_class_init( + XferDestPushClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->push_buffer = dest_push_push_buffer_impl; + xec->setup = dest_push_setup_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_push_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestPushClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_push_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestPush), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPush", &info, 0); + } + + return type; +} + +/* PULL_BUFFER */ + +static GType xfer_dest_pull_get_type(void); +#define XFER_DEST_PULL_TYPE (xfer_dest_pull_get_type()) +#define XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull) +#define XFER_DEST_PULL_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_pull_get_type(), XferDestPull const) +#define XFER_DEST_PULL_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_pull_get_type(), XferDestPullClass) +#define IS_XFER_DEST_PULL(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_pull_get_type ()) +#define XFER_DEST_PULL_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_pull_get_type(), XferDestPullClass) + +typedef struct XferDestPull { + XferElement __parent__; + + GThread *thread; + simpleprng_state_t prng; +} XferDestPull; + +typedef struct { + XferElementClass __parent__; +} XferDestPullClass; + +static gpointer +dest_pull_thread( + gpointer data) +{ + XferDestPull *self = (XferDestPull *)data; + char fullbuf[TEST_XFER_SIZE]; + char *buf; + size_t bufpos = 0; + size_t size; + + while ((buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size))) { + g_assert(bufpos + size <= TEST_XFER_SIZE); + memcpy(fullbuf + bufpos, buf, size); + bufpos += size; + } + + /* we're at EOF, so verify we got the right bytes */ + g_assert(bufpos == TEST_XFER_SIZE); + if (!simpleprng_verify_buffer(&self->prng, fullbuf, TEST_XFER_SIZE)) + g_critical("data entering XferDestPull does not match"); + + xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + + return NULL; +} + +static gboolean +dest_pull_start_impl( + XferElement *elt) +{ + XferDestPull *self = (XferDestPull *)elt; + + simpleprng_seed(&self->prng, RANDOM_SEED); + + self->thread = g_thread_create(dest_pull_thread, (gpointer)self, FALSE, NULL); + + return TRUE; +} + +static void +dest_pull_class_init( + XferDestPullClass * klass) +{ + XferElementClass *xec = XFER_ELEMENT_CLASS(klass); + static xfer_element_mech_pair_t mech_pairs[] = { + { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 1, 1}, + { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + }; + + xec->start = dest_pull_start_impl; + xec->mech_pairs = mech_pairs; +} + +GType +xfer_dest_pull_get_type (void) +{ + static GType type = 0; + + if G_UNLIKELY(type == 0) { + static const GTypeInfo info = { + sizeof (XferDestPullClass), + (GBaseInitFunc) NULL, + (GBaseFinalizeFunc) NULL, + (GClassInitFunc) dest_pull_class_init, + (GClassFinalizeFunc) NULL, + NULL /* class_data */, + sizeof (XferDestPull), + 0 /* n_preallocs */, + (GInstanceInitFunc) NULL, + NULL + }; + + type = g_type_register_static (XFER_ELEMENT_TYPE, "XferDestPull", &info, 0); + } + + return type; +} + + +/* + * Tests + */ + +static void +test_xfer_generic_callback( + gpointer data G_GNUC_UNUSED, + XMsg *msg, + Xfer *xfer) +{ + tu_dbg("Received message %s\n", xmsg_repr(msg)); + + switch (msg->type) { + case XMSG_DONE: + /* are we done? */ + if (xfer->status == XFER_DONE) { + tu_dbg("all elements are done!\n"); + g_main_loop_quit(default_main_loop()); + } + break; + + default: + break; + } +} + +/**** + * Run a simple transfer with some xor filters + */ + +static int +test_xfer_simple(void) +{ + unsigned int i; + GSource *src; + XferElement *elements[] = { + xfer_source_random(100*1024, RANDOM_SEED), + xfer_filter_xor('d'), + xfer_filter_xor('d'), + xfer_dest_null(RANDOM_SEED), + }; + + Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements)); + src = xfer_get_source(xfer); + g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL); + g_source_attach(src, NULL); + tu_dbg("Transfer: %s\n", xfer_repr(xfer)); + + /* unreference the elements */ + for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) { + g_object_unref(elements[i]); + g_assert(G_OBJECT(elements[i])->ref_count == 1); + elements[i] = NULL; + } + + xfer_start(xfer); + + g_main_loop_run(default_main_loop()); + g_assert(xfer->status == XFER_DONE); + + xfer_unref(xfer); + + return 1; +} + +/**** + * Run a transfer between two files, with or without filters + */ + +static int +test_xfer_files(gboolean add_filters) +{ + unsigned int i; + unsigned int elts; + GSource *src; + char *in_filename = __FILE__; + char *out_filename = "xfer-test.tmp"; /* current directory is writeable */ + int rfd, wfd; + Xfer *xfer; + XferElement *elements[4]; + + rfd = open(in_filename, O_RDONLY, 0); + if (rfd < 0) + g_critical("Could not open '%s': %s", in_filename, strerror(errno)); + + wfd = open(out_filename, O_WRONLY|O_CREAT, 0777); + if (wfd < 0) + g_critical("Could not open '%s': %s", out_filename, strerror(errno)); + + elts = 0; + elements[elts++] = xfer_source_fd(rfd); + if (add_filters) { + elements[elts++] = xfer_filter_xor(0xab); + elements[elts++] = xfer_filter_xor(0xab); + } + elements[elts++] = xfer_dest_fd(wfd); + + xfer = xfer_new(elements, elts); + src = xfer_get_source(xfer); + g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL); + g_source_attach(src, NULL); + tu_dbg("Transfer: %s\n", xfer_repr(xfer)); + + /* unreference the elements */ + for (i = 0; i < elts; i++) { + g_object_unref(elements[i]); + g_assert(G_OBJECT(elements[i])->ref_count == 1); + elements[i] = NULL; + } + + xfer_start(xfer); + + g_main_loop_run(default_main_loop()); + g_assert(xfer->status == XFER_DONE); + + xfer_unref(xfer); + + unlink(out_filename); /* ignore any errors */ + + return 1; +} + +static int +test_xfer_files_simple(void) +{ + return test_xfer_files(FALSE); +} + +static int +test_xfer_files_filter(void) +{ + return test_xfer_files(TRUE); +} + +/***** + * test each possible combination of source and destination mechansim + */ + +static int +test_glue_combo( + XferElement *source, + XferElement *dest) +{ + unsigned int i; + GSource *src; + XferElement *elements[] = { source, dest }; + + Xfer *xfer = xfer_new(elements, sizeof(elements)/sizeof(*elements)); + src = xfer_get_source(xfer); + g_source_set_callback(src, (GSourceFunc)test_xfer_generic_callback, NULL, NULL); + g_source_attach(src, NULL); + + /* unreference the elements */ + for (i = 0; i < sizeof(elements)/sizeof(*elements); i++) { + g_object_unref(elements[i]); + g_assert(G_OBJECT(elements[i])->ref_count == 1); + elements[i] = NULL; + } + + xfer_start(xfer); + + g_main_loop_run(default_main_loop()); + g_assert(xfer->status == XFER_DONE); + + xfer_unref(xfer); + + return 1; +} + +#define make_test_glue(n, s, d) static int n(void) \ +{\ + return test_glue_combo((XferElement *)g_object_new(s, NULL), \ + (XferElement *)g_object_new(d, NULL)); \ +} +make_test_glue(test_glue_READFD_READFD, XFER_SOURCE_READFD_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_READFD_WRITE, XFER_SOURCE_READFD_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_READFD_PUSH, XFER_SOURCE_READFD_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_READFD_PULL, XFER_SOURCE_READFD_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_WRITEFD_READFD, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_WRITEFD_WRITE, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_WRITEFD_PUSH, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_WRITEFD_PULL, XFER_SOURCE_WRITEFD_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_PUSH_READFD, XFER_SOURCE_PUSH_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_PUSH_WRITE, XFER_SOURCE_PUSH_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_PUSH_PUSH, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_PUSH_PULL, XFER_SOURCE_PUSH_TYPE, XFER_DEST_PULL_TYPE) +make_test_glue(test_glue_PULL_READFD, XFER_SOURCE_PULL_TYPE, XFER_DEST_READFD_TYPE) +make_test_glue(test_glue_PULL_WRITE, XFER_SOURCE_PULL_TYPE, XFER_DEST_WRITEFD_TYPE) +make_test_glue(test_glue_PULL_PUSH, XFER_SOURCE_PULL_TYPE, XFER_DEST_PUSH_TYPE) +make_test_glue(test_glue_PULL_PULL, XFER_SOURCE_PULL_TYPE, XFER_DEST_PULL_TYPE) + +/* + * Main driver + */ + +int +main(int argc, char **argv) +{ + static TestUtilsTest tests[] = { + TU_TEST(test_xfer_simple, 10), + TU_TEST(test_xfer_files_simple, 10), + TU_TEST(test_xfer_files_filter, 10), + TU_TEST(test_glue_READFD_READFD, 5), + TU_TEST(test_glue_READFD_WRITE, 5), + TU_TEST(test_glue_READFD_PUSH, 5), + TU_TEST(test_glue_READFD_PULL, 5), + TU_TEST(test_glue_WRITEFD_READFD, 5), + TU_TEST(test_glue_WRITEFD_WRITE, 5), + TU_TEST(test_glue_WRITEFD_PUSH, 5), + TU_TEST(test_glue_WRITEFD_PULL, 5), + TU_TEST(test_glue_PUSH_READFD, 5), + TU_TEST(test_glue_PUSH_WRITE, 5), + TU_TEST(test_glue_PUSH_PUSH, 5), + TU_TEST(test_glue_PUSH_PULL, 5), + TU_TEST(test_glue_PULL_READFD, 5), + TU_TEST(test_glue_PULL_WRITE, 5), + TU_TEST(test_glue_PULL_PUSH, 5), + TU_TEST(test_glue_PULL_PULL, 5), + TU_END() + }; + + glib_init(); + + return testutils_run_tests(argc, argv, tests); +}