Imported Upstream version 3.1.0
[debian/amanda] / xfer-src / element-glue.c
index 75c7177815c75ae134fe583cba912a404e09bc6b..29e639ad5aac16ff89bf6421b300a5138c1cd9e4 100644 (file)
@@ -1,25 +1,84 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008 Zmanda Inc.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
  *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
  *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
  *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+ *
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
 #include "amxfer.h"
 #include "element-glue.h"
 #include "amanda.h"
+#include "directtcp.h"
+#include "sockaddr-util.h"
+
+/*
+ * Instance definition
+ */
+
+typedef struct XferElementGlue_ {
+    XferElement __parent__;
+
+    /* instructions to push_buffer_impl */
+    enum {
+       PUSH_TO_RING_BUFFER,
+       PUSH_TO_FD, /* write to *write_fdp */
+       PUSH_INVALID,
+
+       PUSH_ACCEPT_FIRST = (1 << 16),
+       PUSH_CONNECT_FIRST = (2 << 16),
+    } on_push;
+
+    /* instructions to pull_buffer_impl */
+    enum {
+       PULL_FROM_RING_BUFFER,
+       PULL_FROM_FD, /* read from *read_fdp */
+       PULL_INVALID,
+
+       PULL_ACCEPT_FIRST = (1 << 16),
+       PULL_CONNECT_FIRST = (2 << 16),
+    } on_pull;
+
+    int *write_fdp;
+    int *read_fdp;
+
+    gboolean need_thread;
+
+    /* the stuff we might use, depending on what flavor of glue we're
+     * providing.. */
+    int pipe[2];
+    int input_listen_socket, output_listen_socket;
+    int input_data_socket, output_data_socket;
+
+    /* a ring buffer of ptr/size pairs with semaphores */
+    struct { gpointer buf; size_t size; } *ring;
+    semaphore_t *ring_used_sem, *ring_free_sem;
+    gint ring_head, ring_tail;
+
+    GThread *thread;
+    GThreadFunc threadfunc;
+} XferElementGlue;
+
+/*
+ * Class definition
+ */
+
+typedef struct XferElementGlueClass_ {
+    XferElementClass __parent__;
+} XferElementGlueClass;
 
 static GObjectClass *parent_class = NULL;
 
@@ -43,23 +102,129 @@ send_xfer_done(
            xmsg_new((XferElement *)self, XMSG_DONE, 0));
 }
 
+static gboolean
+do_directtcp_listen(
+    XferElement *elt,
+    int *sockp,
+    DirectTCPAddr **addrsp)
+{
+    int sock;
+    sockaddr_union addr;
+    DirectTCPAddr *addrs;
+    socklen_t len;
+
+    sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0) {
+       xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
+       return FALSE;
+    }
+
+    if (listen(sock, 1) < 0) {
+       xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
+       return FALSE;
+    }
+
+    /* TODO: which addresses should this display? all ifaces? localhost? */
+    len = sizeof(addr);
+    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+       error("getsockname(): %s", strerror(errno));
+    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
+
+    addrs = g_new0(DirectTCPAddr, 2);
+    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
+    addrs[0].port = SU_GET_PORT(&addr);
+    *addrsp = addrs;
+
+    return TRUE;
+}
+
+static int
+do_directtcp_accept(
+    XferElementGlue *self,
+    int *socketp)
+{
+    int sock;
+    g_assert(*socketp != -1);
+
+    if ((sock = accept(*socketp, NULL, NULL)) == -1) {
+       xfer_cancel_with_error(XFER_ELEMENT(self),
+           _("Error accepting incoming connection: %s"), strerror(errno));
+       wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
+       return -1;
+    }
+
+    /* close the listening socket now, for good measure */
+    close(*socketp);
+    *socketp = -1;
+
+    return sock;
+}
+
+static int
+do_directtcp_connect(
+    XferElementGlue *self,
+    DirectTCPAddr *addrs)
+{
+    XferElement *elt = XFER_ELEMENT(self);
+    sockaddr_union addr;
+    int sock;
+
+    if (!addrs) {
+       g_debug("element-glue got no directtcp addresses to connect to!");
+       if (!elt->cancelled) {
+           xfer_cancel_with_error(elt,
+               "%s got no directtcp addresses to connect to",
+               xfer_element_repr(elt));
+       }
+       goto cancel_wait;
+    }
+
+    /* set up the sockaddr -- IPv4 only */
+    SU_INIT(&addr, AF_INET);
+    SU_SET_PORT(&addr, addrs->port);
+    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+
+    g_debug("making data connection to %s", str_sockaddr(&addr));
+    sock = socket(AF_INET, SOCK_STREAM, 0);
+    if (sock < 0) {
+       xfer_cancel_with_error(elt,
+           "socket(): %s", strerror(errno));
+       goto cancel_wait;
+    }
+    if (connect(sock, (struct sockaddr *)&addr, SS_LEN(&addr)) < 0) {
+       xfer_cancel_with_error(elt,
+           "connect(): %s", strerror(errno));
+       goto cancel_wait;
+    }
+
+    g_debug("connected to %s", str_sockaddr(&addr));
+
+    return sock;
+
+cancel_wait:
+    wait_until_xfer_cancelled(elt->xfer);
+    return -1;
+}
+
 #define GLUE_BUFFER_SIZE 32768
 #define GLUE_RING_BUFFER_SIZE 32
 
+#define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
+
+/* if self->read_fdp or self->write_fdp are pointing to this integer, then
+ * they should be redirected to point to the upstream's output_fd or
+ * downstream's input_fd, respectively, at start() */
+static int neighboring_element_fd = -1;
+
 /*
- * Worker threads
- *
- * At most one of these runs in a given instance, as selected in setup_impl
+ * Worker thread utility functions
  */
 
-static gpointer
-call_and_write_thread(
-    gpointer data)
+static void
+pull_and_write(XferElementGlue *self)
 {
-    XferElement *elt = XFER_ELEMENT(data);
-    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
-    int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
-    int fd = *fdp;
+    XferElement *elt = XFER_ELEMENT(self);
+    int fd = *self->write_fdp;
 
     while (!elt->cancelled) {
        size_t len;
@@ -72,8 +237,11 @@ call_and_write_thread(
 
        /* write it */
        if (full_write(fd, buf, len) < len) {
-           xfer_element_handle_error(elt,
-               _("Error writing to fd %d: %s"), fd, strerror(errno));
+           if (!elt->cancelled) {
+               xfer_cancel_with_error(elt,
+                   _("Error writing to fd %d: %s"), fd, strerror(errno));
+               wait_until_xfer_cancelled(elt->xfer);
+           }
            amfree(buf);
            break;
        }
@@ -87,21 +255,15 @@ call_and_write_thread(
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
     close(fd);
-    *fdp = -1;
-
-    send_xfer_done(self);
-
-    return NULL;
+    *self->write_fdp = -1;
 }
 
-static gpointer
-read_and_write_thread(
-    gpointer data)
+static void
+read_and_write(XferElementGlue *self)
 {
-    XferElement *elt = XFER_ELEMENT(data);
-    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
-    int rfd = elt->upstream->output_fd;
-    int wfd = elt->downstream->input_fd;
+    XferElement *elt = XFER_ELEMENT(self);
+    int rfd = *self->read_fdp;
+    int wfd = *self->write_fdp;
 
     /* dynamically allocate a buffer, in case this thread has
      * a limited amount of stack allocated */
@@ -114,8 +276,11 @@ read_and_write_thread(
        len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
        if (len < GLUE_BUFFER_SIZE) {
            if (errno) {
-               xfer_element_handle_error(elt,
-                   _("Error reading from fd %d: %s"), rfd, strerror(errno));
+               if (!elt->cancelled) {
+                   xfer_cancel_with_error(elt,
+                       _("Error reading from fd %d: %s"), rfd, strerror(errno));
+                   wait_until_xfer_cancelled(elt->xfer);
+               }
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                break;
@@ -124,8 +289,11 @@ read_and_write_thread(
 
        /* write the buffer fully */
        if (full_write(wfd, buf, len) < len) {
-           xfer_element_handle_error(elt,
-               _("Could not write to fd %d: %s"), wfd, strerror(errno));
+           if (!elt->cancelled) {
+               xfer_cancel_with_error(elt,
+                   _("Could not write to fd %d: %s"), wfd, strerror(errno));
+               wait_until_xfer_cancelled(elt->xfer);
+           }
            break;
        }
     }
@@ -137,28 +305,23 @@ read_and_write_thread(
      * re-use */
     if (!elt->cancelled || elt->expect_eof) {
        close(rfd);
-       elt->upstream->output_fd = -1;
+       *self->read_fdp = -1;
     }
 
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
     close(wfd);
-    elt->downstream->input_fd = -1;
-
-    send_xfer_done(self);
+    *self->write_fdp = -1;
 
     amfree(buf);
-    return NULL;
 }
 
-static gpointer
-read_and_call_thread(
-    gpointer data)
+static void
+read_and_push(
+    XferElementGlue *self)
 {
-    XferElement *elt = XFER_ELEMENT(data);
-    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
-    int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
-    int fd = *fdp;
+    XferElement *elt = XFER_ELEMENT(self);
+    int fd = *self->read_fdp;
 
     while (!elt->cancelled) {
        char *buf = g_malloc(GLUE_BUFFER_SIZE);
@@ -168,8 +331,14 @@ read_and_call_thread(
        len = full_read(fd, buf, GLUE_BUFFER_SIZE);
        if (len < GLUE_BUFFER_SIZE) {
            if (errno) {
-               xfer_element_handle_error(elt,
-                   _("Error reading from fd %d: %s"), fd, strerror(errno));
+               if (!elt->cancelled) {
+                   int saved_errno = errno;
+                   xfer_cancel_with_error(elt,
+                       _("Error reading from fd %d: %s"), fd, strerror(saved_errno));
+                   g_debug("element-glue: error reading from fd %d: %s",
+                           fd, strerror(saved_errno));
+                   wait_until_xfer_cancelled(elt->xfer);
+               }
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                amfree(buf);
@@ -189,25 +358,15 @@ read_and_call_thread(
     /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
      * re-use */
     close(fd);
-    *fdp = -1;
-
-    send_xfer_done(self);
-
-    return NULL;
+    *self->read_fdp = -1;
 }
 
-static gpointer
-call_and_call_thread(
-    gpointer data)
+static void
+pull_and_push(XferElementGlue *self)
 {
-    XferElement *elt = XFER_ELEMENT(data);
-    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+    XferElement *elt = XFER_ELEMENT(self);
     gboolean eof_sent = FALSE;
 
-    /* TODO: consider breaking this into two cooperating threads: one to pull
-     * buffers from upstream and one to push them downstream.  This would gain
-     * parallelism at the cost of a lot of synchronization operations. */
-
     while (!elt->cancelled) {
        char *buf;
        size_t len;
@@ -229,6 +388,141 @@ call_and_call_thread(
 
     if (!eof_sent)
        xfer_element_push_buffer(elt->downstream, NULL, 0);
+}
+
+static gpointer
+worker_thread(
+    gpointer data)
+{
+    XferElement *elt = XFER_ELEMENT(data);
+    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+
+    switch (mech_pair(elt->input_mech, elt->output_mech)) {
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
+       read_and_push(self);
+       break;
+
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
+       pull_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
+       pull_and_push(self);
+       break;
+
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
+       if ((self->output_data_socket = do_directtcp_connect(self,
+                                   elt->downstream->input_listen_addrs)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+       if ((self->output_data_socket = do_directtcp_connect(self,
+                                   elt->downstream->input_listen_addrs)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       pull_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
+       if ((self->input_data_socket = do_directtcp_accept(self, &self->input_listen_socket)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
+       if ((self->input_data_socket = do_directtcp_accept(self,
+                                           &self->input_listen_socket)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       read_and_push(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+    default:
+       g_assert_not_reached();
+       break;
+
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
+       if ((self->output_data_socket = do_directtcp_accept(self,
+                                           &self->output_listen_socket)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
+       if ((self->input_data_socket = do_directtcp_connect(self,
+                                   elt->upstream->output_listen_addrs)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
+       if ((self->input_data_socket = do_directtcp_connect(self,
+                                   elt->upstream->output_listen_addrs)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       read_and_push(self);
+       break;
+
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+       if ((self->output_data_socket = do_directtcp_accept(self,
+                                           &self->output_listen_socket)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       pull_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
+       /* TODO: use async accept's here to avoid order dependency */
+       if ((self->output_data_socket = do_directtcp_accept(self,
+                                           &self->output_listen_socket)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       if ((self->input_data_socket = do_directtcp_accept(self,
+                                           &self->input_listen_socket)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       read_and_write(self);
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
+       /* TODO: use async connects and select() to avoid order dependency here */
+       if ((self->input_data_socket = do_directtcp_connect(self,
+                                   elt->upstream->output_listen_addrs)) == -1)
+           break;
+       self->read_fdp = &self->input_data_socket;
+       if ((self->output_data_socket = do_directtcp_connect(self,
+                                   elt->downstream->input_listen_addrs)) == -1)
+           break;
+       self->write_fdp = &self->output_data_socket;
+       read_and_write(self);
+       break;
+    }
 
     send_xfer_done(self);
 
@@ -239,122 +533,254 @@ call_and_call_thread(
  * Implementation
  */
 
-static void
+static gboolean
 setup_impl(
     XferElement *elt)
 {
     XferElementGlue *self = (XferElementGlue *)elt;
+    gboolean need_ring = FALSE;
+    gboolean need_listen_input = FALSE;
+    gboolean need_listen_output = FALSE;
+
+    g_assert(elt->input_mech != XFER_MECH_NONE);
+    g_assert(elt->output_mech != XFER_MECH_NONE);
+    g_assert(elt->input_mech != elt->output_mech);
+
+    self->read_fdp = NULL;
+    self->write_fdp = NULL;
+    self->on_push = PUSH_INVALID;
+    self->on_pull = PULL_INVALID;
+    self->need_thread = FALSE;
+
+    switch (mech_pair(elt->input_mech, elt->output_mech)) {
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_WRITEFD):
+       /* thread will read from one fd and write to the other */
+       self->read_fdp = &neighboring_element_fd;
+       self->write_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       break;
 
-    switch (elt->input_mech) {
-    case XFER_MECH_READFD:
-       switch (elt->output_mech) {
-       case XFER_MECH_READFD:
-           g_assert_not_reached(); /* no glue needed */
-           break;
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER):
+       /* thread will read from one fd and call push_buffer downstream */
+       self->read_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_WRITEFD:
-           self->threadfunc = read_and_write_thread;
-           break;
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_PULL_BUFFER):
+       self->read_fdp = &neighboring_element_fd;
+       self->on_pull = PULL_FROM_FD;
+       break;
 
-       case XFER_MECH_PUSH_BUFFER:
-           self->threadfunc = read_and_call_thread;
-           break;
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN):
+       /* thread will connect for output, then read from fd and write to the
+        * socket. */
+       self->read_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_PULL_BUFFER:
-           break;
+    case mech_pair(XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT):
+       /* thread will accept output conn, then read from upstream and write to socket */
+       self->read_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       need_listen_output = TRUE;
+       break;
 
-       case XFER_MECH_NONE:
-           g_assert_not_reached();
-           break;
-       }
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
+       make_pipe(self);
+       elt->input_fd = self->pipe[1];
+       self->pipe[1] = -1; /* upstream will close this for us */
+       elt->output_fd = self->pipe[0];
+       self->pipe[0] = -1; /* downstream will close this for us */
        break;
 
-    case XFER_MECH_WRITEFD:
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
+       /* thread will read from pipe and call downstream's push_buffer */
        make_pipe(self);
        elt->input_fd = self->pipe[1];
        self->pipe[1] = -1; /* upstream will close this for us */
+       self->read_fdp = &self->pipe[0];
+       self->need_thread = TRUE;
+       break;
 
-       switch (elt->output_mech) {
-       case XFER_MECH_READFD:
-           elt->output_fd = self->pipe[0];
-           self->pipe[0] = -1; /* downstream will close this for us */
-           break;
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
+       make_pipe(self);
+       elt->input_fd = self->pipe[1];
+       self->pipe[1] = -1; /* upstream will close this for us */
+       self->on_pull = PULL_FROM_FD;
+       self->read_fdp = &self->pipe[0];
+       break;
 
-       case XFER_MECH_WRITEFD:
-           g_assert_not_reached(); /* no glue needed */
-           break;
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
+       /* thread will connect for output, then read from pipe and write to socket */
+       make_pipe(self);
+       elt->input_fd = self->pipe[1];
+       self->pipe[1] = -1; /* upstream will close this for us */
+       self->read_fdp = &self->pipe[0];
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_PUSH_BUFFER:
-           self->threadfunc = read_and_call_thread;
-           break;
+    case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
+       /* thread will accept output conn, then read from pipe and write to socket */
+       make_pipe(self);
+       elt->input_fd = self->pipe[1];
+       self->pipe[1] = -1; /* upstream will close this for us */
+       self->read_fdp = &self->pipe[0];
+       self->need_thread = TRUE;
+       need_listen_output = TRUE;
+       break;
 
-       case XFER_MECH_PULL_BUFFER:
-           break;
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
+       make_pipe(self);
+       elt->output_fd = self->pipe[0];
+       self->pipe[0] = -1; /* downstream will close this for us */
+       self->on_push = PUSH_TO_FD;
+       self->write_fdp = &self->pipe[1];
+       break;
 
-       case XFER_MECH_NONE:
-           g_assert_not_reached();
-           break;
-       }
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD):
+       self->on_push = PUSH_TO_FD;
+       self->write_fdp = &neighboring_element_fd;
        break;
 
-    case XFER_MECH_PUSH_BUFFER:
-       switch (elt->output_mech) {
-       case XFER_MECH_READFD:
-           make_pipe(self);
-           elt->output_fd = self->pipe[0];
-           self->pipe[0] = -1; /* downstream will close this for us */
-           break;
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER):
+       self->on_push = PUSH_TO_RING_BUFFER;
+       self->on_pull = PULL_FROM_RING_BUFFER;
+       need_ring = TRUE;
+       break;
 
-       case XFER_MECH_WRITEFD:
-           break;
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+       /* push will connect for output first */
+       self->on_push = PUSH_TO_FD | PUSH_CONNECT_FIRST;
+       break;
 
-       case XFER_MECH_PUSH_BUFFER:
-           g_assert_not_reached(); /* no glue needed */
-           break;
+    case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+       /* push will accept for output first */
+       self->on_push = PUSH_TO_FD | PUSH_ACCEPT_FIRST;
+       need_listen_output = TRUE;
+       break;
 
-       case XFER_MECH_PULL_BUFFER:
-           self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
-           self->ring_used_sem = semaphore_new_with_value(0);
-           self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
-           break;
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
+       /* thread will pull from upstream and write to pipe */
+       make_pipe(self);
+       elt->output_fd = self->pipe[0];
+       self->pipe[0] = -1; /* downstream will close this for us */
+       self->write_fdp = &self->pipe[1];
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_NONE:
-           g_assert_not_reached();
-           break;
-       }
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD):
+       /* thread will pull from upstream and write to downstream */
+       self->write_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
        break;
 
-    case XFER_MECH_PULL_BUFFER:
-       switch (elt->output_mech) {
-       case XFER_MECH_READFD:
-           make_pipe(self);
-           elt->output_fd = self->pipe[0];
-           self->pipe[0] = -1; /* downstream will close this for us */
-           self->threadfunc = call_and_write_thread;
-           break;
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER):
+       /* thread will pull from upstream and push to downstream */
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_WRITEFD:
-           self->threadfunc = call_and_write_thread;
-           break;
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN):
+       /* thread will connect for output, then pull from upstream and write to socket */
+       self->need_thread = TRUE;
+       break;
 
-       case XFER_MECH_PUSH_BUFFER:
-           self->threadfunc = call_and_call_thread;
-           break;
+    case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT):
+       /* thread will accept for output, then pull from upstream and write to socket */
+       self->need_thread = TRUE;
+       need_listen_output = TRUE;
+       break;
 
-       case XFER_MECH_PULL_BUFFER:
-           g_assert_not_reached(); /* no glue needed */
-           break;
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
+       /* thread will accept for input, then read from socket and write to pipe */
+       make_pipe(self);
+       elt->output_fd = self->pipe[0];
+       self->pipe[0] = -1; /* downstream will close this for us */
+       self->write_fdp = &self->pipe[1];
+       self->need_thread = TRUE;
+       need_listen_input = TRUE;
+       break;
 
-       case XFER_MECH_NONE:
-           g_assert_not_reached();
-           break;
-       }
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD):
+       /* thread will accept for input, then read from socket and write to downstream */
+       self->write_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       need_listen_input = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER):
+       /* thread will accept for input, then read from socket and push downstream */
+       self->need_thread = TRUE;
+       need_listen_input = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER):
+       /* first pull will accept for input, then read from socket */
+       self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
+       need_listen_input = TRUE;
        break;
 
-    case XFER_MECH_NONE:
+    case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT):
+       /* thread will accept on both sides, then copy from socket to socket */
+       self->need_thread = TRUE;
+       need_listen_input = TRUE;
+       need_listen_output = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
+       /* thread will connect for input, then read from socket and write to pipe */
+       make_pipe(self);
+       elt->output_fd = self->pipe[0];
+       self->pipe[0] = -1; /* downstream will close this for us */
+       self->write_fdp = &self->pipe[1];
+       self->need_thread = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD):
+       /* thread will connect for input, then read from socket and write to downstream */
+       self->write_fdp = &neighboring_element_fd;
+       self->need_thread = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER):
+       /* thread will connect for input, then read from socket and push downstream */
+       self->need_thread = TRUE;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER):
+       /* first pull will connect for input, then read from socket */
+       self->on_pull = PULL_FROM_FD | PULL_CONNECT_FIRST;
+       break;
+
+    case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN):
+       /* thread will connect on both sides, then copy from socket to socket */
+       self->on_pull = PULL_FROM_FD | PULL_ACCEPT_FIRST;
+       self->need_thread = TRUE;
+       break;
+
+    default:
        g_assert_not_reached();
        break;
     }
+
+    /* set up ring if desired */
+    if (need_ring) {
+       self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
+       self->ring_used_sem = semaphore_new_with_value(0);
+       self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+    }
+
+    if (need_listen_input) {
+       if (!do_directtcp_listen(elt,
+                   &self->input_listen_socket, &elt->input_listen_addrs))
+           return FALSE;
+    }
+    if (need_listen_output) {
+       if (!do_directtcp_listen(elt,
+                   &self->output_listen_socket, &elt->output_listen_addrs))
+           return FALSE;
+    }
+
+    return TRUE;
 }
 
 static gboolean
@@ -363,12 +789,17 @@ start_impl(
 {
     XferElementGlue *self = (XferElementGlue *)elt;
 
-    if (self->threadfunc) {
-       self->thread = g_thread_create(self->threadfunc, (gpointer)self, FALSE, NULL);
-    }
+    /* upstream and downstream are now set, so we can point our fdp's to them */
+    if (self->write_fdp == &neighboring_element_fd)
+       self->write_fdp = &elt->downstream->input_fd;
+    if (self->read_fdp == &neighboring_element_fd)
+       self->read_fdp = &elt->upstream->output_fd;
+
+    if (self->need_thread)
+       self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
 
     /* we're active if we have a thread that will eventually die */
-    return self->threadfunc? TRUE : FALSE;
+    return self->need_thread;
 }
 
 static gpointer
@@ -378,75 +809,131 @@ pull_buffer_impl(
 {
     XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
 
-    if (self->ring) {
-       gpointer buf;
+    /* accept first, if required */
+    if (self->on_pull & PULL_ACCEPT_FIRST) {
+       /* don't accept the next time around */
+       self->on_pull &= ~PULL_ACCEPT_FIRST;
 
        if (elt->cancelled) {
-           /* The finalize method will empty the ring buffer */
            *size = 0;
            return NULL;
        }
 
-       /* make sure there's at least one element available */
-       semaphore_down(self->ring_used_sem);
-
-       /* get it */
-       buf = self->ring[self->ring_tail].buf;
-       *size = self->ring[self->ring_tail].size;
-       self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
+       if ((self->input_data_socket = do_directtcp_accept(self,
+                                           &self->input_listen_socket)) == -1) {
+           /* do_directtcp_accept already signalled an error; xfer
+            * is cancelled */
+           *size = 0;
+           return NULL;
+       }
 
-       /* and mark this element as free to be overwritten */
-       semaphore_up(self->ring_free_sem);
+       /* read from this new socket */
+       self->read_fdp = &self->input_data_socket;
+    }
 
-       return buf;
-    } else {
-       int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
-       int fd = *fdp;
-       char *buf = g_malloc(GLUE_BUFFER_SIZE);
-       ssize_t len;
+    /* or connect first, if required */
+    if (self->on_pull & PULL_CONNECT_FIRST) {
+       /* don't connect the next time around */
+       self->on_pull &= ~PULL_CONNECT_FIRST;
 
        if (elt->cancelled) {
-           if (elt->expect_eof)
-               xfer_element_drain_by_reading(fd);
-
-           close(fd);
-           *fdp = -1;
+           *size = 0;
+           return NULL;
+       }
 
+       if ((self->input_data_socket = do_directtcp_connect(self,
+                                   elt->upstream->output_listen_addrs)) == -1) {
+           /* do_directtcp_connect already signalled an error; xfer
+            * is cancelled */
            *size = 0;
            return NULL;
        }
 
-       /* read from upstream */
-       len = full_read(fd, buf, GLUE_BUFFER_SIZE);
-       if (len < GLUE_BUFFER_SIZE) {
-           if (errno) {
-               xfer_element_handle_error(elt,
-                   _("Error reading from fd %d: %s"), fd, strerror(errno));
+       /* read from this new socket */
+       self->read_fdp = &self->input_data_socket;
+    }
 
-               /* return an EOF */
-               amfree(buf);
-               len = 0;
+    switch (self->on_pull) {
+       case PULL_FROM_RING_BUFFER: {
+           gpointer buf;
 
-               /* and finish off the upstream */
-               if (elt->expect_eof) {
+           if (elt->cancelled) {
+               /* the finalize method will empty the ring buffer */
+               *size = 0;
+               return NULL;
+           }
+
+           /* make sure there's at least one element available */
+           semaphore_down(self->ring_used_sem);
+
+           /* get it */
+           buf = self->ring[self->ring_tail].buf;
+           *size = self->ring[self->ring_tail].size;
+           self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
+
+           /* and mark this element as free to be overwritten */
+           semaphore_up(self->ring_free_sem);
+
+           return buf;
+       }
+
+       case PULL_FROM_FD: {
+           int fd = *self->read_fdp;
+           char *buf = g_malloc(GLUE_BUFFER_SIZE);
+           ssize_t len;
+
+           if (elt->cancelled) {
+               if (elt->expect_eof)
                    xfer_element_drain_by_reading(fd);
-               }
+
                close(fd);
-               *fdp = -1;
-           } else if (len == 0) {
-               /* EOF */
-               g_free(buf);
-               buf = NULL;
+               *self->read_fdp = -1;
+
                *size = 0;
+               return NULL;
+           }
 
-               /* signal EOF to downstream */
-               close(fd);
-               *fdp = -1;
+           /* read from upstream */
+           len = full_read(fd, buf, GLUE_BUFFER_SIZE);
+           if (len < GLUE_BUFFER_SIZE) {
+               if (errno) {
+                   if (!elt->cancelled) {
+                       xfer_cancel_with_error(elt,
+                           _("Error reading from fd %d: %s"), fd, strerror(errno));
+                       wait_until_xfer_cancelled(elt->xfer);
+                   }
+
+                   /* return an EOF */
+                   amfree(buf);
+                   len = 0;
+
+                   /* and finish off the upstream */
+                   if (elt->expect_eof) {
+                       xfer_element_drain_by_reading(fd);
+                   }
+                   close(fd);
+                   *self->read_fdp = -1;
+               } else if (len == 0) {
+                   /* EOF */
+                   g_free(buf);
+                   buf = NULL;
+                   *size = 0;
+
+                   /* signal EOF to downstream */
+                   close(fd);
+                   *self->read_fdp = -1;
+               }
            }
+
+           *size = (size_t)len;
+
+           return buf;
        }
 
-       *size = (size_t)len;
-       return buf;
+       default:
+       case PULL_INVALID:
+           g_assert_not_reached();
+           return NULL;
     }
 }
 
@@ -458,57 +945,107 @@ push_buffer_impl(
 {
     XferElementGlue *self = (XferElementGlue *)elt;
 
-    if (self->ring) {
-       /* just drop packets if the transfer has been cancelled */
+    /* accept first, if required */
+    if (self->on_push & PUSH_ACCEPT_FIRST) {
+       /* don't accept the next time around */
+       self->on_push &= ~PUSH_ACCEPT_FIRST;
+
        if (elt->cancelled) {
-           amfree(buf);
            return;
        }
 
-       /* make sure there's at least one element free */
-       semaphore_down(self->ring_free_sem);
-
-       /* set it */
-       self->ring[self->ring_head].buf = buf;
-       self->ring[self->ring_head].size = len;
-       self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
+       if ((self->output_data_socket = do_directtcp_accept(self,
+                                           &self->output_listen_socket)) == -1) {
+           /* do_directtcp_accept already signalled an error; xfer
+            * is cancelled */
+           return;
+       }
 
-       /* and mark this element as available for reading */
-       semaphore_up(self->ring_used_sem);
+       /* write to this new socket */
+       self->write_fdp = &self->output_data_socket;
+    }
 
-       return;
-    } else {
-       int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
-       int fd = *fdp;
+    /* or connect first, if required */
+    if (self->on_push & PUSH_CONNECT_FIRST) {
+       /* don't accept the next time around */
+       self->on_push &= ~PUSH_CONNECT_FIRST;
 
        if (elt->cancelled) {
-           if (!elt->expect_eof || !buf) {
-               close(fd);
-               *fdp = -1;
+           return;
+       }
 
-               /* hack to ensure we won't close the fd again, if we get another push */
-               elt->expect_eof = TRUE;
+       if ((self->output_data_socket = do_directtcp_connect(self,
+                                   elt->downstream->input_listen_addrs)) == -1) {
+           /* do_directtcp_connect already signalled an error; xfer
+            * is cancelled */
+           return;
+       }
+
+       /* read from this new socket */
+       self->write_fdp = &self->output_data_socket;
+    }
+
+    switch (self->on_push) {
+       case PUSH_TO_RING_BUFFER:
+           /* just drop packets if the transfer has been cancelled */
+           if (elt->cancelled) {
+               amfree(buf);
+               return;
            }
 
-           amfree(buf);
+           /* make sure there's at least one element free */
+           semaphore_down(self->ring_free_sem);
+
+           /* set it */
+           self->ring[self->ring_head].buf = buf;
+           self->ring[self->ring_head].size = len;
+           self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
+
+           /* and mark this element as available for reading */
+           semaphore_up(self->ring_used_sem);
 
            return;
-       }
 
-       /* write the full buffer to the fd, or close on EOF */
-       if (buf) {
-           if (full_write(fd, buf, len) < len) {
-               xfer_element_handle_error(elt,
-                   _("Error writing to fd %d: %s"), fd, strerror(errno));
-               /* nothing special to do to handle the cancellation */
+       case PUSH_TO_FD: {
+           int fd = *self->write_fdp;
+
+           if (elt->cancelled) {
+               if (!elt->expect_eof || !buf) {
+                   close(fd);
+                   *self->write_fdp = -1;
+
+                   /* hack to ensure we won't close the fd again, if we get another push */
+                   elt->expect_eof = TRUE;
+               }
+
+               amfree(buf);
+
+               return;
            }
-           amfree(buf);
-       } else {
-           close(fd);
-           *fdp = -1;
+
+           /* write the full buffer to the fd, or close on EOF */
+           if (buf) {
+               if (full_write(fd, buf, len) < len) {
+                   if (!elt->cancelled) {
+                       xfer_cancel_with_error(elt,
+                           _("Error writing to fd %d: %s"), fd, strerror(errno));
+                       wait_until_xfer_cancelled(elt->xfer);
+                   }
+                   /* nothing special to do to handle a cancellation */
+               }
+               amfree(buf);
+           } else {
+               close(fd);
+               *self->write_fdp = -1;
+           }
+
+           return;
        }
 
-       return;
+       default:
+       case PUSH_INVALID:
+           g_assert_not_reached();
+           break;
     }
 }
 
@@ -519,6 +1056,10 @@ instance_init(
     XferElement *elt = (XferElement *)self;
     elt->can_generate_eof = TRUE;
     self->pipe[0] = self->pipe[1] = -1;
+    self->input_listen_socket = -1;
+    self->output_listen_socket = -1;
+    self->input_data_socket = -1;
+    self->output_data_socket = -1;
 }
 
 static void
@@ -530,6 +1071,10 @@ finalize_impl(
     /* close our pipes if they're still open (they shouldn't be!) */
     if (self->pipe[0] != -1) close(self->pipe[0]);
     if (self->pipe[1] != -1) close(self->pipe[1]);
+    if (self->input_listen_socket != -1) close(self->input_listen_socket);
+    if (self->output_listen_socket != -1) close(self->output_listen_socket);
+    if (self->input_data_socket != -1) close(self->input_data_socket);
+    if (self->output_data_socket != -1) close(self->output_data_socket);
 
     if (self->ring) {
        /* empty the ring buffer, ignoring syncronization issues */
@@ -552,18 +1097,38 @@ static xfer_element_mech_pair_t _pairs[] = {
     { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
     { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
     { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
 
     { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
     { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
     { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
 
     { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
     { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
     { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
 
     { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
     { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
     { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
+
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
+
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
 
     /* terminator */
     { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},