/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-#include "amxfer.h"
#include "amanda.h"
+#include "amxfer.h"
#include "xfer-device.h"
#include "arglist.h"
#include "conffile.h"
* corresponding condition variable. */
volatile gboolean paused;
GCond *paused_cond;
+ GCond *abort_accept_cond; /* condition to trigger to abort an accept */
} XferDestTaperDirectTCP;
arglist_start(argp, fmt);
g_vsnprintf(msg, sizeof(msg), fmt, argp);
arglist_end(argp);
- g_debug("XDT thd-%p: %s", g_thread_self(), msg);
+ g_debug("XDT: %s", msg);
}
/*
* Worker Thread
XferElement *elt = (XferElement *)data;
XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
GTimer *timer = g_timer_new();
+ int result;
/* This thread's job is to accept() an incoming connection, then call
* write_from_connection for each part, and then close the connection */
/* first, accept a new connection from the device */
DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
- if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+ result = device_accept_with_cond(self->device, &self->conn,
+ self->state_mutex,
+ self->abort_accept_cond);
+ if (result == 2) {
xfer_cancel_with_error(XFER_ELEMENT(self),
"accepting DirectTCP connection: %s",
device_error_or_status(self->device));
g_mutex_unlock(self->state_mutex);
return NULL;
+ } else if (result == 1) {
+ g_mutex_unlock(self->state_mutex);
+ return NULL;
}
DBG(2, "connection accepted; sending XMSG_READY");
xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
+ /* round the part size up to the next multiple of the block size */
+ if (self->part_size) {
+ self->part_size += self->device->block_size-1;
+ self->part_size -= self->part_size % self->device->block_size;
+ }
+
/* now loop until we're out of parts */
while (1) {
guint64 size;
msg->successful = TRUE;
msg->eom = eom;
msg->eof = eof;
+
+ /* time runs backward on some test boxes, so make sure this is positive */
+ if (msg->duration < 0) msg->duration = 0;
+
xfer_queue_message(elt->xfer, msg);
self->partnum++;
* longer paused */
g_mutex_lock(self->state_mutex);
g_cond_broadcast(self->paused_cond);
+ g_cond_broadcast(self->abort_accept_cond);
g_mutex_unlock(self->state_mutex);
return rv;
g_mutex_unlock(self->state_mutex);
}
-static void
-cache_inform_impl(
- XferDestTaper *xdtself G_GNUC_UNUSED,
- const char *filename G_GNUC_UNUSED,
- off_t offset G_GNUC_UNUSED,
- off_t length G_GNUC_UNUSED)
-{
- /* do nothing */
-}
-
static guint64
get_part_bytes_written_impl(
XferDestTaper *xdtself G_GNUC_UNUSED)
self->conn = NULL;
self->state_mutex = g_mutex_new();
self->paused_cond = g_cond_new();
+ self->abort_accept_cond = g_cond_new();
}
static void
g_mutex_free(self->state_mutex);
g_cond_free(self->paused_cond);
+ g_cond_free(self->abort_accept_cond);
if (self->part_header)
dumpfile_free(self->part_header);
klass->cancel = cancel_impl;
xdt_klass->start_part = start_part_impl;
xdt_klass->use_device = use_device_impl;
- xdt_klass->cache_inform = cache_inform_impl;
xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
goc->finalize = finalize_impl;