X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-source-recovery.c;h=112301cd462c00597b61469f55d8540a0729b411;hb=d28952249e392eb31bc8eecc53f6c477f30c617b;hp=16673e0d1932abecda5f1ba130320e81bf797b81;hpb=c6f0a88c567f8536c498f554285aed1f8150da18;p=debian%2Famanda diff --git a/device-src/xfer-source-recovery.c b/device-src/xfer-source-recovery.c index 16673e0..112301c 100644 --- a/device-src/xfer-source-recovery.c +++ b/device-src/xfer-source-recovery.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -55,7 +56,7 @@ typedef struct XferSourceRecovery { GThread *thread; /* this mutex in this condition variable governs all variables below */ - GCond *start_part_cond; + GCond *start_part_cond; GMutex *start_part_mutex; /* is this device currently paused and awaiting a new part? */ @@ -76,6 +77,9 @@ typedef struct XferSourceRecovery { * part) */ size_t block_size; + /* bytes read for this image */ + guint64 bytes_read; + /* part size (potentially including any zero-padding from the * device) */ guint64 part_size; @@ -84,6 +88,8 @@ typedef struct XferSourceRecovery { GTimer *part_timer; gint64 size; + + GCond *abort_cond; /* condition to trigger to abort ndmp command */ } XferSourceRecovery; /* @@ -132,6 +138,7 @@ directtcp_common_thread( { XferElement *elt = XFER_ELEMENT(self); char *errmsg = NULL; + int result; /* send XMSG_READY to indicate it's OK to call start_part now */ DBG(2, "sending XMSG_READY"); @@ -162,11 +169,17 @@ directtcp_common_thread( while (1) { DBG(2, "reading part from %s", self->device->device_name); - if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) { + result = device_read_to_connection(self->device, G_MAXUINT64, + &actual_size, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error reading from device: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); goto close_conn_and_send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto close_conn_and_send_done; } /* break on EOF; otherwise do another read_to_connection */ @@ -220,6 +233,7 @@ directtcp_connect_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_connect_thread)") @@ -238,13 +252,18 @@ directtcp_connect_thread( g_assert(self->listen_ok); DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error accepting DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; } DBG(2, "DirectTCP connection accepted"); @@ -261,6 +280,7 @@ directtcp_listen_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_listen_thread)"); @@ -278,14 +298,21 @@ directtcp_listen_thread( g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */ DBG(2, "making DirectTCP connection on device %s", self->device->device_name); - if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs, - &self->conn, NULL, NULL)) { + result = device_connect(self->device, FALSE, + elt->downstream->input_listen_addrs, + &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error making DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; } DBG(2, "DirectTCP connect succeeded"); @@ -426,6 +453,7 @@ pull_buffer_impl( self->paused = TRUE; g_object_unref(self->device); self->device = NULL; + self->bytes_read += self->part_size; self->part_size = 0; self->block_size = 0; if (self->part_timer) { @@ -480,6 +508,7 @@ cancel_impl( /* trigger the condition variable, in case the thread is waiting on it */ g_mutex_lock(self->start_part_mutex); g_cond_broadcast(self->start_part_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->start_part_mutex); return TRUE; @@ -596,6 +625,7 @@ finalize_impl( g_object_unref(self->device); g_cond_free(self->start_part_cond); + g_cond_free(self->abort_cond); g_mutex_free(self->start_part_mutex); } @@ -607,6 +637,7 @@ instance_init( self->paused = TRUE; self->start_part_cond = g_cond_new(); + self->abort_cond = g_cond_new(); self->start_part_mutex = g_mutex_new(); } @@ -700,3 +731,17 @@ xfer_source_recovery_use_device( klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt); klass->use_device(XFER_SOURCE_RECOVERY(elt), device); } + +guint64 +xfer_source_recovery_get_bytes_read( + XferElement *elt) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt); + guint64 bytes_read = self->bytes_read; + + if (self->device) + bytes_read += device_get_bytes_read(self->device); + + return bytes_read; +} +