X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Felement-glue.c;h=ecc543d66d924ed1c407ff58af3a6562305158de;hb=c0dcac5cea561cfb1faa01a0c0738a8768c22efd;hp=29e639ad5aac16ff89bf6421b300a5138c1cd9e4;hpb=d5853102f67d85d8e169f9dbe973ad573306c215;p=debian%2Famanda diff --git a/xfer-src/element-glue.c b/xfer-src/element-glue.c index 29e639a..ecc543d 100644 --- a/xfer-src/element-glue.c +++ b/xfer-src/element-glue.c @@ -19,11 +19,13 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ +#include "amanda.h" #include "amxfer.h" #include "element-glue.h" -#include "amanda.h" #include "directtcp.h" +#include "util.h" #include "sockaddr-util.h" +#include "debug.h" /* * Instance definition @@ -35,7 +37,7 @@ typedef struct XferElementGlue_ { /* instructions to push_buffer_impl */ enum { PUSH_TO_RING_BUFFER, - PUSH_TO_FD, /* write to *write_fdp */ + PUSH_TO_FD, /* write to write_fd */ PUSH_INVALID, PUSH_ACCEPT_FIRST = (1 << 16), @@ -45,7 +47,7 @@ typedef struct XferElementGlue_ { /* instructions to pull_buffer_impl */ enum { PULL_FROM_RING_BUFFER, - PULL_FROM_FD, /* read from *read_fdp */ + PULL_FROM_FD, /* read from read_fd */ PULL_INVALID, PULL_ACCEPT_FIRST = (1 << 16), @@ -62,6 +64,7 @@ typedef struct XferElementGlue_ { int pipe[2]; int input_listen_socket, output_listen_socket; int input_data_socket, output_data_socket; + int read_fd, write_fd; /* a ring buffer of ptr/size pairs with semaphores */ struct { gpointer buf; size_t size; } *ring; @@ -109,35 +112,64 @@ do_directtcp_listen( DirectTCPAddr **addrsp) { int sock; - sockaddr_union addr; + sockaddr_union data_addr; DirectTCPAddr *addrs; socklen_t len; + struct addrinfo *res; + struct addrinfo *res_addr; + sockaddr_union *addr = NULL; + + if (resolve_hostname("localhost", 0, &res, NULL) != 0) { + xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno)); + return FALSE; + } + for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) { + if (res_addr->ai_family == AF_INET) { + addr = (sockaddr_union *)res_addr->ai_addr; + break; + } + } + if (!addr) { + addr = (sockaddr_union *)res->ai_addr; + } - sock = *sockp = socket(AF_INET, SOCK_STREAM, 0); + sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0); if (sock < 0) { xfer_cancel_with_error(elt, "socket(): %s", strerror(errno)); return FALSE; } + len = SS_LEN(addr); + if (bind(sock, (struct sockaddr *)addr, len) != 0) { + xfer_cancel_with_error(elt, "bind(): %s", strerror(errno)); + freeaddrinfo(res); + return FALSE; + } + if (listen(sock, 1) < 0) { xfer_cancel_with_error(elt, "listen(): %s", strerror(errno)); return FALSE; } /* TODO: which addresses should this display? all ifaces? localhost? */ - len = sizeof(addr); - if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0) + len = sizeof(data_addr); + if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0) error("getsockname(): %s", strerror(errno)); - g_assert(SU_GET_FAMILY(&addr) == AF_INET); addrs = g_new0(DirectTCPAddr, 2); - addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */ - addrs[0].port = SU_GET_PORT(&addr); + copy_sockaddr(&addrs[0], &data_addr); *addrsp = addrs; return TRUE; } +static gboolean +prolong_accept( + gpointer data) +{ + return !XFER_ELEMENT(data)->cancelled; +} + static int do_directtcp_accept( XferElementGlue *self, @@ -146,7 +178,13 @@ do_directtcp_accept( int sock; g_assert(*socketp != -1); - if ((sock = accept(*socketp, NULL, NULL)) == -1) { + if ((sock = interruptible_accept(*socketp, NULL, NULL, + prolong_accept, self)) == -1) { + /* if the accept was interrupted due to a cancellation, then do not + * add a further error message */ + if (errno == 0 && XFER_ELEMENT(self)->cancelled) + return -1; + xfer_cancel_with_error(XFER_ELEMENT(self), _("Error accepting incoming connection: %s"), strerror(errno)); wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); @@ -180,12 +218,10 @@ do_directtcp_connect( } /* set up the sockaddr -- IPv4 only */ - SU_INIT(&addr, AF_INET); - SU_SET_PORT(&addr, addrs->port); - ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4); + copy_sockaddr(&addr, addrs); g_debug("making data connection to %s", str_sockaddr(&addr)); - sock = socket(AF_INET, SOCK_STREAM, 0); + sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0); if (sock < 0) { xfer_cancel_with_error(elt, "socket(): %s", strerror(errno)); @@ -211,11 +247,68 @@ cancel_wait: #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT)) -/* if self->read_fdp or self->write_fdp are pointing to this integer, then - * they should be redirected to point to the upstream's output_fd or - * downstream's input_fd, respectively, at start() */ +/* + * fd handling + */ + +/* if self->read_fdp or self->write_fdp are pointing to this integer, then they + * should be redirected to point to the upstream's output_fd or downstream's + * input_fd, respectively, at the first call to get_read_fd or get_write_fd, + * respectively. */ static int neighboring_element_fd = -1; +#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd) +static int +_get_read_fd(XferElementGlue *self) +{ + if (!self->read_fdp) + return -1; /* shouldn't happen.. */ + + if (self->read_fdp == &neighboring_element_fd) { + XferElement *elt = XFER_ELEMENT(self); + self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1); + } else { + self->read_fd = *self->read_fdp; + *self->read_fdp = -1; + } + self->read_fdp = NULL; + return self->read_fd; +} + +#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd) +static int +_get_write_fd(XferElementGlue *self) +{ + if (!self->write_fdp) + return -1; /* shouldn't happen.. */ + + if (self->write_fdp == &neighboring_element_fd) { + XferElement *elt = XFER_ELEMENT(self); + self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1); + } else { + self->write_fd = *self->write_fdp; + *self->write_fdp = -1; + } + self->write_fdp = NULL; + return self->write_fd; +} + +static int +close_read_fd(XferElementGlue *self) +{ + int fd = get_read_fd(self); + self->read_fd = -1; + return close(fd); +} + +static int +close_write_fd(XferElementGlue *self) +{ + int fd = get_write_fd(self); + self->write_fd = -1; + return close(fd); +} + /* * Worker thread utility functions */ @@ -224,7 +317,8 @@ static void pull_and_write(XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int fd = *self->write_fdp; + int fd = get_write_fd(self); + self->write_fdp = NULL; while (!elt->cancelled) { size_t len; @@ -250,24 +344,22 @@ pull_and_write(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_buffers(elt->upstream); /* close the fd we've been writing, as an EOF signal to downstream, and * set it to -1 to avoid accidental re-use */ - close(fd); - *self->write_fdp = -1; + close_write_fd(self); } static void read_and_write(XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int rfd = *self->read_fdp; - int wfd = *self->write_fdp; - /* dynamically allocate a buffer, in case this thread has * a limited amount of stack allocated */ char *buf = g_malloc(GLUE_BUFFER_SIZE); + int rfd = get_read_fd(self); + int wfd = get_write_fd(self); while (!elt->cancelled) { size_t len; @@ -299,19 +391,14 @@ read_and_write(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_fd(rfd); - /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental - * re-use */ - if (!elt->cancelled || elt->expect_eof) { - close(rfd); - *self->read_fdp = -1; - } + /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully + * kill it and complete the cancellation */ + close_read_fd(self); - /* close the fd we've been writing, as an EOF signal to downstream, and - * set it to -1 to avoid accidental re-use */ - close(wfd); - *self->write_fdp = -1; + /* close the fd we've been writing, as an EOF signal to downstream */ + close_write_fd(self); amfree(buf); } @@ -321,7 +408,7 @@ read_and_push( XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int fd = *self->read_fdp; + int fd = get_read_fd(self); while (!elt->cancelled) { char *buf = g_malloc(GLUE_BUFFER_SIZE); @@ -339,6 +426,7 @@ read_and_push( fd, strerror(saved_errno)); wait_until_xfer_cancelled(elt->xfer); } + amfree(buf); break; } else if (len == 0) { /* we only count a zero-length read as EOF */ amfree(buf); @@ -350,15 +438,13 @@ read_and_push( } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_reading(fd); + xfer_element_drain_fd(fd); /* send an EOF indication downstream */ xfer_element_push_buffer(elt->downstream, NULL, 0); - /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental - * re-use */ - close(fd); - *self->read_fdp = -1; + /* close the read fd, since it's at EOF */ + close_read_fd(self); } static void @@ -384,7 +470,7 @@ pull_and_push(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_buffers(elt->upstream); if (!eof_sent) xfer_element_push_buffer(elt->downstream, NULL, 0); @@ -587,16 +673,16 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD): make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ break; case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER): /* thread will read from pipe and call downstream's push_buffer */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -604,7 +690,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER): make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->on_pull = PULL_FROM_FD; self->read_fdp = &self->pipe[0]; @@ -613,7 +699,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN): /* thread will connect for output, then read from pipe and write to socket */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -622,7 +708,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT): /* thread will accept output conn, then read from pipe and write to socket */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -631,7 +717,7 @@ setup_impl( case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD): make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->on_push = PUSH_TO_FD; self->write_fdp = &self->pipe[1]; @@ -662,7 +748,7 @@ setup_impl( case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD): /* thread will pull from upstream and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -693,7 +779,7 @@ setup_impl( case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD): /* thread will accept for input, then read from socket and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -729,7 +815,7 @@ setup_impl( case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD): /* thread will connect for input, then read from socket and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -789,14 +875,8 @@ start_impl( { XferElementGlue *self = (XferElementGlue *)elt; - /* upstream and downstream are now set, so we can point our fdp's to them */ - if (self->write_fdp == &neighboring_element_fd) - self->write_fdp = &elt->downstream->input_fd; - if (self->read_fdp == &neighboring_element_fd) - self->read_fdp = &elt->upstream->output_fd; - if (self->need_thread) - self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL); + self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL); /* we're active if we have a thread that will eventually die */ return self->need_thread; @@ -878,21 +958,26 @@ pull_buffer_impl( } case PULL_FROM_FD: { - int fd = *self->read_fdp; - char *buf = g_malloc(GLUE_BUFFER_SIZE); + int fd = get_read_fd(self); + char *buf; ssize_t len; - if (elt->cancelled) { - if (elt->expect_eof) - xfer_element_drain_by_reading(fd); + /* if the fd is already closed, it's possible upstream bailed out + * so quickly that we didn't even get a look at the fd */ + if (elt->cancelled || fd == -1) { + if (fd != -1) { + if (elt->expect_eof) + xfer_element_drain_fd(fd); - close(fd); - *self->read_fdp = -1; + close_read_fd(self); + } *size = 0; return NULL; } + buf = g_malloc(GLUE_BUFFER_SIZE); + /* read from upstream */ len = full_read(fd, buf, GLUE_BUFFER_SIZE); if (len < GLUE_BUFFER_SIZE) { @@ -909,10 +994,9 @@ pull_buffer_impl( /* and finish off the upstream */ if (elt->expect_eof) { - xfer_element_drain_by_reading(fd); + xfer_element_drain_fd(fd); } - close(fd); - *self->read_fdp = -1; + close_read_fd(self); } else if (len == 0) { /* EOF */ g_free(buf); @@ -920,8 +1004,7 @@ pull_buffer_impl( *size = 0; /* signal EOF to downstream */ - close(fd); - *self->read_fdp = -1; + close_read_fd(self); } } @@ -1007,12 +1090,18 @@ push_buffer_impl( return; case PUSH_TO_FD: { - int fd = *self->write_fdp; + int fd = get_write_fd(self); + + /* if the fd is already closed, it's possible upstream bailed out + * so quickly that we didn't even get a look at the fd. In this + * case we can assume the xfer has been cancelled and just discard + * the data. */ + if (fd == -1) + return; if (elt->cancelled) { if (!elt->expect_eof || !buf) { - close(fd); - *self->write_fdp = -1; + close_write_fd(self); /* hack to ensure we won't close the fd again, if we get another push */ elt->expect_eof = TRUE; @@ -1035,8 +1124,7 @@ push_buffer_impl( } amfree(buf); } else { - close(fd); - *self->write_fdp = -1; + close_write_fd(self); } return; @@ -1060,6 +1148,8 @@ instance_init( self->output_listen_socket = -1; self->input_data_socket = -1; self->output_data_socket = -1; + self->read_fd = -1; + self->write_fd = -1; } static void @@ -1068,13 +1158,19 @@ finalize_impl( { XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self); - /* close our pipes if they're still open (they shouldn't be!) */ + /* first make sure the worker thread has finished up */ + if (self->thread) + g_thread_join(self->thread); + + /* close our pipes and fd's if they're still open */ if (self->pipe[0] != -1) close(self->pipe[0]); if (self->pipe[1] != -1) close(self->pipe[1]); - if (self->input_listen_socket != -1) close(self->input_listen_socket); - if (self->output_listen_socket != -1) close(self->output_listen_socket); if (self->input_data_socket != -1) close(self->input_data_socket); if (self->output_data_socket != -1) close(self->output_data_socket); + if (self->input_listen_socket != -1) close(self->input_listen_socket); + if (self->output_listen_socket != -1) close(self->output_listen_socket); + if (self->read_fd != -1) close(self->read_fd); + if (self->write_fd != -1) close(self->write_fd); if (self->ring) { /* empty the ring buffer, ignoring syncronization issues */ @@ -1094,44 +1190,44 @@ finalize_impl( } static xfer_element_mech_pair_t _pairs[] = { - { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */ - { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */ - { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */ - - { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */ - { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/ - { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */ - { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/ - { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */ - - { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */ - - { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */ - - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */ - - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + + { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */ + { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/ + { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */ + { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/ + { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + + { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + + { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ /* terminator */ - { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) }, }; xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;