X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-dest-taper-directtcp.c;h=9257538013a1b5e991d9efa0fcb24a7ff91c1414;hb=HEAD;hp=9036d22382bdcf282f8c06461d4d37e1591cca3e;hpb=fd48f3e498442f0cbff5f3606c7c403d0566150e;p=debian%2Famanda diff --git a/device-src/xfer-dest-taper-directtcp.c b/device-src/xfer-dest-taper-directtcp.c index 9036d22..9257538 100644 --- a/device-src/xfer-dest-taper-directtcp.c +++ b/device-src/xfer-dest-taper-directtcp.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -19,8 +20,8 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" +#include "amxfer.h" #include "xfer-device.h" #include "arglist.h" #include "conffile.h" @@ -74,6 +75,7 @@ typedef struct XferDestTaperDirectTCP { * corresponding condition variable. */ volatile gboolean paused; GCond *paused_cond; + GCond *abort_cond; /* condition to trigger to abort an NDMP command */ } XferDestTaperDirectTCP; @@ -95,7 +97,7 @@ _xdt_dbg(const char *fmt, ...) arglist_start(argp, fmt); g_vsnprintf(msg, sizeof(msg), fmt, argp); arglist_end(argp); - g_debug("XDT thd-%p: %s", g_thread_self(), msg); + g_debug("XDT: %s", msg); } /* * Worker Thread @@ -108,6 +110,7 @@ worker_thread( XferElement *elt = (XferElement *)data; XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data; GTimer *timer = g_timer_new(); + int result; /* This thread's job is to accept() an incoming connection, then call * write_from_connection for each part, and then close the connection */ @@ -124,17 +127,30 @@ worker_thread( /* first, accept a new connection from the device */ DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->state_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(XFER_ELEMENT(self), "accepting DirectTCP connection: %s", device_error_or_status(self->device)); g_mutex_unlock(self->state_mutex); - return NULL; + wait_until_xfer_cancelled(elt->xfer); + goto send_xmsg_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->state_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_xmsg_done; } DBG(2, "connection accepted; sending XMSG_READY"); xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0)); + /* round the part size up to the next multiple of the block size */ + if (self->part_size) { + self->part_size += self->device->block_size-1; + self->part_size -= self->part_size % self->device->block_size; + } + /* now loop until we're out of parts */ while (1) { guint64 size; @@ -173,14 +189,18 @@ worker_thread( /* write the part */ g_timer_start(timer); - if (!device_write_from_connection(self->device, - self->part_size, &size)) { + result = device_write_from_connection(self->device, + self->part_size, &size, &elt->cancelled, + self->state_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { /* even if this is just a physical EOM, we may have lost data, so * the whole transfer is dead. */ xfer_cancel_with_error(XFER_ELEMENT(self), "Error writing from DirectTCP connection: %s", device_error_or_status(self->device)); goto cancelled; + } else if (result == 2 || elt->cancelled) { + goto cancelled; } g_timer_stop(timer); @@ -209,6 +229,10 @@ worker_thread( msg->successful = TRUE; msg->eom = eom; msg->eof = eof; + + /* time runs backward on some test boxes, so make sure this is positive */ + if (msg->duration < 0) msg->duration = 0; + xfer_queue_message(elt->xfer, msg); self->partnum++; @@ -317,6 +341,7 @@ cancel_impl( * longer paused */ g_mutex_lock(self->state_mutex); g_cond_broadcast(self->paused_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->state_mutex); return rv; @@ -388,16 +413,6 @@ use_device_impl( g_mutex_unlock(self->state_mutex); } -static void -cache_inform_impl( - XferDestTaper *xdtself G_GNUC_UNUSED, - const char *filename G_GNUC_UNUSED, - off_t offset G_GNUC_UNUSED, - off_t length G_GNUC_UNUSED) -{ - /* do nothing */ -} - static guint64 get_part_bytes_written_impl( XferDestTaper *xdtself G_GNUC_UNUSED) @@ -418,6 +433,7 @@ instance_init( self->conn = NULL; self->state_mutex = g_mutex_new(); self->paused_cond = g_cond_new(); + self->abort_cond = g_cond_new(); } static void @@ -440,6 +456,7 @@ finalize_impl( g_mutex_free(self->state_mutex); g_cond_free(self->paused_cond); + g_cond_free(self->abort_cond); if (self->part_header) dumpfile_free(self->part_header); @@ -466,7 +483,6 @@ class_init( klass->cancel = cancel_impl; xdt_klass->start_part = start_part_impl; xdt_klass->use_device = use_device_impl; - xdt_klass->cache_inform = cache_inform_impl; xdt_klass->get_part_bytes_written = get_part_bytes_written_impl; goc->finalize = finalize_impl;