Imported Upstream version 3.2.0
[debian/amanda] / device-src / xfer-dest-device.c
index 13cb35495e12b355429c1015d7bcec616b1934a2..28fd0580c2592268fbed81bedcaea07b448bf708 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-#include "amxfer.h"
 #include "amanda.h"
+#include "amxfer.h"
 #include "device.h"
 #include "property.h"
 #include "xfer-device.h"
-#include "queueing.h"
-#include "device-queueing.h"
 
 /*
  * Class declaration
@@ -52,9 +50,12 @@ typedef struct XferDestDevice {
     XferElement __parent__;
 
     Device *device;
-    size_t max_memory;
 
-    GThread *thread;
+    gboolean cancel_at_leom;
+
+    gpointer partial;
+    gsize block_size;
+    gsize partial_length;
 } XferDestDevice;
 
 /*
@@ -69,116 +70,117 @@ typedef struct {
  * Implementation
  */
 
-static producer_result_t
-pull_buffer_producer(gpointer data,
-    queue_buffer_t *buffer,
-    size_t hint_size G_GNUC_UNUSED)
+static gboolean
+do_block(
+    XferDestDevice *self,
+    guint size,
+    gpointer data)
 {
-    XferDestDevice *self = (XferDestDevice *)data;
     XferElement *elt = XFER_ELEMENT(self);
-    gpointer buf;
-    size_t size;
 
-    if (elt->cancelled) {
-       /* drain our upstream only if we're expecting an EOF */
-       if (elt->expect_eof) {
-           xfer_element_drain_by_pulling(XFER_ELEMENT(self)->upstream);
-       }
-
-       return PRODUCER_FINISHED;
+    if (!device_write_block(self->device, size, data)) {
+       xfer_cancel_with_error(elt, "%s: %s",
+               self->device->device_name, device_error_or_status(self->device));
+       wait_until_xfer_cancelled(elt->xfer);
+       return FALSE;
     }
 
-    buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size);
-    if (!buf) {
-       return PRODUCER_FINISHED;
+    /* check for LEOM */
+    if (self->cancel_at_leom && self->device->is_eom) {
+       xfer_cancel_with_error(elt, "%s: LEOM detected", self->device->device_name);
+       wait_until_xfer_cancelled(elt->xfer);
+       return FALSE;
     }
 
-    /* queueing recycles allocated buffers back to us, but we don't need them.. */
-    amfree(buffer->data);
-    buffer->data = buf;
-    buffer->alloc_size = buffer->data_size = size;
-    buffer->offset = 0;
-
-    return PRODUCER_MORE;
+    return TRUE;
 }
 
-static gpointer
-queueing_thread(
-    gpointer data)
+static void
+push_buffer_impl(
+    XferElement *elt,
+    gpointer buf,
+    size_t len)
 {
-    XferDestDevice *self = (XferDestDevice *)data;
-    XferElement *elt = (XferElement *)self;
-    queue_result_flags result;
-    GValue val;
-    StreamingRequirement streaming_mode;
-    size_t block_size;
-    size_t max_memory;
-
-    /* Get the device's parameters */
-    bzero(&val, sizeof(val));
-    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
-       || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
-       g_warning("XferDestDevice Couldn't get streaming type for %s", self->device->device_name);
-       streaming_mode = STREAMING_REQUIREMENT_REQUIRED;
-    } else {
-       streaming_mode = g_value_get_enum(&val);
-    }
+    XferDestDevice *self = XFER_DEST_DEVICE(elt);
+    gpointer to_free = buf;
 
-    block_size = self->device->block_size;
+    /* Handle EOF */
+    if (!buf) {
+       /* write out the partial buffer, if there's anything in it */
+       if (self->partial_length) {
+           if (!do_block(self, self->block_size, self->partial)) {
+               return;
+           }
+           self->partial_length = 0;
+       }
 
-    max_memory = self->max_memory || DEFAULT_MAX_BUFFER_MEMORY;
+       device_finish_file(self->device);
+       return;
+    }
 
-    /* this thread creates two other threads (consumer and producer) and
-     * blocks waiting for them to finish.  TODO: when taper no longer uses
-     * queueing, merge the queueing functionality here */
-    result =
-        do_consumer_producer_queue_full(pull_buffer_producer, data,
-                                        device_write_consumer, self->device,
-                                        block_size, max_memory,
-                                        streaming_mode);
+    /* set up the block buffer, now that we can depend on having a blocksize
+     * from the device */
+    if (!self->partial) {
+       self->partial = g_malloc(self->device->block_size);
+       self->block_size = self->device->block_size;
+       self->partial_length = 0;
+    }
 
-    /* finish the file explicitly */
-    if (!(self->device->status & DEVICE_STATUS_DEVICE_ERROR))
-       device_finish_file(self->device);
+    /* if we already have data in the buffer, add the new data to it */
+    if (self->partial_length != 0) {
+       gsize to_copy = min(self->block_size - self->partial_length, len);
+       memmove(self->partial + self->partial_length, buf, to_copy);
+       buf = (gpointer)(to_copy + (char *)buf);
+       len -= to_copy;
+       self->partial_length += to_copy;
+    }
+
+    /* and if the buffer is now full, write the block */
+    if (self->partial_length == self->block_size) {
+       if (!do_block(self, self->block_size, self->partial)) {
+           g_free(to_free);
+           return;
+       }
+       self->partial_length = 0;
+    }
 
-    if (result != QUEUE_SUCCESS) {
-       /* note that our producer never returns an error */
-
-       if ((result & QUEUE_CONSUMER_ERROR)
-               && (self->device->status != DEVICE_STATUS_SUCCESS)) {
-           xfer_cancel_with_error(elt, "%s: %s",
-                   self->device->device_name, device_error_or_status(self->device));
-           wait_until_xfer_cancelled(elt->xfer);
-       } else {
-           xfer_cancel_with_error(elt, _("%s: internal error"),
-                   xfer_element_repr(elt));
-           wait_until_xfer_cancelled(elt->xfer);
+    /* write any whole blocks directly from the push buffer */
+    while (len >= self->block_size) {
+       if (!do_block(self, self->block_size, buf)) {
+           g_free(to_free);
+           return;
        }
 
-       /* and drain our upstream, since the queueing loop is done */
-       if (elt->expect_eof)
-           xfer_element_drain_by_pulling(elt->upstream);
+       buf = (gpointer)(self->block_size + (char *)buf);
+       len -= self->block_size;
     }
 
-    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+    /* and finally store any leftover data in the partial buffer */
+    if (len) {
+       memmove(self->partial, buf, len);
+       self->partial_length = len;
+    }
 
-    return NULL;
+    g_free(to_free);
 }
 
-static gboolean
-start_impl(
+static void
+instance_init(
     XferElement *elt)
 {
-    XferDestDevice *self = (XferDestDevice *)elt;
-    self->thread = g_thread_create(queueing_thread, (gpointer)self, FALSE, NULL);
-    return TRUE;
+    XferDestDevice *self = XFER_DEST_DEVICE(elt);
+    self->partial = NULL;
 }
 
 static void
-instance_init(
-    XferElement *elt)
+finalize_impl(
+    GObject * obj_self)
 {
-    elt->can_generate_eof = TRUE;
+    XferDestDevice *self = XFER_DEST_DEVICE(obj_self);
+
+    if (self->partial) {
+       g_free(self->partial);
+    }
 }
 
 static void
@@ -186,16 +188,19 @@ class_init(
     XferDestDeviceClass * selfc)
 {
     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
+    GObjectClass *goc = (GObjectClass*) klass;
     static xfer_element_mech_pair_t mech_pairs[] = {
-       { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 0, 0},
+       { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 0, 0},
        { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
     };
 
-    klass->start = start_impl;
+    klass->push_buffer = push_buffer_impl;
 
     klass->perl_class = "Amanda::Xfer::Dest::Device";
     klass->mech_pairs = mech_pairs;
 
+    goc->finalize = finalize_impl;
+
     parent_class = g_type_class_peek_parent(selfc);
 }
 
@@ -228,7 +233,7 @@ xfer_dest_device_get_type (void)
 XferElement *
 xfer_dest_device(
     Device *device,
-    size_t max_memory)
+    gboolean cancel_at_leom)
 {
     XferDestDevice *self = (XferDestDevice *)g_object_new(XFER_DEST_DEVICE_TYPE, NULL);
     XferElement *elt = XFER_ELEMENT(self);
@@ -236,7 +241,7 @@ xfer_dest_device(
     g_assert(device != NULL);
 
     self->device = device;
-    self->max_memory = max_memory;
+    self->cancel_at_leom = cancel_at_leom;
 
     return elt;
 }