+/* LISTEN */
+
+static GType xfer_source_listen_get_type(void);
+#define XFER_SOURCE_LISTEN_TYPE (xfer_source_listen_get_type())
+#define XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen)
+#define XFER_SOURCE_LISTEN_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_listen_get_type(), XferSourceListen const)
+#define XFER_SOURCE_LISTEN_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_listen_get_type(), XferSourceListenClass)
+#define IS_XFER_SOURCE_LISTEN(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_listen_get_type ())
+#define XFER_SOURCE_LISTEN_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_listen_get_type(), XferSourceListenClass)
+
+typedef struct XferSourceListen {
+ XferElement __parent__;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferSourceListen;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferSourceListenClass;
+
+static gpointer
+source_listen_thread(
+ gpointer data)
+{
+ XferSourceListen *self = XFER_SOURCE_LISTEN(data);
+ XferElement *elt = XFER_ELEMENT(self);
+ DirectTCPAddr *addrs;
+ sockaddr_union addr;
+ int sock;
+ char *buf;
+ int i;
+
+ /* set up the sockaddr -- IPv4 only */
+ SU_INIT(&addr, AF_INET);
+ addrs = elt->downstream->input_listen_addrs;
+ g_assert(addrs != NULL);
+ SU_SET_PORT(&addr, addrs->port);
+ ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+ tu_dbg("making data connection to %s\n", str_sockaddr(&addr));
+ sock = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0) {
+ error("socket(): %s", strerror(errno));
+ }
+ if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+ error("connect(): %s", strerror(errno));
+ }
+
+ tu_dbg("connected to %s\n", str_sockaddr(&addr));
+
+ buf = g_malloc(TEST_BLOCK_SIZE);
+ for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+ if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ }
+
+ /* send a smaller block */
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+ if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ g_free(buf);
+
+ /* send EOF by closing the socket */
+ close(sock);
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+source_listen_start_impl(
+ XferElement *elt)
+{
+ XferSourceListen *self = XFER_SOURCE_LISTEN(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(source_listen_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+source_listen_class_init(
+ XferSourceListenClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 1, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->start = source_listen_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_listen_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferSourceListenClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) source_listen_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferSourceListen),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceListen", &info, 0);
+ }
+
+ return type;
+}
+
+/* CONNECT */
+
+static GType xfer_source_connect_get_type(void);
+#define XFER_SOURCE_CONNECT_TYPE (xfer_source_connect_get_type())
+#define XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect)
+#define XFER_SOURCE_CONNECT_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_connect_get_type(), XferSourceConnect const)
+#define XFER_SOURCE_CONNECT_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_connect_get_type(), XferSourceConnectClass)
+#define IS_XFER_SOURCE_CONNECT(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_connect_get_type ())
+#define XFER_SOURCE_CONNECT_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_connect_get_type(), XferSourceConnectClass)
+
+typedef struct XferSourceConnect {
+ XferElement __parent__;
+
+ int listen_socket;
+
+ GThread *thread;
+ simpleprng_state_t prng;
+} XferSourceConnect;
+
+typedef struct {
+ XferElementClass __parent__;
+} XferSourceConnectClass;
+
+static gpointer
+source_connect_thread(
+ gpointer data)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(data);
+ int sock;
+ char *buf;
+ int i;
+
+ g_assert(self->listen_socket != -1);
+
+ if ((sock = accept(self->listen_socket, NULL, NULL)) == -1) {
+ xfer_cancel_with_error(XFER_ELEMENT(self),
+ _("Error accepting incoming connection: %s"), strerror(errno));
+ wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+ return NULL;
+ }
+
+ /* close the listening socket now, for good measure */
+ close(self->listen_socket);
+ self->listen_socket = -1;
+
+ tu_dbg("connection accepted\n");
+
+ buf = g_malloc(TEST_BLOCK_SIZE);
+ for (i = 0; i < TEST_BLOCK_COUNT; i++) {
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_SIZE);
+ if (full_write(sock, buf, TEST_BLOCK_SIZE) < TEST_BLOCK_SIZE) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ }
+
+ /* send a smaller block */
+ simpleprng_fill_buffer(&self->prng, buf, TEST_BLOCK_EXTRA);
+ if (full_write(sock, buf, TEST_BLOCK_EXTRA) < TEST_BLOCK_EXTRA) {
+ error("error in full_write(): %s", strerror(errno));
+ }
+ g_free(buf);
+
+ /* send EOF by closing the socket */
+ close(sock);
+
+ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gboolean
+source_connect_setup_impl(
+ XferElement *elt)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+ sockaddr_union addr;
+ DirectTCPAddr *addrs;
+ socklen_t len;
+ int sock;
+
+ /* set up self->listen_socket and set elt->output_listen_addrs */
+ sock = self->listen_socket = socket(AF_INET, SOCK_STREAM, 0);
+ if (sock < 0)
+ error("socket(): %s", strerror(errno));
+
+ if (listen(sock, 1) < 0)
+ error("listen(): %s", strerror(errno));
+
+ len = sizeof(addr);
+ if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ error("getsockname(): %s", strerror(errno));
+ g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+ addrs = g_new0(DirectTCPAddr, 2);
+ addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1"));
+ addrs[0].port = SU_GET_PORT(&addr);
+ elt->output_listen_addrs = addrs;
+
+ return TRUE;
+}
+
+static gboolean
+source_connect_start_impl(
+ XferElement *elt)
+{
+ XferSourceConnect *self = XFER_SOURCE_CONNECT(elt);
+
+ simpleprng_seed(&self->prng, RANDOM_SEED);
+
+ self->thread = g_thread_create(source_connect_thread, (gpointer)self, FALSE, NULL);
+
+ return TRUE;
+}
+
+static void
+source_connect_class_init(
+ XferSourceConnectClass * klass)
+{
+ XferElementClass *xec = XFER_ELEMENT_CLASS(klass);
+ static xfer_element_mech_pair_t mech_pairs[] = {
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 1, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ };
+
+ xec->setup = source_connect_setup_impl;
+ xec->start = source_connect_start_impl;
+ xec->mech_pairs = mech_pairs;
+}
+
+GType
+xfer_source_connect_get_type (void)
+{
+ static GType type = 0;
+
+ if G_UNLIKELY(type == 0) {
+ static const GTypeInfo info = {
+ sizeof (XferSourceConnectClass),
+ (GBaseInitFunc) NULL,
+ (GBaseFinalizeFunc) NULL,
+ (GClassInitFunc) source_connect_class_init,
+ (GClassFinalizeFunc) NULL,
+ NULL /* class_data */,
+ sizeof (XferSourceConnect),
+ 0 /* n_preallocs */,
+ (GInstanceInitFunc) NULL,
+ NULL
+ };
+
+ type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceConnect", &info, 0);
+ }
+
+ return type;
+}
+