X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Felement-glue.c;h=319b576080658f3639913c52adf03c90e9fc6937;hb=refs%2Ftags%2Fdebian%2F3.3.1-2;hp=ce71f113b5a581f89c9f58a2db415ca5923b5888;hpb=cb38d19aa8dc2c5d380ab2d7ad9724a5d99eee5d;p=debian%2Famanda diff --git a/xfer-src/element-glue.c b/xfer-src/element-glue.c index ce71f11..319b576 100644 --- a/xfer-src/element-glue.c +++ b/xfer-src/element-glue.c @@ -25,6 +25,7 @@ #include "directtcp.h" #include "util.h" #include "sockaddr-util.h" +#include "debug.h" /* * Instance definition @@ -67,7 +68,7 @@ typedef struct XferElementGlue_ { /* a ring buffer of ptr/size pairs with semaphores */ struct { gpointer buf; size_t size; } *ring; - semaphore_t *ring_used_sem, *ring_free_sem; + amsemaphore_t *ring_used_sem, *ring_free_sem; gint ring_head, ring_tail; GThread *thread; @@ -111,30 +112,52 @@ do_directtcp_listen( DirectTCPAddr **addrsp) { int sock; - sockaddr_union addr; + sockaddr_union data_addr; DirectTCPAddr *addrs; socklen_t len; + struct addrinfo *res; + struct addrinfo *res_addr; + sockaddr_union *addr = NULL; + + if (resolve_hostname("localhost", 0, &res, NULL) != 0) { + xfer_cancel_with_error(elt, "resolve_hostname(): %s", strerror(errno)); + return FALSE; + } + for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) { + if (res_addr->ai_family == AF_INET) { + addr = (sockaddr_union *)res_addr->ai_addr; + break; + } + } + if (!addr) { + addr = (sockaddr_union *)res->ai_addr; + } - sock = *sockp = socket(AF_INET, SOCK_STREAM, 0); + sock = *sockp = socket(SU_GET_FAMILY(addr), SOCK_STREAM, 0); if (sock < 0) { xfer_cancel_with_error(elt, "socket(): %s", strerror(errno)); return FALSE; } + len = SS_LEN(addr); + if (bind(sock, (struct sockaddr *)addr, len) != 0) { + xfer_cancel_with_error(elt, "bind(): %s", strerror(errno)); + freeaddrinfo(res); + return FALSE; + } + if (listen(sock, 1) < 0) { xfer_cancel_with_error(elt, "listen(): %s", strerror(errno)); return FALSE; } /* TODO: which addresses should this display? all ifaces? localhost? */ - len = sizeof(addr); - if (getsockname(sock, (struct sockaddr *)&addr, &len) < 0) + len = sizeof(data_addr); + if (getsockname(sock, (struct sockaddr *)&data_addr, &len) < 0) error("getsockname(): %s", strerror(errno)); - g_assert(SU_GET_FAMILY(&addr) == AF_INET); addrs = g_new0(DirectTCPAddr, 2); - addrs[0].ipv4 = ntohl(inet_addr("127.0.0.1")); /* TODO: be smarter! */ - addrs[0].port = SU_GET_PORT(&addr); + copy_sockaddr(&addrs[0], &data_addr); *addrsp = addrs; return TRUE; @@ -195,12 +218,10 @@ do_directtcp_connect( } /* set up the sockaddr -- IPv4 only */ - SU_INIT(&addr, AF_INET); - SU_SET_PORT(&addr, addrs->port); - ((struct sockaddr_in *)&addr)->sin_addr.s_addr = htonl(addrs->ipv4); + copy_sockaddr(&addr, addrs); g_debug("making data connection to %s", str_sockaddr(&addr)); - sock = socket(AF_INET, SOCK_STREAM, 0); + sock = socket(SU_GET_FAMILY(&addr), SOCK_STREAM, 0); if (sock < 0) { xfer_cancel_with_error(elt, "socket(): %s", strerror(errno)); @@ -323,7 +344,7 @@ pull_and_write(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_buffers(elt->upstream); /* close the fd we've been writing, as an EOF signal to downstream, and * set it to -1 to avoid accidental re-use */ @@ -370,7 +391,7 @@ read_and_write(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_reading(rfd); + xfer_element_drain_fd(rfd); /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully * kill it and complete the cancellation */ @@ -405,6 +426,7 @@ read_and_push( fd, strerror(saved_errno)); wait_until_xfer_cancelled(elt->xfer); } + amfree(buf); break; } else if (len == 0) { /* we only count a zero-length read as EOF */ amfree(buf); @@ -416,7 +438,7 @@ read_and_push( } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_reading(fd); + xfer_element_drain_fd(fd); /* send an EOF indication downstream */ xfer_element_push_buffer(elt->downstream, NULL, 0); @@ -448,7 +470,7 @@ pull_and_push(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_buffers(elt->upstream); if (!eof_sent) xfer_element_push_buffer(elt->downstream, NULL, 0); @@ -829,8 +851,8 @@ setup_impl( /* set up ring if desired */ if (need_ring) { self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE); - self->ring_used_sem = semaphore_new_with_value(0); - self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE); + self->ring_used_sem = amsemaphore_new_with_value(0); + self->ring_free_sem = amsemaphore_new_with_value(GLUE_RING_BUFFER_SIZE); } if (need_listen_input) { @@ -922,7 +944,7 @@ pull_buffer_impl( } /* make sure there's at least one element available */ - semaphore_down(self->ring_used_sem); + amsemaphore_down(self->ring_used_sem); /* get it */ buf = self->ring[self->ring_tail].buf; @@ -930,14 +952,14 @@ pull_buffer_impl( self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE; /* and mark this element as free to be overwritten */ - semaphore_up(self->ring_free_sem); + amsemaphore_up(self->ring_free_sem); return buf; } case PULL_FROM_FD: { int fd = get_read_fd(self); - char *buf = g_malloc(GLUE_BUFFER_SIZE); + char *buf; ssize_t len; /* if the fd is already closed, it's possible upstream bailed out @@ -945,7 +967,7 @@ pull_buffer_impl( if (elt->cancelled || fd == -1) { if (fd != -1) { if (elt->expect_eof) - xfer_element_drain_by_reading(fd); + xfer_element_drain_fd(fd); close_read_fd(self); } @@ -954,6 +976,8 @@ pull_buffer_impl( return NULL; } + buf = g_malloc(GLUE_BUFFER_SIZE); + /* read from upstream */ len = full_read(fd, buf, GLUE_BUFFER_SIZE); if (len < GLUE_BUFFER_SIZE) { @@ -970,7 +994,7 @@ pull_buffer_impl( /* and finish off the upstream */ if (elt->expect_eof) { - xfer_element_drain_by_reading(fd); + xfer_element_drain_fd(fd); } close_read_fd(self); } else if (len == 0) { @@ -1053,7 +1077,7 @@ push_buffer_impl( } /* make sure there's at least one element free */ - semaphore_down(self->ring_free_sem); + amsemaphore_down(self->ring_free_sem); /* set it */ self->ring[self->ring_head].buf = buf; @@ -1061,7 +1085,7 @@ push_buffer_impl( self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE; /* and mark this element as available for reading */ - semaphore_up(self->ring_used_sem); + amsemaphore_up(self->ring_used_sem); return; @@ -1157,8 +1181,8 @@ finalize_impl( } amfree(self->ring); - semaphore_free(self->ring_used_sem); - semaphore_free(self->ring_free_sem); + amsemaphore_free(self->ring_used_sem); + amsemaphore_free(self->ring_free_sem); } /* chain up */ @@ -1166,44 +1190,44 @@ finalize_impl( } static xfer_element_mech_pair_t _pairs[] = { - { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */ - { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */ - { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */ - - { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */ - { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/ - { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */ - { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* pipe + splice or copy*/ - { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy + pipe */ - - { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 0 }, /* write on demand */ - { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 0 }, /* write on demand */ - - { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, 1, 1 }, /* call and write */ - { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, 1, 1 }, /* call and write */ - - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, 2, 1 }, /* splice or copy */ - - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy + pipe */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */ - { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, 2, 1 }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_READFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + + { XFER_MECH_WRITEFD, XFER_MECH_READFD, XFER_NROPS(0), XFER_NTHREADS(0) }, /* pipe */ + { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* pipe + read and call*/ + { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* pipe + read on demand */ + { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* pipe + splice or copy*/ + { XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + + { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand + pipe */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, XFER_NROPS(0), XFER_NTHREADS(0) }, /* async queue */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + { XFER_MECH_PUSH_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(0) }, /* write on demand */ + + { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write + pipe */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, XFER_NROPS(0), XFER_NTHREADS(1) }, /* call and call */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + { XFER_MECH_PULL_BUFFER, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(1), XFER_NTHREADS(1) }, /* call and write */ + + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_DIRECTTCP_CONNECT, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ + + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_WRITEFD, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy + pipe */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PUSH_BUFFER, XFER_NROPS(1), XFER_NTHREADS(1) }, /* read and call */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_PULL_BUFFER, XFER_NROPS(1), XFER_NTHREADS(0) }, /* read on demand */ + { XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_DIRECTTCP_LISTEN, XFER_NROPS(2), XFER_NTHREADS(1) }, /* splice or copy */ /* terminator */ - { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, + { XFER_MECH_NONE, XFER_MECH_NONE, XFER_NROPS(0), XFER_NTHREADS(0) }, }; xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;