/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-#include "amxfer.h"
#include "amanda.h"
+#include "amxfer.h"
#include "xfer-device.h"
#include "arglist.h"
#include "conffile.h"
* corresponding condition variable. */
volatile gboolean paused;
GCond *paused_cond;
+ GCond *abort_cond; /* condition to trigger to abort an NDMP command */
} XferDestTaperDirectTCP;
arglist_start(argp, fmt);
g_vsnprintf(msg, sizeof(msg), fmt, argp);
arglist_end(argp);
- g_debug("XDT thd-%p: %s", g_thread_self(), msg);
+ g_debug("XDT: %s", msg);
}
/*
* Worker Thread
XferElement *elt = (XferElement *)data;
XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
GTimer *timer = g_timer_new();
+ int result;
/* This thread's job is to accept() an incoming connection, then call
* write_from_connection for each part, and then close the connection */
/* first, accept a new connection from the device */
DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
- if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+ result = device_accept(self->device, &self->conn, &elt->cancelled,
+ self->state_mutex, self->abort_cond);
+ if (result == 1 && !elt->cancelled) {
xfer_cancel_with_error(XFER_ELEMENT(self),
"accepting DirectTCP connection: %s",
device_error_or_status(self->device));
g_mutex_unlock(self->state_mutex);
- return NULL;
+ wait_until_xfer_cancelled(elt->xfer);
+ goto send_xmsg_done;
+ } else if (result == 2 || elt->cancelled) {
+ g_mutex_unlock(self->state_mutex);
+ wait_until_xfer_cancelled(elt->xfer);
+ goto send_xmsg_done;
}
DBG(2, "connection accepted; sending XMSG_READY");
xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
+ /* round the part size up to the next multiple of the block size */
+ if (self->part_size) {
+ self->part_size += self->device->block_size-1;
+ self->part_size -= self->part_size % self->device->block_size;
+ }
+
/* now loop until we're out of parts */
while (1) {
guint64 size;
/* write the part */
g_timer_start(timer);
- if (!device_write_from_connection(self->device,
- self->part_size, &size)) {
+ result = device_write_from_connection(self->device,
+ self->part_size, &size, &elt->cancelled,
+ self->state_mutex, self->abort_cond);
+ if (result == 1 && !elt->cancelled) {
/* even if this is just a physical EOM, we may have lost data, so
* the whole transfer is dead. */
xfer_cancel_with_error(XFER_ELEMENT(self),
"Error writing from DirectTCP connection: %s",
device_error_or_status(self->device));
goto cancelled;
+ } else if (result == 2 || elt->cancelled) {
+ goto cancelled;
}
g_timer_stop(timer);
msg->successful = TRUE;
msg->eom = eom;
msg->eof = eof;
+
+ /* time runs backward on some test boxes, so make sure this is positive */
+ if (msg->duration < 0) msg->duration = 0;
+
xfer_queue_message(elt->xfer, msg);
self->partnum++;
* longer paused */
g_mutex_lock(self->state_mutex);
g_cond_broadcast(self->paused_cond);
+ g_cond_broadcast(self->abort_cond);
g_mutex_unlock(self->state_mutex);
return rv;
g_mutex_unlock(self->state_mutex);
}
-static void
-cache_inform_impl(
- XferDestTaper *xdtself G_GNUC_UNUSED,
- const char *filename G_GNUC_UNUSED,
- off_t offset G_GNUC_UNUSED,
- off_t length G_GNUC_UNUSED)
-{
- /* do nothing */
-}
-
static guint64
get_part_bytes_written_impl(
XferDestTaper *xdtself G_GNUC_UNUSED)
self->conn = NULL;
self->state_mutex = g_mutex_new();
self->paused_cond = g_cond_new();
+ self->abort_cond = g_cond_new();
}
static void
g_mutex_free(self->state_mutex);
g_cond_free(self->paused_cond);
+ g_cond_free(self->abort_cond);
if (self->part_header)
dumpfile_free(self->part_header);
klass->cancel = cancel_impl;
xdt_klass->start_part = start_part_impl;
xdt_klass->use_device = use_device_impl;
- xdt_klass->cache_inform = cache_inform_impl;
xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
goc->finalize = finalize_impl;