Merge tag 'upstream/3.3.2'
[debian/amanda] / xfer-src / element-glue.c
index 29e639ad5aac16ff89bf6421b300a5138c1cd9e4..bb31e938dd0f2206efb6ce99da56ba2992279fa6 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
+#include "amanda.h"
 #include "amxfer.h"
 #include "element-glue.h"
-#include "amanda.h"
 #include "directtcp.h"
+#include "util.h"
 #include "sockaddr-util.h"
+#include "debug.h"
 
 /*
  * Instance definition
@@ -35,7 +37,7 @@ typedef struct XferElementGlue_ {
     /* instructions to push_buffer_impl */
     enum {
        PUSH_TO_RING_BUFFER,
-       PUSH_TO_FD, /* write to *write_fdp */
+       PUSH_TO_FD, /* write to write_fd */
        PUSH_INVALID,
 
        PUSH_ACCEPT_FIRST = (1 << 16),
@@ -45,7 +47,7 @@ typedef struct XferElementGlue_ {
     /* instructions to pull_buffer_impl */
     enum {
        PULL_FROM_RING_BUFFER,
-       PULL_FROM_FD, /* read from *read_fdp */
+       PULL_FROM_FD, /* read from read_fd */
        PULL_INVALID,
 
        PULL_ACCEPT_FIRST = (1 << 16),
@@ -62,10 +64,11 @@ typedef struct XferElementGlue_ {
     int pipe[2];
     int input_listen_socket, output_listen_socket;
     int input_data_socket, output_data_socket;
+    int read_fd, write_fd;
 
     /* a ring buffer of ptr/size pairs with semaphores */
     struct { gpointer buf; size_t size; } *ring;
-    semaphore_t *ring_used_sem, *ring_free_sem;
+    amsemaphore_t *ring_used_sem, *ring_free_sem;
     gint ring_head, ring_tail;
 
     GThread *thread;
@@ -109,35 +112,64 @@ do_directtcp_listen(
     DirectTCPAddr **addrsp)
 {
     int sock;
-    sockaddr_union addr;
+    sockaddr_union data_addr;
     DirectTCPAddr *addrs;
     socklen_t len;
+    struct addrinfo *res;
+    struct addrinfo *res_addr;
+    sockaddr_union *addr = NULL;
+
+    if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+       xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+       return FALSE;
+    }
+    for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+       if (res_addr->ai_family == AF_INET) {
+            addr = (sockaddr_union *)res_addr->ai_addr;
+            break;
+        }
+    }
+    if (!addr) {
+        addr = (sockaddr_union *)res->ai_addr;
+    }
 
-    sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+    sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
        return FALSE;
     }
 
+    len = SS_LEN(addr);
+    if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+       xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+       freeaddrinfo(res);
+       return FALSE;
+    }
+
     if (listen(sock, 1) < 0) {
        xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
        return FALSE;
     }
 
     /* TODO: which addresses should this display? all ifaces? localhost? */
-    len = sizeof(addr);
-    if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+    len = sizeof(data_addr);
+    if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
        error("getsockname(): %s", strerror(errno));
-    g_assert(SU_GET_FAMILY(&addr) == AF_INET);
 
     addrs = g_new0(DirectTCPAddr, 2);
-    addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
-    addrs[0].port = SU_GET_PORT(&addr);
+    copy_sockaddr(&addrs[0], &data_addr);
     *addrsp = addrs;
 
     return TRUE;
 }
 
+static gboolean
+prolong_accept(
+    gpointer data)
+{
+    return !XFER_ELEMENT(data)->cancelled;
+}
+
 static int
 do_directtcp_accept(
     XferElementGlue *self,
@@ -146,7 +178,13 @@ do_directtcp_accept(
     int sock;
     g_assert(*socketp != -1);
 
-    if ((sock = accept(*socketp, NULL, NULL)) == -1) {
+    if ((sock = interruptible_accept(*socketp, NULL, NULL,
+                                    prolong_accept, self)) == -1) {
+       /* if the accept was interrupted due to a cancellation, then do not
+        * add a further error message */
+       if (errno == 0 && XFER_ELEMENT(self)->cancelled)
+           return -1;
+
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("Error accepting incoming connection: %s"), strerror(errno));
        wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
@@ -180,12 +218,10 @@ do_directtcp_connect(
     }
 
     /* set up the sockaddr -- IPv4 only */
-    SU_INIT(&addr, AF_INET);
-    SU_SET_PORT(&addr, addrs->port);
-    ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+    copy_sockaddr(&addr, addrs);
 
-    g_debug("making data connection to %s", str_sockaddr(&addr));
-    sock = socket(AF_INET, SOCK_STREAM, 0);
+    g_debug("do_directtcp_connect making data connection to %s", str_sockaddr(&addr));
+    sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
     if (sock < 0) {
        xfer_cancel_with_error(elt,
            "socket(): %s", strerror(errno));
@@ -211,11 +247,68 @@ cancel_wait:
 
 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
 
-/* if self->read_fdp or self->write_fdp are pointing to this integer, then
- * they should be redirected to point to the upstream's output_fd or
- * downstream's input_fd, respectively, at start() */
+/*
+ * fd handling
+ */
+
+/* if self->read_fdp or self->write_fdp are pointing to this integer, then they
+ * should be redirected to point to the upstream's output_fd or downstream's
+ * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
+ * respectively. */
 static int neighboring_element_fd = -1;
 
+#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
+static int
+_get_read_fd(XferElementGlue *self)
+{
+    if (!self->read_fdp)
+       return -1; /* shouldn't happen.. */
+
+    if (self->read_fdp == &neighboring_element_fd) {
+       XferElement *elt = XFER_ELEMENT(self);
+       self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
+    } else {
+       self->read_fd = *self->read_fdp;
+       *self->read_fdp = -1;
+    }
+    self->read_fdp = NULL;
+    return self->read_fd;
+}
+
+#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
+static int
+_get_write_fd(XferElementGlue *self)
+{
+    if (!self->write_fdp)
+       return -1; /* shouldn't happen.. */
+
+    if (self->write_fdp == &neighboring_element_fd) {
+       XferElement *elt = XFER_ELEMENT(self);
+       self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
+    } else {
+       self->write_fd = *self->write_fdp;
+       *self->write_fdp = -1;
+    }
+    self->write_fdp = NULL;
+    return self->write_fd;
+}
+
+static int
+close_read_fd(XferElementGlue *self)
+{
+    int fd = get_read_fd(self);
+    self->read_fd = -1;
+    return close(fd);
+}
+
+static int
+close_write_fd(XferElementGlue *self)
+{
+    int fd = get_write_fd(self);
+    self->write_fd = -1;
+    return close(fd);
+}
+
 /*
  * Worker thread utility functions
  */
@@ -224,7 +317,8 @@ static void
 pull_and_write(XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int fd = *self->write_fdp;
+    int fd = get_write_fd(self);
+    self->write_fdp = NULL;
 
     while (!elt->cancelled) {
        size_t len;
@@ -250,24 +344,22 @@ pull_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
-    close(fd);
-    *self->write_fdp = -1;
+    close_write_fd(self);
 }
 
 static void
 read_and_write(XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int rfd = *self->read_fdp;
-    int wfd = *self->write_fdp;
-
     /* dynamically allocate a buffer, in case this thread has
      * a limited amount of stack allocated */
     char *buf = g_malloc(GLUE_BUFFER_SIZE);
+    int rfd = get_read_fd(self);
+    int wfd = get_write_fd(self);
 
     while (!elt->cancelled) {
        size_t len;
@@ -299,19 +391,14 @@ read_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_fd(rfd);
 
-    /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
-     * re-use */
-    if (!elt->cancelled || elt->expect_eof) {
-       close(rfd);
-       *self->read_fdp = -1;
-    }
+    /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
+     * kill it and complete the cancellation */
+    close_read_fd(self);
 
-    /* close the fd we've been writing, as an EOF signal to downstream, and
-     * set it to -1 to avoid accidental re-use */
-    close(wfd);
-    *self->write_fdp = -1;
+    /* close the fd we've been writing, as an EOF signal to downstream */
+    close_write_fd(self);
 
     amfree(buf);
 }
@@ -321,7 +408,7 @@ read_and_push(
     XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int fd = *self->read_fdp;
+    int fd = get_read_fd(self);
 
     while (!elt->cancelled) {
        char *buf = g_malloc(GLUE_BUFFER_SIZE);
@@ -339,6 +426,7 @@ read_and_push(
                            fd, strerror(saved_errno));
                    wait_until_xfer_cancelled(elt->xfer);
                }
+                amfree(buf);
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                amfree(buf);
@@ -350,15 +438,13 @@ read_and_push(
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_reading(fd);
+       xfer_element_drain_fd(fd);
 
     /* send an EOF indication downstream */
     xfer_element_push_buffer(elt->downstream, NULL, 0);
 
-    /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
-     * re-use */
-    close(fd);
-    *self->read_fdp = -1;
+    /* close the read fd, since it's at EOF */
+    close_read_fd(self);
 }
 
 static void
@@ -384,7 +470,7 @@ pull_and_push(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_buffers(elt->upstream);
 
     if (!eof_sent)
        xfer_element_push_buffer(elt->downstream, NULL, 0);
@@ -587,16 +673,16 @@ setup_impl(
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        break;
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
        /* thread will read from pipe and call downstream's push_buffer */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -604,7 +690,7 @@ setup_impl(
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->on_pull = PULL_FROM_FD;
        self->read_fdp = &self->pipe[0];
@@ -613,7 +699,7 @@ setup_impl(
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
        /* thread will connect for output, then read from pipe and write to socket */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -622,7 +708,7 @@ setup_impl(
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
        /* thread will accept output conn, then read from pipe and write to socket */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -631,7 +717,7 @@ setup_impl(
 
     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->on_push = PUSH_TO_FD;
        self->write_fdp = &self->pipe[1];
@@ -662,7 +748,7 @@ setup_impl(
     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
        /* thread will pull from upstream and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -693,7 +779,7 @@ setup_impl(
     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
        /* thread will accept for input, then read from socket and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -729,7 +815,7 @@ setup_impl(
     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
        /* thread will connect for input, then read from socket and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -765,8 +851,8 @@ setup_impl(
     /* set up ring if desired */
     if (need_ring) {
        self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
-       self->ring_used_sem = semaphore_new_with_value(0);
-       self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+       self->ring_used_sem = amsemaphore_new_with_value(0);
+       self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
     }
 
     if (need_listen_input) {
@@ -789,14 +875,8 @@ start_impl(
 {
     XferElementGlue *self = (XferElementGlue *)elt;
 
-    /* upstream and downstream are now set, so we can point our fdp's to them */
-    if (self->write_fdp == &neighboring_element_fd)
-       self->write_fdp = &elt->downstream->input_fd;
-    if (self->read_fdp == &neighboring_element_fd)
-       self->read_fdp = &elt->upstream->output_fd;
-
     if (self->need_thread)
-       self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
+       self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
 
     /* we're active if we have a thread that will eventually die */
     return self->need_thread;
@@ -864,7 +944,7 @@ pull_buffer_impl(
            }
 
            /* make sure there's at least one element available */
-           semaphore_down(self->ring_used_sem);
+           amsemaphore_down(self->ring_used_sem);
 
            /* get it */
            buf = self->ring[self->ring_tail].buf;
@@ -872,27 +952,32 @@ pull_buffer_impl(
            self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
 
            /* and mark this element as free to be overwritten */
-           semaphore_up(self->ring_free_sem);
+           amsemaphore_up(self->ring_free_sem);
 
            return buf;
        }
 
        case PULL_FROM_FD: {
-           int fd = *self->read_fdp;
-           char *buf = g_malloc(GLUE_BUFFER_SIZE);
+           int fd = get_read_fd(self);
+           char *buf;
            ssize_t len;
 
-           if (elt->cancelled) {
-               if (elt->expect_eof)
-                   xfer_element_drain_by_reading(fd);
+           /* if the fd is already closed, it's possible upstream bailed out
+            * so quickly that we didn't even get a look at the fd */
+           if (elt->cancelled || fd == -1) {
+               if (fd != -1) {
+                   if (elt->expect_eof)
+                       xfer_element_drain_fd(fd);
 
-               close(fd);
-               *self->read_fdp = -1;
+                   close_read_fd(self);
+               }
 
                *size = 0;
                return NULL;
            }
 
+           buf = g_malloc(GLUE_BUFFER_SIZE);
+
            /* read from upstream */
            len = full_read(fd, buf, GLUE_BUFFER_SIZE);
            if (len < GLUE_BUFFER_SIZE) {
@@ -909,10 +994,9 @@ pull_buffer_impl(
 
                    /* and finish off the upstream */
                    if (elt->expect_eof) {
-                       xfer_element_drain_by_reading(fd);
+                       xfer_element_drain_fd(fd);
                    }
-                   close(fd);
-                   *self->read_fdp = -1;
+                   close_read_fd(self);
                } else if (len == 0) {
                    /* EOF */
                    g_free(buf);
@@ -920,8 +1004,7 @@ pull_buffer_impl(
                    *size = 0;
 
                    /* signal EOF to downstream */
-                   close(fd);
-                   *self->read_fdp = -1;
+                   close_read_fd(self);
                }
            }
 
@@ -994,7 +1077,7 @@ push_buffer_impl(
            }
 
            /* make sure there's at least one element free */
-           semaphore_down(self->ring_free_sem);
+           amsemaphore_down(self->ring_free_sem);
 
            /* set it */
            self->ring[self->ring_head].buf = buf;
@@ -1002,17 +1085,23 @@ push_buffer_impl(
            self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
 
            /* and mark this element as available for reading */
-           semaphore_up(self->ring_used_sem);
+           amsemaphore_up(self->ring_used_sem);
 
            return;
 
        case PUSH_TO_FD: {
-           int fd = *self->write_fdp;
+           int fd = get_write_fd(self);
+
+           /* if the fd is already closed, it's possible upstream bailed out
+            * so quickly that we didn't even get a look at the fd.  In this
+            * case we can assume the xfer has been cancelled and just discard
+            * the data. */
+           if (fd == -1)
+               return;
 
            if (elt->cancelled) {
                if (!elt->expect_eof || !buf) {
-                   close(fd);
-                   *self->write_fdp = -1;
+                   close_write_fd(self);
 
                    /* hack to ensure we won't close the fd again, if we get another push */
                    elt->expect_eof = TRUE;
@@ -1035,8 +1124,7 @@ push_buffer_impl(
                }
                amfree(buf);
            } else {
-               close(fd);
-               *self->write_fdp = -1;
+               close_write_fd(self);
            }
 
            return;
@@ -1060,6 +1148,8 @@ instance_init(
     self->output_listen_socket = -1;
     self->input_data_socket = -1;
     self->output_data_socket = -1;
+    self->read_fd = -1;
+    self->write_fd = -1;
 }
 
 static void
@@ -1068,13 +1158,19 @@ finalize_impl(
 {
     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
 
-    /* close our pipes if they're still open (they shouldn't be!) */
+    /* first make sure the worker thread has finished up */
+    if (self->thread)
+       g_thread_join(self->thread);
+
+    /* close our pipes and fd's if they're still open */
     if (self->pipe[0] != -1) close(self->pipe[0]);
     if (self->pipe[1] != -1) close(self->pipe[1]);
-    if (self->input_listen_socket != -1) close(self->input_listen_socket);
-    if (self->output_listen_socket != -1) close(self->output_listen_socket);
     if (self->input_data_socket != -1) close(self->input_data_socket);
     if (self->output_data_socket != -1) close(self->output_data_socket);
+    if (self->input_listen_socket != -1) close(self->input_listen_socket);
+    if (self->output_listen_socket != -1) close(self->output_listen_socket);
+    if (self->read_fd != -1) close(self->read_fd);
+    if (self->write_fd != -1) close(self->write_fd);
 
     if (self->ring) {
        /* empty the ring buffer, ignoring syncronization issues */
@@ -1085,8 +1181,8 @@ finalize_impl(
        }
 
        amfree(self->ring);
-       semaphore_free(self->ring_used_sem);
-       semaphore_free(self->ring_free_sem);
+       amsemaphore_free(self->ring_used_sem);
+       amsemaphore_free(self->ring_free_sem);
     }
 
     /* chain up */
@@ -1094,44 +1190,44 @@ finalize_impl(
 }
 
 static xfer_element_mech_pair_t _pairs[] = {
-    { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
-    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
-    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
-    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
-    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
-    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
-    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
-    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy  */
+    { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+    { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+    { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy  */
 
     /* terminator */
-    { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
 };
 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;