X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-dest-device.c;h=38e7a70808dfa25889acc4fc19486859724731ae;hb=HEAD;hp=13cb35495e12b355429c1015d7bcec616b1934a2;hpb=d5853102f67d85d8e169f9dbe973ad573306c215;p=debian%2Famanda diff --git a/device-src/xfer-dest-device.c b/device-src/xfer-dest-device.c index 13cb354..38e7a70 100644 --- a/device-src/xfer-dest-device.c +++ b/device-src/xfer-dest-device.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -19,13 +20,11 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" +#include "amxfer.h" #include "device.h" #include "property.h" #include "xfer-device.h" -#include "queueing.h" -#include "device-queueing.h" /* * Class declaration @@ -52,9 +51,12 @@ typedef struct XferDestDevice { XferElement __parent__; Device *device; - size_t max_memory; - GThread *thread; + gboolean cancel_at_leom; + + gpointer partial; + gsize block_size; + gsize partial_length; } XferDestDevice; /* @@ -69,116 +71,117 @@ typedef struct { * Implementation */ -static producer_result_t -pull_buffer_producer(gpointer data, - queue_buffer_t *buffer, - size_t hint_size G_GNUC_UNUSED) +static gboolean +do_block( + XferDestDevice *self, + guint size, + gpointer data) { - XferDestDevice *self = (XferDestDevice *)data; XferElement *elt = XFER_ELEMENT(self); - gpointer buf; - size_t size; - if (elt->cancelled) { - /* drain our upstream only if we're expecting an EOF */ - if (elt->expect_eof) { - xfer_element_drain_by_pulling(XFER_ELEMENT(self)->upstream); - } - - return PRODUCER_FINISHED; + if (!device_write_block(self->device, size, data)) { + xfer_cancel_with_error(elt, "%s: %s", + self->device->device_name, device_error_or_status(self->device)); + wait_until_xfer_cancelled(elt->xfer); + return FALSE; } - buf = xfer_element_pull_buffer(XFER_ELEMENT(self)->upstream, &size); - if (!buf) { - return PRODUCER_FINISHED; + /* check for LEOM */ + if (self->cancel_at_leom && self->device->is_eom) { + xfer_cancel_with_error(elt, "%s: LEOM detected", self->device->device_name); + wait_until_xfer_cancelled(elt->xfer); + return FALSE; } - /* queueing recycles allocated buffers back to us, but we don't need them.. */ - amfree(buffer->data); - buffer->data = buf; - buffer->alloc_size = buffer->data_size = size; - buffer->offset = 0; - - return PRODUCER_MORE; + return TRUE; } -static gpointer -queueing_thread( - gpointer data) +static void +push_buffer_impl( + XferElement *elt, + gpointer buf, + size_t len) { - XferDestDevice *self = (XferDestDevice *)data; - XferElement *elt = (XferElement *)self; - queue_result_flags result; - GValue val; - StreamingRequirement streaming_mode; - size_t block_size; - size_t max_memory; - - /* Get the device's parameters */ - bzero(&val, sizeof(val)); - if (!device_property_get(self->device, PROPERTY_STREAMING, &val) - || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) { - g_warning("XferDestDevice Couldn't get streaming type for %s", self->device->device_name); - streaming_mode = STREAMING_REQUIREMENT_REQUIRED; - } else { - streaming_mode = g_value_get_enum(&val); - } + XferDestDevice *self = XFER_DEST_DEVICE(elt); + gpointer to_free = buf; - block_size = self->device->block_size; + /* Handle EOF */ + if (!buf) { + /* write out the partial buffer, if there's anything in it */ + if (self->partial_length) { + if (!do_block(self, self->block_size, self->partial)) { + return; + } + self->partial_length = 0; + } - max_memory = self->max_memory || DEFAULT_MAX_BUFFER_MEMORY; + device_finish_file(self->device); + return; + } - /* this thread creates two other threads (consumer and producer) and - * blocks waiting for them to finish. TODO: when taper no longer uses - * queueing, merge the queueing functionality here */ - result = - do_consumer_producer_queue_full(pull_buffer_producer, data, - device_write_consumer, self->device, - block_size, max_memory, - streaming_mode); + /* set up the block buffer, now that we can depend on having a blocksize + * from the device */ + if (!self->partial) { + self->partial = g_malloc(self->device->block_size); + self->block_size = self->device->block_size; + self->partial_length = 0; + } - /* finish the file explicitly */ - if (!(self->device->status & DEVICE_STATUS_DEVICE_ERROR)) - device_finish_file(self->device); + /* if we already have data in the buffer, add the new data to it */ + if (self->partial_length != 0) { + gsize to_copy = min(self->block_size - self->partial_length, len); + memmove(self->partial + self->partial_length, buf, to_copy); + buf = (gpointer)(to_copy + (char *)buf); + len -= to_copy; + self->partial_length += to_copy; + } + + /* and if the buffer is now full, write the block */ + if (self->partial_length == self->block_size) { + if (!do_block(self, self->block_size, self->partial)) { + g_free(to_free); + return; + } + self->partial_length = 0; + } - if (result != QUEUE_SUCCESS) { - /* note that our producer never returns an error */ - - if ((result & QUEUE_CONSUMER_ERROR) - && (self->device->status != DEVICE_STATUS_SUCCESS)) { - xfer_cancel_with_error(elt, "%s: %s", - self->device->device_name, device_error_or_status(self->device)); - wait_until_xfer_cancelled(elt->xfer); - } else { - xfer_cancel_with_error(elt, _("%s: internal error"), - xfer_element_repr(elt)); - wait_until_xfer_cancelled(elt->xfer); + /* write any whole blocks directly from the push buffer */ + while (len >= self->block_size) { + if (!do_block(self, self->block_size, buf)) { + g_free(to_free); + return; } - /* and drain our upstream, since the queueing loop is done */ - if (elt->expect_eof) - xfer_element_drain_by_pulling(elt->upstream); + buf = (gpointer)(self->block_size + (char *)buf); + len -= self->block_size; } - xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); + /* and finally store any leftover data in the partial buffer */ + if (len) { + memmove(self->partial, buf, len); + self->partial_length = len; + } - return NULL; + g_free(to_free); } -static gboolean -start_impl( +static void +instance_init( XferElement *elt) { - XferDestDevice *self = (XferDestDevice *)elt; - self->thread = g_thread_create(queueing_thread, (gpointer)self, FALSE, NULL); - return TRUE; + XferDestDevice *self = XFER_DEST_DEVICE(elt); + self->partial = NULL; } static void -instance_init( - XferElement *elt) +finalize_impl( + GObject * obj_self) { - elt->can_generate_eof = TRUE; + XferDestDevice *self = XFER_DEST_DEVICE(obj_self); + + if (self->partial) { + g_free(self->partial); + } } static void @@ -186,16 +189,19 @@ class_init( XferDestDeviceClass * selfc) { XferElementClass *klass = XFER_ELEMENT_CLASS(selfc); + GObjectClass *goc = (GObjectClass*) klass; static xfer_element_mech_pair_t mech_pairs[] = { - { XFER_MECH_PULL_BUFFER, XFER_MECH_NONE, 0, 0}, + { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 0, 0}, { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0}, }; - klass->start = start_impl; + klass->push_buffer = push_buffer_impl; klass->perl_class = "Amanda::Xfer::Dest::Device"; klass->mech_pairs = mech_pairs; + goc->finalize = finalize_impl; + parent_class = g_type_class_peek_parent(selfc); } @@ -228,7 +234,7 @@ xfer_dest_device_get_type (void) XferElement * xfer_dest_device( Device *device, - size_t max_memory) + gboolean cancel_at_leom) { XferDestDevice *self = (XferDestDevice *)g_object_new(XFER_DEST_DEVICE_TYPE, NULL); XferElement *elt = XFER_ELEMENT(self); @@ -236,7 +242,7 @@ xfer_dest_device( g_assert(device != NULL); self->device = device; - self->max_memory = max_memory; + self->cancel_at_leom = cancel_at_leom; return elt; }