X-Git-Url: https://git.gag.com/?p=debian%2Famanda;a=blobdiff_plain;f=xfer-src%2Felement-glue.c;h=271369c516b598dbc29421e3c3a97bca196ad9d6;hp=29e639ad5aac16ff89bf6421b300a5138c1cd9e4;hb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;hpb=fd48f3e498442f0cbff5f3606c7c403d0566150e diff --git a/xfer-src/element-glue.c b/xfer-src/element-glue.c index 29e639a..271369c 100644 --- a/xfer-src/element-glue.c +++ b/xfer-src/element-glue.c @@ -19,10 +19,11 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ +#include "amanda.h" #include "amxfer.h" #include "element-glue.h" -#include "amanda.h" #include "directtcp.h" +#include "util.h" #include "sockaddr-util.h" /* @@ -35,7 +36,7 @@ typedef struct XferElementGlue_ { /* instructions to push_buffer_impl */ enum { PUSH_TO_RING_BUFFER, - PUSH_TO_FD, /* write to *write_fdp */ + PUSH_TO_FD, /* write to write_fd */ PUSH_INVALID, PUSH_ACCEPT_FIRST = (1 << 16), @@ -45,7 +46,7 @@ typedef struct XferElementGlue_ { /* instructions to pull_buffer_impl */ enum { PULL_FROM_RING_BUFFER, - PULL_FROM_FD, /* read from *read_fdp */ + PULL_FROM_FD, /* read from read_fd */ PULL_INVALID, PULL_ACCEPT_FIRST = (1 << 16), @@ -62,6 +63,7 @@ typedef struct XferElementGlue_ { int pipe[2]; int input_listen_socket, output_listen_socket; int input_data_socket, output_data_socket; + int read_fd, write_fd; /* a ring buffer of ptr/size pairs with semaphores */ struct { gpointer buf; size_t size; } *ring; @@ -138,6 +140,13 @@ do_directtcp_listen( return TRUE; } +static gboolean +prolong_accept( + gpointer data) +{ + return !XFER_ELEMENT(data)->cancelled; +} + static int do_directtcp_accept( XferElementGlue *self, @@ -146,7 +155,13 @@ do_directtcp_accept( int sock; g_assert(*socketp != -1); - if ((sock = accept(*socketp, NULL, NULL)) == -1) { + if ((sock = interruptible_accept(*socketp, NULL, NULL, + prolong_accept, self)) == -1) { + /* if the accept was interrupted due to a cancellation, then do not + * add a further error message */ + if (errno == 0 && XFER_ELEMENT(self)->cancelled) + return -1; + xfer_cancel_with_error(XFER_ELEMENT(self), _("Error accepting incoming connection: %s"), strerror(errno)); wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); @@ -211,11 +226,68 @@ cancel_wait: #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT)) -/* if self->read_fdp or self->write_fdp are pointing to this integer, then - * they should be redirected to point to the upstream's output_fd or - * downstream's input_fd, respectively, at start() */ +/* + * fd handling + */ + +/* if self->read_fdp or self->write_fdp are pointing to this integer, then they + * should be redirected to point to the upstream's output_fd or downstream's + * input_fd, respectively, at the first call to get_read_fd or get_write_fd, + * respectively. */ static int neighboring_element_fd = -1; +#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd) +static int +_get_read_fd(XferElementGlue *self) +{ + if (!self->read_fdp) + return -1; /* shouldn't happen.. */ + + if (self->read_fdp == &neighboring_element_fd) { + XferElement *elt = XFER_ELEMENT(self); + self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1); + } else { + self->read_fd = *self->read_fdp; + *self->read_fdp = -1; + } + self->read_fdp = NULL; + return self->read_fd; +} + +#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd) +static int +_get_write_fd(XferElementGlue *self) +{ + if (!self->write_fdp) + return -1; /* shouldn't happen.. */ + + if (self->write_fdp == &neighboring_element_fd) { + XferElement *elt = XFER_ELEMENT(self); + self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1); + } else { + self->write_fd = *self->write_fdp; + *self->write_fdp = -1; + } + self->write_fdp = NULL; + return self->write_fd; +} + +static int +close_read_fd(XferElementGlue *self) +{ + int fd = get_read_fd(self); + self->read_fd = -1; + return close(fd); +} + +static int +close_write_fd(XferElementGlue *self) +{ + int fd = get_write_fd(self); + self->write_fd = -1; + return close(fd); +} + /* * Worker thread utility functions */ @@ -224,7 +296,8 @@ static void pull_and_write(XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int fd = *self->write_fdp; + int fd = get_write_fd(self); + self->write_fdp = NULL; while (!elt->cancelled) { size_t len; @@ -254,20 +327,18 @@ pull_and_write(XferElementGlue *self) /* close the fd we've been writing, as an EOF signal to downstream, and * set it to -1 to avoid accidental re-use */ - close(fd); - *self->write_fdp = -1; + close_write_fd(self); } static void read_and_write(XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int rfd = *self->read_fdp; - int wfd = *self->write_fdp; - /* dynamically allocate a buffer, in case this thread has * a limited amount of stack allocated */ char *buf = g_malloc(GLUE_BUFFER_SIZE); + int rfd = get_read_fd(self); + int wfd = get_write_fd(self); while (!elt->cancelled) { size_t len; @@ -299,19 +370,14 @@ read_and_write(XferElementGlue *self) } if (elt->cancelled && elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + xfer_element_drain_by_reading(rfd); - /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental - * re-use */ - if (!elt->cancelled || elt->expect_eof) { - close(rfd); - *self->read_fdp = -1; - } + /* close the read fd. If it's not at EOF, then upstream will get EPIPE, which will hopefully + * kill it and complete the cancellation */ + close_read_fd(self); - /* close the fd we've been writing, as an EOF signal to downstream, and - * set it to -1 to avoid accidental re-use */ - close(wfd); - *self->write_fdp = -1; + /* close the fd we've been writing, as an EOF signal to downstream */ + close_write_fd(self); amfree(buf); } @@ -321,7 +387,7 @@ read_and_push( XferElementGlue *self) { XferElement *elt = XFER_ELEMENT(self); - int fd = *self->read_fdp; + int fd = get_read_fd(self); while (!elt->cancelled) { char *buf = g_malloc(GLUE_BUFFER_SIZE); @@ -355,10 +421,8 @@ read_and_push( /* send an EOF indication downstream */ xfer_element_push_buffer(elt->downstream, NULL, 0); - /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental - * re-use */ - close(fd); - *self->read_fdp = -1; + /* close the read fd, since it's at EOF */ + close_read_fd(self); } static void @@ -587,16 +651,16 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD): make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ break; case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER): /* thread will read from pipe and call downstream's push_buffer */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -604,7 +668,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER): make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->on_pull = PULL_FROM_FD; self->read_fdp = &self->pipe[0]; @@ -613,7 +677,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN): /* thread will connect for output, then read from pipe and write to socket */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -622,7 +686,7 @@ setup_impl( case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT): /* thread will accept output conn, then read from pipe and write to socket */ make_pipe(self); - elt->input_fd = self->pipe[1]; + g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1); self->pipe[1] = -1; /* upstream will close this for us */ self->read_fdp = &self->pipe[0]; self->need_thread = TRUE; @@ -631,7 +695,7 @@ setup_impl( case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD): make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->on_push = PUSH_TO_FD; self->write_fdp = &self->pipe[1]; @@ -662,7 +726,7 @@ setup_impl( case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD): /* thread will pull from upstream and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -693,7 +757,7 @@ setup_impl( case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD): /* thread will accept for input, then read from socket and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -729,7 +793,7 @@ setup_impl( case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD): /* thread will connect for input, then read from socket and write to pipe */ make_pipe(self); - elt->output_fd = self->pipe[0]; + g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1); self->pipe[0] = -1; /* downstream will close this for us */ self->write_fdp = &self->pipe[1]; self->need_thread = TRUE; @@ -789,14 +853,8 @@ start_impl( { XferElementGlue *self = (XferElementGlue *)elt; - /* upstream and downstream are now set, so we can point our fdp's to them */ - if (self->write_fdp == &neighboring_element_fd) - self->write_fdp = &elt->downstream->input_fd; - if (self->read_fdp == &neighboring_element_fd) - self->read_fdp = &elt->upstream->output_fd; - if (self->need_thread) - self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL); + self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL); /* we're active if we have a thread that will eventually die */ return self->need_thread; @@ -878,16 +936,19 @@ pull_buffer_impl( } case PULL_FROM_FD: { - int fd = *self->read_fdp; + int fd = get_read_fd(self); char *buf = g_malloc(GLUE_BUFFER_SIZE); ssize_t len; - if (elt->cancelled) { - if (elt->expect_eof) - xfer_element_drain_by_reading(fd); + /* if the fd is already closed, it's possible upstream bailed out + * so quickly that we didn't even get a look at the fd */ + if (elt->cancelled || fd == -1) { + if (fd != -1) { + if (elt->expect_eof) + xfer_element_drain_by_reading(fd); - close(fd); - *self->read_fdp = -1; + close_read_fd(self); + } *size = 0; return NULL; @@ -911,8 +972,7 @@ pull_buffer_impl( if (elt->expect_eof) { xfer_element_drain_by_reading(fd); } - close(fd); - *self->read_fdp = -1; + close_read_fd(self); } else if (len == 0) { /* EOF */ g_free(buf); @@ -920,8 +980,7 @@ pull_buffer_impl( *size = 0; /* signal EOF to downstream */ - close(fd); - *self->read_fdp = -1; + close_read_fd(self); } } @@ -1007,12 +1066,18 @@ push_buffer_impl( return; case PUSH_TO_FD: { - int fd = *self->write_fdp; + int fd = get_write_fd(self); + + /* if the fd is already closed, it's possible upstream bailed out + * so quickly that we didn't even get a look at the fd. In this + * case we can assume the xfer has been cancelled and just discard + * the data. */ + if (fd == -1) + return; if (elt->cancelled) { if (!elt->expect_eof || !buf) { - close(fd); - *self->write_fdp = -1; + close_write_fd(self); /* hack to ensure we won't close the fd again, if we get another push */ elt->expect_eof = TRUE; @@ -1035,8 +1100,7 @@ push_buffer_impl( } amfree(buf); } else { - close(fd); - *self->write_fdp = -1; + close_write_fd(self); } return; @@ -1060,6 +1124,8 @@ instance_init( self->output_listen_socket = -1; self->input_data_socket = -1; self->output_data_socket = -1; + self->read_fd = -1; + self->write_fd = -1; } static void @@ -1068,13 +1134,19 @@ finalize_impl( { XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self); - /* close our pipes if they're still open (they shouldn't be!) */ + /* first make sure the worker thread has finished up */ + if (self->thread) + g_thread_join(self->thread); + + /* close our pipes and fd's if they're still open */ if (self->pipe[0] != -1) close(self->pipe[0]); if (self->pipe[1] != -1) close(self->pipe[1]); if (self->input_listen_socket != -1) close(self->input_listen_socket); if (self->output_listen_socket != -1) close(self->output_listen_socket); if (self->input_data_socket != -1) close(self->input_data_socket); if (self->output_data_socket != -1) close(self->output_data_socket); + if (self->read_fd != -1) close(self->read_fd); + if (self->write_fd != -1) close(self->write_fd); if (self->ring) { /* empty the ring buffer, ignoring syncronization issues */