Imported Upstream version 3.2.0
[debian/amanda] / xfer-src / element-glue.c
index 29e639ad5aac16ff89bf6421b300a5138c1cd9e4..271369c516b598dbc29421e3c3a97bca196ad9d6 100644 (file)
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
+#include "amanda.h"
 #include "amxfer.h"
 #include "element-glue.h"
-#include "amanda.h"
 #include "directtcp.h"
+#include "util.h"
 #include "sockaddr-util.h"
 
 /*
@@ -35,7 +36,7 @@ typedef struct XferElementGlue_ {
     /* instructions to push_buffer_impl */
     enum {
        PUSH_TO_RING_BUFFER,
-       PUSH_TO_FD, /* write to *write_fdp */
+       PUSH_TO_FD, /* write to write_fd */
        PUSH_INVALID,
 
        PUSH_ACCEPT_FIRST = (1 << 16),
@@ -45,7 +46,7 @@ typedef struct XferElementGlue_ {
     /* instructions to pull_buffer_impl */
     enum {
        PULL_FROM_RING_BUFFER,
-       PULL_FROM_FD, /* read from *read_fdp */
+       PULL_FROM_FD, /* read from read_fd */
        PULL_INVALID,
 
        PULL_ACCEPT_FIRST = (1 << 16),
@@ -62,6 +63,7 @@ typedef struct XferElementGlue_ {
     int pipe[2];
     int input_listen_socket, output_listen_socket;
     int input_data_socket, output_data_socket;
+    int read_fd, write_fd;
 
     /* a ring buffer of ptr/size pairs with semaphores */
     struct { gpointer buf; size_t size; } *ring;
@@ -138,6 +140,13 @@ do_directtcp_listen(
     return TRUE;
 }
 
+static gboolean
+prolong_accept(
+    gpointer data)
+{
+    return !XFER_ELEMENT(data)->cancelled;
+}
+
 static int
 do_directtcp_accept(
     XferElementGlue *self,
@@ -146,7 +155,13 @@ do_directtcp_accept(
     int sock;
     g_assert(*socketp != -1);
 
-    if ((sock = accept(*socketp, NULL, NULL)) == -1) {
+    if ((sock = interruptible_accept(*socketp, NULL, NULL,
+                                    prolong_accept, self)) == -1) {
+       /* if the accept was interrupted due to a cancellation, then do not
+        * add a further error message */
+       if (errno == 0 && XFER_ELEMENT(self)->cancelled)
+           return -1;
+
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("Error accepting incoming connection: %s"), strerror(errno));
        wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
@@ -211,11 +226,68 @@ cancel_wait:
 
 #define mech_pair(IN,OUT) ((IN)*XFER_MECH_MAX+(OUT))
 
-/* if self->read_fdp or self->write_fdp are pointing to this integer, then
- * they should be redirected to point to the upstream's output_fd or
- * downstream's input_fd, respectively, at start() */
+/*
+ * fd handling
+ */
+
+/* if self->read_fdp or self->write_fdp are pointing to this integer, then they
+ * should be redirected to point to the upstream's output_fd or downstream's
+ * input_fd, respectively, at the first call to get_read_fd or get_write_fd,
+ * respectively. */
 static int neighboring_element_fd = -1;
 
+#define get_read_fd(self) (((self)->read_fd == -1)? _get_read_fd((self)) : (self)->read_fd)
+static int
+_get_read_fd(XferElementGlue *self)
+{
+    if (!self->read_fdp)
+       return -1; /* shouldn't happen.. */
+
+    if (self->read_fdp == &neighboring_element_fd) {
+       XferElement *elt = XFER_ELEMENT(self);
+       self->read_fd = xfer_element_swap_output_fd(elt->upstream, -1);
+    } else {
+       self->read_fd = *self->read_fdp;
+       *self->read_fdp = -1;
+    }
+    self->read_fdp = NULL;
+    return self->read_fd;
+}
+
+#define get_write_fd(self) (((self)->write_fd == -1)? _get_write_fd((self)) : (self)->write_fd)
+static int
+_get_write_fd(XferElementGlue *self)
+{
+    if (!self->write_fdp)
+       return -1; /* shouldn't happen.. */
+
+    if (self->write_fdp == &neighboring_element_fd) {
+       XferElement *elt = XFER_ELEMENT(self);
+       self->write_fd = xfer_element_swap_input_fd(elt->downstream, -1);
+    } else {
+       self->write_fd = *self->write_fdp;
+       *self->write_fdp = -1;
+    }
+    self->write_fdp = NULL;
+    return self->write_fd;
+}
+
+static int
+close_read_fd(XferElementGlue *self)
+{
+    int fd = get_read_fd(self);
+    self->read_fd = -1;
+    return close(fd);
+}
+
+static int
+close_write_fd(XferElementGlue *self)
+{
+    int fd = get_write_fd(self);
+    self->write_fd = -1;
+    return close(fd);
+}
+
 /*
  * Worker thread utility functions
  */
@@ -224,7 +296,8 @@ static void
 pull_and_write(XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int fd = *self->write_fdp;
+    int fd = get_write_fd(self);
+    self->write_fdp = NULL;
 
     while (!elt->cancelled) {
        size_t len;
@@ -254,20 +327,18 @@ pull_and_write(XferElementGlue *self)
 
     /* close the fd we've been writing, as an EOF signal to downstream, and
      * set it to -1 to avoid accidental re-use */
-    close(fd);
-    *self->write_fdp = -1;
+    close_write_fd(self);
 }
 
 static void
 read_and_write(XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int rfd = *self->read_fdp;
-    int wfd = *self->write_fdp;
-
     /* dynamically allocate a buffer, in case this thread has
      * a limited amount of stack allocated */
     char *buf = g_malloc(GLUE_BUFFER_SIZE);
+    int rfd = get_read_fd(self);
+    int wfd = get_write_fd(self);
 
     while (!elt->cancelled) {
        size_t len;
@@ -299,19 +370,14 @@ read_and_write(XferElementGlue *self)
     }
 
     if (elt->cancelled && elt->expect_eof)
-       xfer_element_drain_by_pulling(elt->upstream);
+       xfer_element_drain_by_reading(rfd);
 
-    /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
-     * re-use */
-    if (!elt->cancelled || elt->expect_eof) {
-       close(rfd);
-       *self->read_fdp = -1;
-    }
+    /* close the read fd.  If it's not at EOF, then upstream will get EPIPE, which will hopefully
+     * kill it and complete the cancellation */
+    close_read_fd(self);
 
-    /* close the fd we've been writing, as an EOF signal to downstream, and
-     * set it to -1 to avoid accidental re-use */
-    close(wfd);
-    *self->write_fdp = -1;
+    /* close the fd we've been writing, as an EOF signal to downstream */
+    close_write_fd(self);
 
     amfree(buf);
 }
@@ -321,7 +387,7 @@ read_and_push(
     XferElementGlue *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    int fd = *self->read_fdp;
+    int fd = get_read_fd(self);
 
     while (!elt->cancelled) {
        char *buf = g_malloc(GLUE_BUFFER_SIZE);
@@ -355,10 +421,8 @@ read_and_push(
     /* send an EOF indication downstream */
     xfer_element_push_buffer(elt->downstream, NULL, 0);
 
-    /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
-     * re-use */
-    close(fd);
-    *self->read_fdp = -1;
+    /* close the read fd, since it's at EOF */
+    close_read_fd(self);
 }
 
 static void
@@ -587,16 +651,16 @@ setup_impl(
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_READFD):
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        break;
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER):
        /* thread will read from pipe and call downstream's push_buffer */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -604,7 +668,7 @@ setup_impl(
 
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER):
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->on_pull = PULL_FROM_FD;
        self->read_fdp = &self->pipe[0];
@@ -613,7 +677,7 @@ setup_impl(
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_LISTEN):
        /* thread will connect for output, then read from pipe and write to socket */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -622,7 +686,7 @@ setup_impl(
     case mech_pair(XFER_MECH_WRITEFD, XFER_MECH_DIRECTTCP_CONNECT):
        /* thread will accept output conn, then read from pipe and write to socket */
        make_pipe(self);
-       elt->input_fd = self->pipe[1];
+       g_assert(xfer_element_swap_input_fd(elt, self->pipe[1]) == -1);
        self->pipe[1] = -1; /* upstream will close this for us */
        self->read_fdp = &self->pipe[0];
        self->need_thread = TRUE;
@@ -631,7 +695,7 @@ setup_impl(
 
     case mech_pair(XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD):
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->on_push = PUSH_TO_FD;
        self->write_fdp = &self->pipe[1];
@@ -662,7 +726,7 @@ setup_impl(
     case mech_pair(XFER_MECH_PULL_BUFFER, XFER_MECH_READFD):
        /* thread will pull from upstream and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -693,7 +757,7 @@ setup_impl(
     case mech_pair(XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_READFD):
        /* thread will accept for input, then read from socket and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -729,7 +793,7 @@ setup_impl(
     case mech_pair(XFER_MECH_DIRECTTCP_CONNECT, XFER_MECH_READFD):
        /* thread will connect for input, then read from socket and write to pipe */
        make_pipe(self);
-       elt->output_fd = self->pipe[0];
+       g_assert(xfer_element_swap_output_fd(elt, self->pipe[0]) == -1);
        self->pipe[0] = -1; /* downstream will close this for us */
        self->write_fdp = &self->pipe[1];
        self->need_thread = TRUE;
@@ -789,14 +853,8 @@ start_impl(
 {
     XferElementGlue *self = (XferElementGlue *)elt;
 
-    /* upstream and downstream are now set, so we can point our fdp's to them */
-    if (self->write_fdp == &neighboring_element_fd)
-       self->write_fdp = &elt->downstream->input_fd;
-    if (self->read_fdp == &neighboring_element_fd)
-       self->read_fdp = &elt->upstream->output_fd;
-
     if (self->need_thread)
-       self->thread = g_thread_create(worker_thread, (gpointer)self, FALSE, NULL);
+       self->thread = g_thread_create(worker_thread, (gpointer)self, TRUE, NULL);
 
     /* we're active if we have a thread that will eventually die */
     return self->need_thread;
@@ -878,16 +936,19 @@ pull_buffer_impl(
        }
 
        case PULL_FROM_FD: {
-           int fd = *self->read_fdp;
+           int fd = get_read_fd(self);
            char *buf = g_malloc(GLUE_BUFFER_SIZE);
            ssize_t len;
 
-           if (elt->cancelled) {
-               if (elt->expect_eof)
-                   xfer_element_drain_by_reading(fd);
+           /* if the fd is already closed, it's possible upstream bailed out
+            * so quickly that we didn't even get a look at the fd */
+           if (elt->cancelled || fd == -1) {
+               if (fd != -1) {
+                   if (elt->expect_eof)
+                       xfer_element_drain_by_reading(fd);
 
-               close(fd);
-               *self->read_fdp = -1;
+                   close_read_fd(self);
+               }
 
                *size = 0;
                return NULL;
@@ -911,8 +972,7 @@ pull_buffer_impl(
                    if (elt->expect_eof) {
                        xfer_element_drain_by_reading(fd);
                    }
-                   close(fd);
-                   *self->read_fdp = -1;
+                   close_read_fd(self);
                } else if (len == 0) {
                    /* EOF */
                    g_free(buf);
@@ -920,8 +980,7 @@ pull_buffer_impl(
                    *size = 0;
 
                    /* signal EOF to downstream */
-                   close(fd);
-                   *self->read_fdp = -1;
+                   close_read_fd(self);
                }
            }
 
@@ -1007,12 +1066,18 @@ push_buffer_impl(
            return;
 
        case PUSH_TO_FD: {
-           int fd = *self->write_fdp;
+           int fd = get_write_fd(self);
+
+           /* if the fd is already closed, it's possible upstream bailed out
+            * so quickly that we didn't even get a look at the fd.  In this
+            * case we can assume the xfer has been cancelled and just discard
+            * the data. */
+           if (fd == -1)
+               return;
 
            if (elt->cancelled) {
                if (!elt->expect_eof || !buf) {
-                   close(fd);
-                   *self->write_fdp = -1;
+                   close_write_fd(self);
 
                    /* hack to ensure we won't close the fd again, if we get another push */
                    elt->expect_eof = TRUE;
@@ -1035,8 +1100,7 @@ push_buffer_impl(
                }
                amfree(buf);
            } else {
-               close(fd);
-               *self->write_fdp = -1;
+               close_write_fd(self);
            }
 
            return;
@@ -1060,6 +1124,8 @@ instance_init(
     self->output_listen_socket = -1;
     self->input_data_socket = -1;
     self->output_data_socket = -1;
+    self->read_fd = -1;
+    self->write_fd = -1;
 }
 
 static void
@@ -1068,13 +1134,19 @@ finalize_impl(
 {
     XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
 
-    /* close our pipes if they're still open (they shouldn't be!) */
+    /* first make sure the worker thread has finished up */
+    if (self->thread)
+       g_thread_join(self->thread);
+
+    /* close our pipes and fd's if they're still open */
     if (self->pipe[0] != -1) close(self->pipe[0]);
     if (self->pipe[1] != -1) close(self->pipe[1]);
     if (self->input_listen_socket != -1) close(self->input_listen_socket);
     if (self->output_listen_socket != -1) close(self->output_listen_socket);
     if (self->input_data_socket != -1) close(self->input_data_socket);
     if (self->output_data_socket != -1) close(self->output_data_socket);
+    if (self->read_fd != -1) close(self->read_fd);
+    if (self->write_fd != -1) close(self->write_fd);
 
     if (self->ring) {
        /* empty the ring buffer, ignoring syncronization issues */