/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-#include "amxfer.h"
#include "amanda.h"
+#include "amxfer.h"
#include "device.h"
#include "property.h"
#include "xfer-device.h"
-#include "queueing.h"
-#include "device-queueing.h"
/*
* Class declaration
XferElement __parent__;
Device *device;
- size_t max_memory;
- GThread *thread;
+ gboolean cancel_at_leom;
+
+ gpointer partial;
+ gsize block_size;
+ gsize partial_length;
} XferDestDevice;
/*
* Implementation
*/
-static producer_result_t
-pull_buffer_producer(gpointer data,
- queue_buffer_t *buffer,
- size_t hint_size G_GNUC_UNUSED)
+static gboolean
+do_block(
+ XferDestDevice *self,
+ guint size,
+ gpointer data)
{
- XferDestDevice *self = (XferDestDevice *)data;
XferElement *elt = XFER_ELEMENT(self);
- gpointer buf;
- size_t size;
- if (elt->cancelled) {
- /* drain our upstream only if we're expecting an EOF */
- if (elt->expect_eof) {
- xfer_element_drain_by_pulling(XFER_ELEMENT(self)->upstream);
- }
-
- return PRODUCER_FINISHED;
+ if (!device_write_block(self->device, size, data)) {
+ xfer_cancel_with_error(elt, "%s: %s",
+ self->device->device_name, device_error_or_status(self->device));
+ wait_until_xfer_cancelled(elt->xfer);
+ return FALSE;
}
- buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size);
- if (!buf) {
- return PRODUCER_FINISHED;
+ /* check for LEOM */
+ if (self->cancel_at_leom && self->device->is_eom) {
+ xfer_cancel_with_error(elt, "%s: LEOM detected", self->device->device_name);
+ wait_until_xfer_cancelled(elt->xfer);
+ return FALSE;
}
- /* queueing recycles allocated buffers back to us, but we don't need them.. */
- amfree(buffer->data);
- buffer->data = buf;
- buffer->alloc_size = buffer->data_size = size;
- buffer->offset = 0;
-
- return PRODUCER_MORE;
+ return TRUE;
}
-static gpointer
-queueing_thread(
- gpointer data)
+static void
+push_buffer_impl(
+ XferElement *elt,
+ gpointer buf,
+ size_t len)
{
- XferDestDevice *self = (XferDestDevice *)data;
- XferElement *elt = (XferElement *)self;
- queue_result_flags result;
- GValue val;
- StreamingRequirement streaming_mode;
- size_t block_size;
- size_t max_memory;
-
- /* Get the device's parameters */
- bzero(&val, sizeof(val));
- if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
- || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
- g_warning("XferDestDevice Couldn't get streaming type for %s", self->device->device_name);
- streaming_mode = STREAMING_REQUIREMENT_REQUIRED;
- } else {
- streaming_mode = g_value_get_enum(&val);
- }
+ XferDestDevice *self = XFER_DEST_DEVICE(elt);
+ gpointer to_free = buf;
- block_size = self->device->block_size;
+ /* Handle EOF */
+ if (!buf) {
+ /* write out the partial buffer, if there's anything in it */
+ if (self->partial_length) {
+ if (!do_block(self, self->block_size, self->partial)) {
+ return;
+ }
+ self->partial_length = 0;
+ }
- max_memory = self->max_memory || DEFAULT_MAX_BUFFER_MEMORY;
+ device_finish_file(self->device);
+ return;
+ }
- /* this thread creates two other threads (consumer and producer) and
- * blocks waiting for them to finish. TODO: when taper no longer uses
- * queueing, merge the queueing functionality here */
- result =
- do_consumer_producer_queue_full(pull_buffer_producer, data,
- device_write_consumer, self->device,
- block_size, max_memory,
- streaming_mode);
+ /* set up the block buffer, now that we can depend on having a blocksize
+ * from the device */
+ if (!self->partial) {
+ self->partial = g_malloc(self->device->block_size);
+ self->block_size = self->device->block_size;
+ self->partial_length = 0;
+ }
- /* finish the file explicitly */
- if (!(self->device->status & DEVICE_STATUS_DEVICE_ERROR))
- device_finish_file(self->device);
+ /* if we already have data in the buffer, add the new data to it */
+ if (self->partial_length != 0) {
+ gsize to_copy = min(self->block_size - self->partial_length, len);
+ memmove(self->partial + self->partial_length, buf, to_copy);
+ buf = (gpointer)(to_copy + (char *)buf);
+ len -= to_copy;
+ self->partial_length += to_copy;
+ }
+
+ /* and if the buffer is now full, write the block */
+ if (self->partial_length == self->block_size) {
+ if (!do_block(self, self->block_size, self->partial)) {
+ g_free(to_free);
+ return;
+ }
+ self->partial_length = 0;
+ }
- if (result != QUEUE_SUCCESS) {
- /* note that our producer never returns an error */
-
- if ((result & QUEUE_CONSUMER_ERROR)
- && (self->device->status != DEVICE_STATUS_SUCCESS)) {
- xfer_cancel_with_error(elt, "%s: %s",
- self->device->device_name, device_error_or_status(self->device));
- wait_until_xfer_cancelled(elt->xfer);
- } else {
- xfer_cancel_with_error(elt, _("%s: internal error"),
- xfer_element_repr(elt));
- wait_until_xfer_cancelled(elt->xfer);
+ /* write any whole blocks directly from the push buffer */
+ while (len >= self->block_size) {
+ if (!do_block(self, self->block_size, buf)) {
+ g_free(to_free);
+ return;
}
- /* and drain our upstream, since the queueing loop is done */
- if (elt->expect_eof)
- xfer_element_drain_by_pulling(elt->upstream);
+ buf = (gpointer)(self->block_size + (char *)buf);
+ len -= self->block_size;
}
- xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+ /* and finally store any leftover data in the partial buffer */
+ if (len) {
+ memmove(self->partial, buf, len);
+ self->partial_length = len;
+ }
- return NULL;
+ g_free(to_free);
}
-static gboolean
-start_impl(
+static void
+instance_init(
XferElement *elt)
{
- XferDestDevice *self = (XferDestDevice *)elt;
- self->thread = g_thread_create(queueing_thread, (gpointer)self, FALSE, NULL);
- return TRUE;
+ XferDestDevice *self = XFER_DEST_DEVICE(elt);
+ self->partial = NULL;
}
static void
-instance_init(
- XferElement *elt)
+finalize_impl(
+ GObject * obj_self)
{
- elt->can_generate_eof = TRUE;
+ XferDestDevice *self = XFER_DEST_DEVICE(obj_self);
+
+ if (self->partial) {
+ g_free(self->partial);
+ }
}
static void
XferDestDeviceClass * selfc)
{
XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
+ GObjectClass *goc = (GObjectClass*) klass;
static xfer_element_mech_pair_t mech_pairs[] = {
- { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 0, 0},
+ { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 0, 0},
{ XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
};
- klass->start = start_impl;
+ klass->push_buffer = push_buffer_impl;
klass->perl_class = "Amanda::Xfer::Dest::Device";
klass->mech_pairs = mech_pairs;
+ goc->finalize = finalize_impl;
+
parent_class = g_type_class_peek_parent(selfc);
}
XferElement *
xfer_dest_device(
Device *device,
- size_t max_memory)
+ gboolean cancel_at_leom)
{
XferDestDevice *self = (XferDestDevice *)g_object_new(XFER_DEST_DEVICE_TYPE, NULL);
XferElement *elt = XFER_ELEMENT(self);
g_assert(device != NULL);
self->device = device;
- self->max_memory = max_memory;
+ self->cancel_at_leom = cancel_at_leom;
return elt;
}