X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-dest-taper-directtcp.c;h=9257538013a1b5e991d9efa0fcb24a7ff91c1414;hb=refs%2Fheads%2Fmaster;hp=5bc492c67643ab1ccae64ef432485eb1afa298db;hpb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;p=debian%2Famanda diff --git a/device-src/xfer-dest-taper-directtcp.c b/device-src/xfer-dest-taper-directtcp.c index 5bc492c..9257538 100644 --- a/device-src/xfer-dest-taper-directtcp.c +++ b/device-src/xfer-dest-taper-directtcp.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -74,6 +75,7 @@ typedef struct XferDestTaperDirectTCP { * corresponding condition variable. */ volatile gboolean paused; GCond *paused_cond; + GCond *abort_cond; /* condition to trigger to abort an NDMP command */ } XferDestTaperDirectTCP; @@ -95,7 +97,7 @@ _xdt_dbg(const char *fmt, ...) arglist_start(argp, fmt); g_vsnprintf(msg, sizeof(msg), fmt, argp); arglist_end(argp); - g_debug("XDT thd-%p: %s", g_thread_self(), msg); + g_debug("XDT: %s", msg); } /* * Worker Thread @@ -108,6 +110,7 @@ worker_thread( XferElement *elt = (XferElement *)data; XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data; GTimer *timer = g_timer_new(); + int result; /* This thread's job is to accept() an incoming connection, then call * write_from_connection for each part, and then close the connection */ @@ -124,12 +127,19 @@ worker_thread( /* first, accept a new connection from the device */ DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->state_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(XFER_ELEMENT(self), "accepting DirectTCP connection: %s", device_error_or_status(self->device)); g_mutex_unlock(self->state_mutex); - return NULL; + wait_until_xfer_cancelled(elt->xfer); + goto send_xmsg_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->state_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_xmsg_done; } DBG(2, "connection accepted; sending XMSG_READY"); @@ -179,14 +189,18 @@ worker_thread( /* write the part */ g_timer_start(timer); - if (!device_write_from_connection(self->device, - self->part_size, &size)) { + result = device_write_from_connection(self->device, + self->part_size, &size, &elt->cancelled, + self->state_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { /* even if this is just a physical EOM, we may have lost data, so * the whole transfer is dead. */ xfer_cancel_with_error(XFER_ELEMENT(self), "Error writing from DirectTCP connection: %s", device_error_or_status(self->device)); goto cancelled; + } else if (result == 2 || elt->cancelled) { + goto cancelled; } g_timer_stop(timer); @@ -327,6 +341,7 @@ cancel_impl( * longer paused */ g_mutex_lock(self->state_mutex); g_cond_broadcast(self->paused_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->state_mutex); return rv; @@ -418,6 +433,7 @@ instance_init( self->conn = NULL; self->state_mutex = g_mutex_new(); self->paused_cond = g_cond_new(); + self->abort_cond = g_cond_new(); } static void @@ -440,6 +456,7 @@ finalize_impl( g_mutex_free(self->state_mutex); g_cond_free(self->paused_cond); + g_cond_free(self->abort_cond); if (self->part_header) dumpfile_free(self->part_header);