/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
+#include "amanda.h"
#include "amxfer.h"
#include "element-glue.h"
-#include "amanda.h"
#include "directtcp.h"
+#include "util.h"
#include "sockaddr-util.h"
+#include "debug.h"
/*
* Instance definition
/* instructions to push_buffer_impl */
enum {
PUSH_TO_RING_BUFFER,
- PUSH_TO_FD, /* write to *write_fdp */
+ PUSH_TO_FD, /* write to write_fd */
PUSH_INVALID,
PUSH_ACCEPT_FIRST = (1 << 16),
/* instructions to pull_buffer_impl */
enum {
PULL_FROM_RING_BUFFER,
- PULL_FROM_FD, /* read from *read_fdp */
+ PULL_FROM_FD, /* read from read_fd */
PULL_INVALID,
PULL_ACCEPT_FIRST = (1 << 16),
int pipe[2];
int input_listen_socket, output_listen_socket;
int input_data_socket, output_data_socket;
+ int read_fd, write_fd;
/* a ring buffer of ptr/size pairs with semaphores */
struct { gpointer buf; size_t size; } *ring;
- semaphore_t *ring_used_sem, *ring_free_sem;
+ amsemaphore_t *ring_used_sem, *ring_free_sem;
gint ring_head, ring_tail;
GThread *thread;
DirectTCPAddr **addrsp)
{
int sock;
- sockaddr_union addr;
+ sockaddr_union data_addr;
DirectTCPAddr *addrs;
socklen_t len;
+ struct addrinfo *res;
+ struct addrinfo *res_addr;
+ sockaddr_union *addr = NULL;
+
+ if (resolve_hostname("localhost", 0, &res, NULL) != 0) {
+ xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno));
+ return FALSE;
+ }
+ for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+ if (res_addr->ai_family == AF_INET) {
+ addr = (sockaddr_union *)res_addr->ai_addr;
+ break;
+ }
+ }
+ if (!addr) {
+ addr = (sockaddr_union *)res->ai_addr;
+ }
- sock = *sockp = socket(AF_INET, SOCK_STREAM, 0);
+ sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt, "socket(): %s", strerror(errno));
return FALSE;
}
+ len = SS_LEN(addr);
+ if (bind(sock, (struct sockaddr *)addr, len) != 0) {
+ xfer_cancel_with_error(elt, "bind(): %s", strerror(errno));
+ freeaddrinfo(res);
+ return FALSE;
+ }
+
if (listen(sock, 1) < 0) {
xfer_cancel_with_error(elt, "listen(): %s", strerror(errno));
return FALSE;
}
/* TODO: which addresses should this display? all ifaces? localhost? */
- len = sizeof(addr);
- if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0)
+ len = sizeof(data_addr);
+ if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0)
error("getsockname(): %s", strerror(errno));
- g_assert(SU_GET_FAMILY(&addr) == AF_INET);
addrs = g_new0(DirectTCPAddr, 2);
- addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */
- addrs[0].port = SU_GET_PORT(&addr);
+ copy_sockaddr(&addrs[0], &data_addr);
*addrsp = addrs;
return TRUE;
}
+static gboolean
+prolong_accept(
+ gpointer data)
+{
+ return !XFER_ELEMENT(data)->cancelled;
+}
+
static int
do_directtcp_accept(
XferElementGlue *self,
int sock;
g_assert(*socketp != -1);
- if ((sock = accept(*socketp, NULL, NULL)) == -1) {
+ if ((sock = interruptible_accept(*socketp, NULL, NULL,
+ prolong_accept, self)) == -1) {
+ /* if the accept was interrupted due to a cancellation, then do not
+ * add a further error message */
+ if (errno == 0 && XFER_ELEMENT(self)->cancelled)
+ return -1;
+
xfer_cancel_with_error(XFER_ELEMENT(self),
_("Error accepting incoming connection: %s"), strerror(errno));
wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
}
/* set up the sockaddr -- IPv4 only */
- SU_INIT(&addr, AF_INET);
- SU_SET_PORT(&addr, addrs->port);
- ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4);
+ copy_sockaddr(&addr, addrs);
- g_debug("making data connection to %s", str_sockaddr(&addr));
- sock = socket(AF_INET, SOCK_STREAM, 0);
+ g_debug("do_directtcp_connect making data connection to %s", str_sockaddr(&addr));
+ sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0);
if (sock < 0) {
xfer_cancel_with_error(elt,
"socket(): %s", strerror(errno));
#define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
-/* if self->read_fdp or self->write_fdp are pointing to this integer, then
- * they should be redirected to point to the upstream's output_fd or
- * downstream's input_fd, respectively, at start() */
+/*
+ * fd handling
+ */
+
+/* if self->read_fdp or self->write_fdp are pointing to this integer, then they
+ * should be redirected to point to the upstream's output_fd or downstream's
+ * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
+ * respectively. */
static int neighboring_element_fd = -1;
+#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
+static int
+_get_read_fd(XferElementGlue *self)
+{
+ if (!self->read_fdp)
+ return -1; /* shouldn't happen.. */
+
+ if (self->read_fdp == &neighboring_element_fd) {
+ XferElement *elt = XFER_ELEMENT(self);
+ self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
+ } else {
+ self->read_fd = *self->read_fdp;
+ *self->read_fdp = -1;
+ }
+ self->read_fdp = NULL;
+ return self->read_fd;
+}
+
+#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
+static int
+_get_write_fd(XferElementGlue *self)
+{
+ if (!self->write_fdp)
+ return -1; /* shouldn't happen.. */
+
+ if (self->write_fdp == &neighboring_element_fd) {
+ XferElement *elt = XFER_ELEMENT(self);
+ self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
+ } else {
+ self->write_fd = *self->write_fdp;
+ *self->write_fdp = -1;
+ }
+ self->write_fdp = NULL;
+ return self->write_fd;
+}
+
+static int
+close_read_fd(XferElementGlue *self)
+{
+ int fd = get_read_fd(self);
+ self->read_fd = -1;
+ return close(fd);
+}
+
+static int
+close_write_fd(XferElementGlue *self)
+{
+ int fd = get_write_fd(self);
+ self->write_fd = -1;
+ return close(fd);
+}
+
/*
* Worker thread utility functions
*/
pull_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int fd = *self->write_fdp;
+ int fd = get_write_fd(self);
+ self->write_fdp = NULL;
while (!elt->cancelled) {
size_t len;
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
/* close the fd we've been writing, as an EOF signal to downstream, and
* set it to -1 to avoid accidental re-use */
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
}
static void
read_and_write(XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int rfd = *self->read_fdp;
- int wfd = *self->write_fdp;
-
/* dynamically allocate a buffer, in case this thread has
* a limited amount of stack allocated */
char *buf = g_malloc(GLUE_BUFFER_SIZE);
+ int rfd = get_read_fd(self);
+ int wfd = get_write_fd(self);
while (!elt->cancelled) {
size_t len;
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_fd(rfd);
- /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
- * re-use */
- if (!elt->cancelled || elt->expect_eof) {
- close(rfd);
- *self->read_fdp = -1;
- }
+ /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully
+ * kill it and complete the cancellation */
+ close_read_fd(self);
- /* close the fd we've been writing, as an EOF signal to downstream, and
- * set it to -1 to avoid accidental re-use */
- close(wfd);
- *self->write_fdp = -1;
+ /* close the fd we've been writing, as an EOF signal to downstream */
+ close_write_fd(self);
amfree(buf);
}
XferElementGlue *self)
{
XferElement *elt = XFER_ELEMENT(self);
- int fd = *self->read_fdp;
+ int fd = get_read_fd(self);
while (!elt->cancelled) {
char *buf = g_malloc(GLUE_BUFFER_SIZE);
fd, strerror(saved_errno));
wait_until_xfer_cancelled(elt->xfer);
}
+ amfree(buf);
break;
} else if (len == 0) { /* we only count a zero-length read as EOF */
amfree(buf);
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
/* send an EOF indication downstream */
xfer_element_push_buffer(elt->downstream, NULL, 0);
- /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
- * re-use */
- close(fd);
- *self->read_fdp = -1;
+ /* close the read fd, since it's at EOF */
+ close_read_fd(self);
}
static void
}
if (elt->cancelled && elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ xfer_element_drain_buffers(elt->upstream);
if (!eof_sent)
xfer_element_push_buffer(elt->downstream, NULL, 0);
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
break;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
/* thread will read from pipe and call downstream's push_buffer */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->on_pull = PULL_FROM_FD;
self->read_fdp = &self->pipe[0];
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
/* thread will connect for output, then read from pipe and write to socket */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
/* thread will accept output conn, then read from pipe and write to socket */
make_pipe(self);
- elt->input_fd = self->pipe[1];
+ g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
self->pipe[1] = -1; /* upstream will close this for us */
self->read_fdp = &self->pipe[0];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->on_push = PUSH_TO_FD;
self->write_fdp = &self->pipe[1];
case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
/* thread will pull from upstream and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
/* thread will accept for input, then read from socket and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
/* thread will connect for input, then read from socket and write to pipe */
make_pipe(self);
- elt->output_fd = self->pipe[0];
+ g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
self->pipe[0] = -1; /* downstream will close this for us */
self->write_fdp = &self->pipe[1];
self->need_thread = TRUE;
/* set up ring if desired */
if (need_ring) {
self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
- self->ring_used_sem = semaphore_new_with_value(0);
- self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+ self->ring_used_sem = amsemaphore_new_with_value(0);
+ self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
}
if (need_listen_input) {
{
XferElementGlue *self = (XferElementGlue *)elt;
- /* upstream and downstream are now set, so we can point our fdp's to them */
- if (self->write_fdp == &neighboring_element_fd)
- self->write_fdp = &elt->downstream->input_fd;
- if (self->read_fdp == &neighboring_element_fd)
- self->read_fdp = &elt->upstream->output_fd;
-
if (self->need_thread)
- self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
+ self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
/* we're active if we have a thread that will eventually die */
return self->need_thread;
}
/* make sure there's at least one element available */
- semaphore_down(self->ring_used_sem);
+ amsemaphore_down(self->ring_used_sem);
/* get it */
buf = self->ring[self->ring_tail].buf;
self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as free to be overwritten */
- semaphore_up(self->ring_free_sem);
+ amsemaphore_up(self->ring_free_sem);
return buf;
}
case PULL_FROM_FD: {
- int fd = *self->read_fdp;
- char *buf = g_malloc(GLUE_BUFFER_SIZE);
+ int fd = get_read_fd(self);
+ char *buf;
ssize_t len;
- if (elt->cancelled) {
- if (elt->expect_eof)
- xfer_element_drain_by_reading(fd);
+ /* if the fd is already closed, it's possible upstream bailed out
+ * so quickly that we didn't even get a look at the fd */
+ if (elt->cancelled || fd == -1) {
+ if (fd != -1) {
+ if (elt->expect_eof)
+ xfer_element_drain_fd(fd);
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
+ }
*size = 0;
return NULL;
}
+ buf = g_malloc(GLUE_BUFFER_SIZE);
+
/* read from upstream */
len = full_read(fd, buf, GLUE_BUFFER_SIZE);
if (len < GLUE_BUFFER_SIZE) {
/* and finish off the upstream */
if (elt->expect_eof) {
- xfer_element_drain_by_reading(fd);
+ xfer_element_drain_fd(fd);
}
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
} else if (len == 0) {
/* EOF */
g_free(buf);
*size = 0;
/* signal EOF to downstream */
- close(fd);
- *self->read_fdp = -1;
+ close_read_fd(self);
}
}
}
/* make sure there's at least one element free */
- semaphore_down(self->ring_free_sem);
+ amsemaphore_down(self->ring_free_sem);
/* set it */
self->ring[self->ring_head].buf = buf;
self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
/* and mark this element as available for reading */
- semaphore_up(self->ring_used_sem);
+ amsemaphore_up(self->ring_used_sem);
return;
case PUSH_TO_FD: {
- int fd = *self->write_fdp;
+ int fd = get_write_fd(self);
+
+ /* if the fd is already closed, it's possible upstream bailed out
+ * so quickly that we didn't even get a look at the fd. In this
+ * case we can assume the xfer has been cancelled and just discard
+ * the data. */
+ if (fd == -1)
+ return;
if (elt->cancelled) {
if (!elt->expect_eof || !buf) {
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
/* hack to ensure we won't close the fd again, if we get another push */
elt->expect_eof = TRUE;
}
amfree(buf);
} else {
- close(fd);
- *self->write_fdp = -1;
+ close_write_fd(self);
}
return;
self->output_listen_socket = -1;
self->input_data_socket = -1;
self->output_data_socket = -1;
+ self->read_fd = -1;
+ self->write_fd = -1;
}
static void
{
XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
- /* close our pipes if they're still open (they shouldn't be!) */
+ /* first make sure the worker thread has finished up */
+ if (self->thread)
+ g_thread_join(self->thread);
+
+ /* close our pipes and fd's if they're still open */
if (self->pipe[0] != -1) close(self->pipe[0]);
if (self->pipe[1] != -1) close(self->pipe[1]);
- if (self->input_listen_socket != -1) close(self->input_listen_socket);
- if (self->output_listen_socket != -1) close(self->output_listen_socket);
if (self->input_data_socket != -1) close(self->input_data_socket);
if (self->output_data_socket != -1) close(self->output_data_socket);
+ if (self->input_listen_socket != -1) close(self->input_listen_socket);
+ if (self->output_listen_socket != -1) close(self->output_listen_socket);
+ if (self->read_fd != -1) close(self->read_fd);
+ if (self->write_fd != -1) close(self->write_fd);
if (self->ring) {
 /* empty the ring buffer, ignoring synchronization issues */
}
amfree(self->ring);
- semaphore_free(self->ring_used_sem);
- semaphore_free(self->ring_free_sem);
+ amsemaphore_free(self->ring_used_sem);
+ amsemaphore_free(self->ring_free_sem);
}
/* chain up */
}
static xfer_element_mech_pair_t _pairs[] = {
- { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
- { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
- { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
- { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/
- { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */
-
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */
- { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */
-
- { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */
- { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */
-
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */
-
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
- { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */
+ { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/
+ { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/
+ { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */
+
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+ { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */
+
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
+
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */
+ { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */
/* terminator */
- { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) },
};
xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;