X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-source-recovery.c;h=112301cd462c00597b61469f55d8540a0729b411;hb=HEAD;hp=da2813473ebbdc9f3bb3efeae70c3b141f71b8ac;hpb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;p=debian%2Famanda diff --git a/device-src/xfer-source-recovery.c b/device-src/xfer-source-recovery.c index da28134..112301c 100644 --- a/device-src/xfer-source-recovery.c +++ b/device-src/xfer-source-recovery.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -55,7 +56,7 @@ typedef struct XferSourceRecovery { GThread *thread; /* this mutex in this condition variable governs all variables below */ - GCond *start_part_cond; + GCond *start_part_cond; GMutex *start_part_mutex; /* is this device currently paused and awaiting a new part? */ @@ -76,12 +77,19 @@ typedef struct XferSourceRecovery { * part) */ size_t block_size; + /* bytes read for this image */ + guint64 bytes_read; + /* part size (potentially including any zero-padding from the * device) */ guint64 part_size; /* timer for the duration; NULL while paused or cancelled */ GTimer *part_timer; + + gint64 size; + + GCond *abort_cond; /* condition to trigger to abort ndmp command */ } XferSourceRecovery; /* @@ -113,7 +121,7 @@ _xsr_dbg(const char *fmt, ...) arglist_start(argp, fmt); g_vsnprintf(msg, sizeof(msg), fmt, argp); arglist_end(argp); - g_debug("XSR thd-%p: %s", g_thread_self(), msg); + g_debug("XSR: %s", msg); } /* @@ -130,6 +138,7 @@ directtcp_common_thread( { XferElement *elt = XFER_ELEMENT(self); char *errmsg = NULL; + int result; /* send XMSG_READY to indicate it's OK to call start_part now */ DBG(2, "sending XMSG_READY"); @@ -160,11 +169,17 @@ directtcp_common_thread( while (1) { DBG(2, "reading part from %s", self->device->device_name); - if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) { + result = device_read_to_connection(self->device, G_MAXUINT64, + &actual_size, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error reading from device: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); goto close_conn_and_send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto close_conn_and_send_done; } /* break on EOF; otherwise do another read_to_connection */ @@ -218,6 +233,7 @@ directtcp_connect_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_connect_thread)") @@ -236,13 +252,18 @@ directtcp_connect_thread( g_assert(self->listen_ok); DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error accepting DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; } DBG(2, "DirectTCP connection accepted"); @@ -259,6 +280,7 @@ directtcp_listen_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_listen_thread)"); @@ -276,14 +298,21 @@ directtcp_listen_thread( g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */ DBG(2, "making DirectTCP connection on device %s", self->device->device_name); - if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs, - &self->conn, NULL, NULL)) { + result = device_connect(self->device, FALSE, + elt->downstream->input_listen_addrs, + &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error making DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; } DBG(2, "DirectTCP connect succeeded"); @@ -405,8 +434,9 @@ pull_buffer_impl( _("error reading from %s: %s"), self->device->device_name, device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); - goto error; + goto error_unlocked; } /* the device has signalled EOF (really end-of-part), so clean up instance @@ -423,6 +453,7 @@ pull_buffer_impl( self->paused = TRUE; g_object_unref(self->device); self->device = NULL; + self->bytes_read += self->part_size; self->part_size = 0; self->block_size = 0; if (self->part_timer) { @@ -438,9 +469,30 @@ pull_buffer_impl( g_mutex_unlock(self->start_part_mutex); + if (elt->size > 0) { + /* initialize on first pass */ + if (self->size == 0) + self->size = elt->size; + + if (self->size == -1) { + *size = 0; + amfree(buf); + return NULL; + } + + if (*size > (guint64)self->size) { + /* return only self->size bytes */ + *size = self->size; + self->size = -1; + } else { + self->size -= *size; + } + } + return buf; error: g_mutex_unlock(self->start_part_mutex); +error_unlocked: *size = 0; return NULL; } @@ -456,6 +508,7 @@ cancel_impl( /* trigger the condition variable, in case the thread is waiting on it */ g_mutex_lock(self->start_part_mutex); g_cond_broadcast(self->start_part_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->start_part_mutex); return TRUE; @@ -572,6 +625,7 @@ finalize_impl( g_object_unref(self->device); g_cond_free(self->start_part_cond); + g_cond_free(self->abort_cond); g_mutex_free(self->start_part_mutex); } @@ -583,6 +637,7 @@ instance_init( self->paused = TRUE; self->start_part_cond = g_cond_new(); + self->abort_cond = g_cond_new(); self->start_part_mutex = g_mutex_new(); } @@ -676,3 +731,17 @@ xfer_source_recovery_use_device( klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt); klass->use_device(XFER_SOURCE_RECOVERY(elt), device); } + +guint64 +xfer_source_recovery_get_bytes_read( + XferElement *elt) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt); + guint64 bytes_read = self->bytes_read; + + if (self->device) + bytes_read += device_get_bytes_read(self->device); + + return bytes_read; +} +