Imported Upstream version 2.6.1
[debian/amanda] / xfer-src / element-glue.c
diff --git a/xfer-src/element-glue.c b/xfer-src/element-glue.c
new file mode 100644 (file)
index 0000000..75c7177
--- /dev/null
@@ -0,0 +1,626 @@
+/*
+ * Amanda, The Advanced Maryland Automatic Network Disk Archiver
+ * Copyright (c) 2008 Zmanda Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
+ */
+
+#include "amxfer.h"
+#include "element-glue.h"
+#include "amanda.h"
+
+static GObjectClass *parent_class = NULL;
+
+/*
+ * Utility functions, etc.
+ */
+
+static void
+make_pipe(
+    XferElementGlue *self)
+{
+    if (pipe(self->pipe) < 0)
+       g_critical(_("Could not create pipe: %s"), strerror(errno));
+}
+
+static void
+send_xfer_done(
+    XferElementGlue *self)
+{
+    xfer_queue_message(XFER_ELEMENT(self)->xfer,
+           xmsg_new((XferElement *)self, XMSG_DONE, 0));
+}
+
+#define GLUE_BUFFER_SIZE 32768
+#define GLUE_RING_BUFFER_SIZE 32
+
+/*
+ * Worker threads
+ *
+ * At most one of these runs in a given instance, as selected in setup_impl
+ */
+
+static gpointer
+call_and_write_thread(
+    gpointer data)
+{
+    XferElement *elt = XFER_ELEMENT(data);
+    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+    int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
+    int fd = *fdp;
+
+    while (!elt->cancelled) {
+       size_t len;
+       char *buf;
+
+       /* get a buffer from upstream */
+       buf = xfer_element_pull_buffer(elt->upstream, &len);
+       if (!buf)
+           break;
+
+       /* write it */
+       if (full_write(fd, buf, len) < len) {
+           xfer_element_handle_error(elt,
+               _("Error writing to fd %d: %s"), fd, strerror(errno));
+           amfree(buf);
+           break;
+       }
+
+       amfree(buf);
+    }
+
+    if (elt->cancelled && elt->expect_eof)
+       xfer_element_drain_by_pulling(elt->upstream);
+
+    /* close the fd we've been writing, as an EOF signal to downstream, and
+     * set it to -1 to avoid accidental re-use */
+    close(fd);
+    *fdp = -1;
+
+    send_xfer_done(self);
+
+    return NULL;
+}
+
+static gpointer
+read_and_write_thread(
+    gpointer data)
+{
+    XferElement *elt = XFER_ELEMENT(data);
+    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+    int rfd = elt->upstream->output_fd;
+    int wfd = elt->downstream->input_fd;
+
+    /* dynamically allocate a buffer, in case this thread has
+     * a limited amount of stack allocated */
+    char *buf = g_malloc(GLUE_BUFFER_SIZE);
+
+    while (!elt->cancelled) {
+       size_t len;
+
+       /* read from upstream */
+       len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
+       if (len < GLUE_BUFFER_SIZE) {
+           if (errno) {
+               xfer_element_handle_error(elt,
+                   _("Error reading from fd %d: %s"), rfd, strerror(errno));
+               break;
+           } else if (len == 0) { /* we only count a zero-length read as EOF */
+               break;
+           }
+       }
+
+       /* write the buffer fully */
+       if (full_write(wfd, buf, len) < len) {
+           xfer_element_handle_error(elt,
+               _("Could not write to fd %d: %s"), wfd, strerror(errno));
+           break;
+       }
+    }
+
+    if (elt->cancelled && elt->expect_eof)
+       xfer_element_drain_by_pulling(elt->upstream);
+
+    /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
+     * re-use */
+    if (!elt->cancelled || elt->expect_eof) {
+       close(rfd);
+       elt->upstream->output_fd = -1;
+    }
+
+    /* close the fd we've been writing, as an EOF signal to downstream, and
+     * set it to -1 to avoid accidental re-use */
+    close(wfd);
+    elt->downstream->input_fd = -1;
+
+    send_xfer_done(self);
+
+    amfree(buf);
+    return NULL;
+}
+
+static gpointer
+read_and_call_thread(
+    gpointer data)
+{
+    XferElement *elt = XFER_ELEMENT(data);
+    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+    int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
+    int fd = *fdp;
+
+    while (!elt->cancelled) {
+       char *buf = g_malloc(GLUE_BUFFER_SIZE);
+       size_t len;
+
+       /* read a buffer from upstream */
+       len = full_read(fd, buf, GLUE_BUFFER_SIZE);
+       if (len < GLUE_BUFFER_SIZE) {
+           if (errno) {
+               xfer_element_handle_error(elt,
+                   _("Error reading from fd %d: %s"), fd, strerror(errno));
+               break;
+           } else if (len == 0) { /* we only count a zero-length read as EOF */
+               amfree(buf);
+               break;
+           }
+       }
+
+       xfer_element_push_buffer(elt->downstream, buf, len);
+    }
+
+    if (elt->cancelled && elt->expect_eof)
+       xfer_element_drain_by_reading(fd);
+
+    /* send an EOF indication downstream */
+    xfer_element_push_buffer(elt->downstream, NULL, 0);
+
+    /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
+     * re-use */
+    close(fd);
+    *fdp = -1;
+
+    send_xfer_done(self);
+
+    return NULL;
+}
+
+static gpointer
+call_and_call_thread(
+    gpointer data)
+{
+    XferElement *elt = XFER_ELEMENT(data);
+    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
+    gboolean eof_sent = FALSE;
+
+    /* TODO: consider breaking this into two cooperating threads: one to pull
+     * buffers from upstream and one to push them downstream.  This would gain
+     * parallelism at the cost of a lot of synchronization operations. */
+
+    while (!elt->cancelled) {
+       char *buf;
+       size_t len;
+
+       /* get a buffer from upstream */
+       buf = xfer_element_pull_buffer(elt->upstream, &len);
+
+       /* and push it downstream */
+       xfer_element_push_buffer(elt->downstream, buf, len);
+
+       if (!buf) {
+           eof_sent = TRUE;
+           break;
+       }
+    }
+
+    if (elt->cancelled && elt->expect_eof)
+       xfer_element_drain_by_pulling(elt->upstream);
+
+    if (!eof_sent)
+       xfer_element_push_buffer(elt->downstream, NULL, 0);
+
+    send_xfer_done(self);
+
+    return NULL;
+}
+
+/*
+ * Implementation
+ */
+
+static void
+setup_impl(
+    XferElement *elt)
+{
+    XferElementGlue *self = (XferElementGlue *)elt;
+
+    switch (elt->input_mech) {
+    case XFER_MECH_READFD:
+       switch (elt->output_mech) {
+       case XFER_MECH_READFD:
+           g_assert_not_reached(); /* no glue needed */
+           break;
+
+       case XFER_MECH_WRITEFD:
+           self->threadfunc = read_and_write_thread;
+           break;
+
+       case XFER_MECH_PUSH_BUFFER:
+           self->threadfunc = read_and_call_thread;
+           break;
+
+       case XFER_MECH_PULL_BUFFER:
+           break;
+
+       case XFER_MECH_NONE:
+           g_assert_not_reached();
+           break;
+       }
+       break;
+
+    case XFER_MECH_WRITEFD:
+       make_pipe(self);
+       elt->input_fd = self->pipe[1];
+       self->pipe[1] = -1; /* upstream will close this for us */
+
+       switch (elt->output_mech) {
+       case XFER_MECH_READFD:
+           elt->output_fd = self->pipe[0];
+           self->pipe[0] = -1; /* downstream will close this for us */
+           break;
+
+       case XFER_MECH_WRITEFD:
+           g_assert_not_reached(); /* no glue needed */
+           break;
+
+       case XFER_MECH_PUSH_BUFFER:
+           self->threadfunc = read_and_call_thread;
+           break;
+
+       case XFER_MECH_PULL_BUFFER:
+           break;
+
+       case XFER_MECH_NONE:
+           g_assert_not_reached();
+           break;
+       }
+       break;
+
+    case XFER_MECH_PUSH_BUFFER:
+       switch (elt->output_mech) {
+       case XFER_MECH_READFD:
+           make_pipe(self);
+           elt->output_fd = self->pipe[0];
+           self->pipe[0] = -1; /* downstream will close this for us */
+           break;
+
+       case XFER_MECH_WRITEFD:
+           break;
+
+       case XFER_MECH_PUSH_BUFFER:
+           g_assert_not_reached(); /* no glue needed */
+           break;
+
+       case XFER_MECH_PULL_BUFFER:
+           self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
+           self->ring_used_sem = semaphore_new_with_value(0);
+           self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
+           break;
+
+       case XFER_MECH_NONE:
+           g_assert_not_reached();
+           break;
+       }
+       break;
+
+    case XFER_MECH_PULL_BUFFER:
+       switch (elt->output_mech) {
+       case XFER_MECH_READFD:
+           make_pipe(self);
+           elt->output_fd = self->pipe[0];
+           self->pipe[0] = -1; /* downstream will close this for us */
+           self->threadfunc = call_and_write_thread;
+           break;
+
+       case XFER_MECH_WRITEFD:
+           self->threadfunc = call_and_write_thread;
+           break;
+
+       case XFER_MECH_PUSH_BUFFER:
+           self->threadfunc = call_and_call_thread;
+           break;
+
+       case XFER_MECH_PULL_BUFFER:
+           g_assert_not_reached(); /* no glue needed */
+           break;
+
+       case XFER_MECH_NONE:
+           g_assert_not_reached();
+           break;
+       }
+       break;
+
+    case XFER_MECH_NONE:
+       g_assert_not_reached();
+       break;
+    }
+}
+
+static gboolean
+start_impl(
+    XferElement *elt)
+{
+    XferElementGlue *self = (XferElementGlue *)elt;
+
+    if (self->threadfunc) {
+       self->thread = g_thread_create(self->threadfunc, (gpointer)self, FALSE, NULL);
+    }
+
+    /* we're active if we have a thread that will eventually die */
+    return self->threadfunc? TRUE : FALSE;
+}
+
+static gpointer
+pull_buffer_impl(
+    XferElement *elt,
+    size_t *size)
+{
+    XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
+
+    if (self->ring) {
+       gpointer buf;
+
+       if (elt->cancelled) {
+           /* The finalize method will empty the ring buffer */
+           *size = 0;
+           return NULL;
+       }
+
+       /* make sure there's at least one element available */
+       semaphore_down(self->ring_used_sem);
+
+       /* get it */
+       buf = self->ring[self->ring_tail].buf;
+       *size = self->ring[self->ring_tail].size;
+       self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
+
+       /* and mark this element as free to be overwritten */
+       semaphore_up(self->ring_free_sem);
+
+       return buf;
+    } else {
+       int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
+       int fd = *fdp;
+       char *buf = g_malloc(GLUE_BUFFER_SIZE);
+       ssize_t len;
+
+       if (elt->cancelled) {
+           if (elt->expect_eof)
+               xfer_element_drain_by_reading(fd);
+
+           close(fd);
+           *fdp = -1;
+
+           *size = 0;
+           return NULL;
+       }
+
+       /* read from upstream */
+       len = full_read(fd, buf, GLUE_BUFFER_SIZE);
+       if (len < GLUE_BUFFER_SIZE) {
+           if (errno) {
+               xfer_element_handle_error(elt,
+                   _("Error reading from fd %d: %s"), fd, strerror(errno));
+
+               /* return an EOF */
+               amfree(buf);
+               len = 0;
+
+               /* and finish off the upstream */
+               if (elt->expect_eof) {
+                   xfer_element_drain_by_reading(fd);
+               }
+               close(fd);
+               *fdp = -1;
+           } else if (len == 0) {
+               /* EOF */
+               g_free(buf);
+               buf = NULL;
+               *size = 0;
+
+               /* signal EOF to downstream */
+               close(fd);
+               *fdp = -1;
+           }
+       }
+
+       *size = (size_t)len;
+       return buf;
+    }
+}
+
+static void
+push_buffer_impl(
+    XferElement *elt,
+    gpointer buf,
+    size_t len)
+{
+    XferElementGlue *self = (XferElementGlue *)elt;
+
+    if (self->ring) {
+       /* just drop packets if the transfer has been cancelled */
+       if (elt->cancelled) {
+           amfree(buf);
+           return;
+       }
+
+       /* make sure there's at least one element free */
+       semaphore_down(self->ring_free_sem);
+
+       /* set it */
+       self->ring[self->ring_head].buf = buf;
+       self->ring[self->ring_head].size = len;
+       self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
+
+       /* and mark this element as available for reading */
+       semaphore_up(self->ring_used_sem);
+
+       return;
+    } else {
+       int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
+       int fd = *fdp;
+
+       if (elt->cancelled) {
+           if (!elt->expect_eof || !buf) {
+               close(fd);
+               *fdp = -1;
+
+               /* hack to ensure we won't close the fd again, if we get another push */
+               elt->expect_eof = TRUE;
+           }
+
+           amfree(buf);
+
+           return;
+       }
+
+       /* write the full buffer to the fd, or close on EOF */
+       if (buf) {
+           if (full_write(fd, buf, len) < len) {
+               xfer_element_handle_error(elt,
+                   _("Error writing to fd %d: %s"), fd, strerror(errno));
+               /* nothing special to do to handle the cancellation */
+           }
+           amfree(buf);
+       } else {
+           close(fd);
+           *fdp = -1;
+       }
+
+       return;
+    }
+}
+
+static void
+instance_init(
+    XferElementGlue *self)
+{
+    XferElement *elt = (XferElement *)self;
+    elt->can_generate_eof = TRUE;
+    self->pipe[0] = self->pipe[1] = -1;
+}
+
+static void
+finalize_impl(
+    GObject * obj_self)
+{
+    XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
+
+    /* close our pipes if they're still open (they shouldn't be!) */
+    if (self->pipe[0] != -1) close(self->pipe[0]);
+    if (self->pipe[1] != -1) close(self->pipe[1]);
+
+    if (self->ring) {
+       /* empty the ring buffer, ignoring syncronization issues */
+       while (self->ring_used_sem->value) {
+           if (self->ring[self->ring_tail].buf)
+               amfree(self->ring[self->ring_tail].buf);
+           self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
+       }
+
+       amfree(self->ring);
+       semaphore_free(self->ring_used_sem);
+       semaphore_free(self->ring_free_sem);
+    }
+
+    /* chain up */
+    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
+}
+
+static xfer_element_mech_pair_t _pairs[] = {
+    { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
+    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
+    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
+
+    { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
+    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
+    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
+
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
+    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
+
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
+    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
+
+    /* terminator */
+    { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+};
+xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
+
+static void
+class_init(
+    XferElementGlueClass * selfc)
+{
+    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
+    GObjectClass *goc = G_OBJECT_CLASS(selfc);
+
+    klass->setup = setup_impl;
+    klass->start = start_impl;
+    klass->push_buffer = push_buffer_impl;
+    klass->pull_buffer = pull_buffer_impl;
+
+    klass->perl_class = "Amanda::Xfer::Element::Glue";
+    klass->mech_pairs = xfer_element_glue_mech_pairs;
+
+    goc->finalize = finalize_impl;
+
+    parent_class = g_type_class_peek_parent(selfc);
+}
+
+GType
+xfer_element_glue_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferElementGlueClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferElementGlue),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) instance_init,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
+    }
+
+    return type;
+}
+
+/* create an element of this class; prototype is in xfer-element.h */
+XferElement *
+xfer_element_glue(void)
+{
+    XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
+    XferElement *elt = XFER_ELEMENT(self);
+
+    return elt;
+}