X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-source-recovery.c;fp=device-src%2Fxfer-source-recovery.c;h=112301cd462c00597b61469f55d8540a0729b411;hb=d28952249e392eb31bc8eecc53f6c477f30c617b;hp=f305f0cb7f1e23fd91b2a5f99a6416d78563bcf1;hpb=949b8910a5e23c4285d0b1aedacfc82a14dc97a5;p=debian%2Famanda diff --git a/device-src/xfer-source-recovery.c b/device-src/xfer-source-recovery.c index f305f0c..112301c 100644 --- a/device-src/xfer-source-recovery.c +++ b/device-src/xfer-source-recovery.c @@ -2,9 +2,10 @@ * Amanda, The Advanced Maryland Automatic Network Disk Archiver * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -55,7 +56,7 @@ typedef struct XferSourceRecovery { GThread *thread; /* this mutex in this condition variable governs all variables below */ - GCond *start_part_cond; + GCond *start_part_cond; GMutex *start_part_mutex; /* is this device currently paused and awaiting a new part? */ @@ -87,6 +88,8 @@ typedef struct XferSourceRecovery { GTimer *part_timer; gint64 size; + + GCond *abort_cond; /* condition to trigger to abort ndmp command */ } XferSourceRecovery; /* @@ -135,6 +138,7 @@ directtcp_common_thread( { XferElement *elt = XFER_ELEMENT(self); char *errmsg = NULL; + int result; /* send XMSG_READY to indicate it's OK to call start_part now */ DBG(2, "sending XMSG_READY"); @@ -165,11 +169,17 @@ directtcp_common_thread( while (1) { DBG(2, "reading part from %s", self->device->device_name); - if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) { + result = device_read_to_connection(self->device, G_MAXUINT64, + &actual_size, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error reading from device: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); goto close_conn_and_send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto close_conn_and_send_done; } /* break on EOF; otherwise do another read_to_connection */ @@ -223,6 +233,7 @@ directtcp_connect_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_connect_thread)") @@ -241,13 +252,18 @@ directtcp_connect_thread( g_assert(self->listen_ok); DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error accepting DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; } DBG(2, "DirectTCP connection accepted"); @@ -264,6 +280,7 @@ directtcp_listen_thread( { XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); + int result; DBG(1, "(this is directtcp_listen_thread)"); @@ -281,14 +298,21 @@ directtcp_listen_thread( g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */ DBG(2, "making DirectTCP connection on device %s", self->device->device_name); - if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs, - &self->conn, NULL, NULL)) { + result = device_connect(self->device, FALSE, + elt->downstream->input_listen_addrs, + &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error making DirectTCP connection: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; } DBG(2, "DirectTCP connect succeeded"); @@ -484,6 +508,7 @@ cancel_impl( /* trigger the condition variable, in case the thread is waiting on it */ g_mutex_lock(self->start_part_mutex); g_cond_broadcast(self->start_part_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->start_part_mutex); return TRUE; @@ -600,6 +625,7 @@ finalize_impl( g_object_unref(self->device); g_cond_free(self->start_part_cond); + g_cond_free(self->abort_cond); g_mutex_free(self->start_part_mutex); } @@ -611,6 +637,7 @@ instance_init( self->paused = TRUE; self->start_part_cond = g_cond_new(); + self->abort_cond = g_cond_new(); self->start_part_mutex = g_mutex_new(); }