/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
GThread *thread;
/* this mutex in this condition variable governs all variables below */
- GCond *start_part_cond;
+ GCond *start_part_cond;
GMutex *start_part_mutex;
/* is this device currently paused and awaiting a new part? */
* part) */
size_t block_size;
+ /* bytes read for this image */
+ guint64 bytes_read;
+
/* part size (potentially including any zero-padding from the
* device) */
guint64 part_size;
GTimer *part_timer;
gint64 size;
+
+ GCond *abort_cond; /* condition to trigger to abort ndmp command */
} XferSourceRecovery;
/*
{
XferElement *elt = XFER_ELEMENT(self);
char *errmsg = NULL;
+ int result;
/* send XMSG_READY to indicate it's OK to call start_part now */
DBG(2, "sending XMSG_READY");
while (1) {
DBG(2, "reading part from %s", self->device->device_name);
- if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
+ result = device_read_to_connection(self->device, G_MAXUINT64,
+ &actual_size, &elt->cancelled,
+ self->start_part_mutex, self->abort_cond);
+ if (result == 1 && !elt->cancelled) {
xfer_cancel_with_error(elt, _("error reading from device: %s"),
device_error_or_status(self->device));
g_mutex_unlock(self->start_part_mutex);
goto close_conn_and_send_done;
+ } else if (result == 2 || elt->cancelled) {
+ g_mutex_unlock(self->start_part_mutex);
+ goto close_conn_and_send_done;
}
/* break on EOF; otherwise do another read_to_connection */
{
XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
XferElement *elt = XFER_ELEMENT(self);
+ int result;
DBG(1, "(this is directtcp_connect_thread)")
g_assert(self->listen_ok);
DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
- if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+ result = device_accept(self->device, &self->conn, &elt->cancelled,
+ self->start_part_mutex, self->abort_cond);
+ if (result == 1 && !elt->cancelled) {
xfer_cancel_with_error(elt,
_("error accepting DirectTCP connection: %s"),
device_error_or_status(self->device));
g_mutex_unlock(self->start_part_mutex);
wait_until_xfer_cancelled(elt->xfer);
goto send_done;
+ } else if (result == 2 || elt->cancelled) {
+ g_mutex_unlock(self->start_part_mutex);
+ goto send_done;
}
DBG(2, "DirectTCP connection accepted");
{
XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
XferElement *elt = XFER_ELEMENT(self);
+ int result;
DBG(1, "(this is directtcp_listen_thread)");
g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
- if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
- &self->conn, NULL, NULL)) {
+ result = device_connect(self->device, FALSE,
+ elt->downstream->input_listen_addrs,
+ &self->conn, &elt->cancelled,
+ self->start_part_mutex, self->abort_cond);
+ if (result == 1 && !elt->cancelled) {
xfer_cancel_with_error(elt,
_("error making DirectTCP connection: %s"),
device_error_or_status(self->device));
g_mutex_unlock(self->start_part_mutex);
wait_until_xfer_cancelled(elt->xfer);
goto send_done;
+ } else if (result == 2 || elt->cancelled) {
+ g_mutex_unlock(self->start_part_mutex);
+ wait_until_xfer_cancelled(elt->xfer);
+ goto send_done;
}
DBG(2, "DirectTCP connect succeeded");
self->paused = TRUE;
g_object_unref(self->device);
self->device = NULL;
+ self->bytes_read += self->part_size;
self->part_size = 0;
self->block_size = 0;
if (self->part_timer) {
/* trigger the condition variable, in case the thread is waiting on it */
g_mutex_lock(self->start_part_mutex);
g_cond_broadcast(self->start_part_cond);
+ g_cond_broadcast(self->abort_cond);
g_mutex_unlock(self->start_part_mutex);
return TRUE;
g_object_unref(self->device);
g_cond_free(self->start_part_cond);
+ g_cond_free(self->abort_cond);
g_mutex_free(self->start_part_mutex);
}
self->paused = TRUE;
self->start_part_cond = g_cond_new();
+ self->abort_cond = g_cond_new();
self->start_part_mutex = g_mutex_new();
}
klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
}
+
+guint64
+xfer_source_recovery_get_bytes_read(
+ XferElement *elt)
+{
+ XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
+ guint64 bytes_read = self->bytes_read;
+
+ if (self->device)
+ bytes_read += device_get_bytes_read(self->device);
+
+ return bytes_read;
+}
+