X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-source-recovery.c;h=112301cd462c00597b61469f55d8540a0729b411;hb=HEAD;hp=425a83789218e3f8a3a248d5f7696e16583475a9;hpb=d5853102f67d85d8e169f9dbe973ad573306c215;p=debian%2Famanda diff --git a/device-src/xfer-source-recovery.c b/device-src/xfer-source-recovery.c index 425a837..112301c 100644 --- a/device-src/xfer-source-recovery.c +++ b/device-src/xfer-source-recovery.c @@ -1,10 +1,11 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved. * - * This program is free software; you can redistribute it and/or modify it - * under the terms of the GNU General Public License version 2 as published - * by the Free Software Foundation. + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. * * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY @@ -19,8 +20,8 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" +#include "amxfer.h" #include "device.h" #include "property.h" #include "xfer-device.h" @@ -55,7 +56,7 @@ typedef struct XferSourceRecovery { GThread *thread; /* this mutex in this condition variable governs all variables below */ - GCond *start_part_cond; + GCond *start_part_cond; GMutex *start_part_mutex; /* is this device currently paused and awaiting a new part? */ @@ -76,12 +77,19 @@ typedef struct XferSourceRecovery { * part) */ size_t block_size; + /* bytes read for this image */ + guint64 bytes_read; + /* part size (potentially including any zero-padding from the * device) */ guint64 part_size; /* timer for the duration; NULL while paused or cancelled */ GTimer *part_timer; + + gint64 size; + + GCond *abort_cond; /* condition to trigger to abort ndmp command */ } XferSourceRecovery; /* @@ -113,47 +121,27 @@ _xsr_dbg(const char *fmt, ...) arglist_start(argp, fmt); g_vsnprintf(msg, sizeof(msg), fmt, argp); arglist_end(argp); - g_debug("XSR thd-%p: %s", g_thread_self(), msg); + g_debug("XSR: %s", msg); } /* * Implementation */ +/* common code for both directtcp_listen_thread and directtcp_connect_thread; + * this is called after self->conn is filled in and carries out the data + * transfer over that connection. NOTE: start_part_mutex is HELD when this + * function begins */ static gpointer -directtcp_thread( - gpointer data) +directtcp_common_thread( + XferSourceRecovery *self) { - XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); char *errmsg = NULL; - - /* first, we need to accept the incoming connection; we do this while - * holding the start_part_mutex, so that a part doesn't get started until - * we're finished with the device */ - g_mutex_lock(self->start_part_mutex); - - if (elt->cancelled) { - g_mutex_unlock(self->start_part_mutex); - goto send_done; - } - - g_assert(self->device != NULL); /* have a device */ - g_assert(elt->output_listen_addrs != NULL); /* listening on it */ - g_assert(self->listen_ok); - - DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { - xfer_cancel_with_error(elt, - _("error accepting DirectTCP connection: %s"), - device_error_or_status(self->device)); - g_mutex_unlock(self->start_part_mutex); - wait_until_xfer_cancelled(elt->xfer); - goto send_done; - } + int result; /* send XMSG_READY to indicate it's OK to call start_part now */ - DBG(2, "connection accepted; sending XMSG_READY"); + DBG(2, "sending XMSG_READY"); xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0)); /* now we sit around waiting for signals to write a part */ @@ -181,11 +169,17 @@ directtcp_thread( while (1) { DBG(2, "reading part from %s", self->device->device_name); - if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) { + result = device_read_to_connection(self->device, G_MAXUINT64, + &actual_size, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { xfer_cancel_with_error(elt, _("error reading from device: %s"), device_error_or_status(self->device)); g_mutex_unlock(self->start_part_mutex); goto close_conn_and_send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto close_conn_and_send_done; } /* break on EOF; otherwise do another read_to_connection */ @@ -228,9 +222,104 @@ close_conn_and_send_done: } } + xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); + + return NULL; +} + +static gpointer +directtcp_connect_thread( + gpointer data) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); + XferElement *elt = XFER_ELEMENT(self); + int result; + + DBG(1, "(this is directtcp_connect_thread)") + + /* first, we need to accept the incoming connection; we do this while + * holding the start_part_mutex, so that a part doesn't get started until + * we're finished with the device */ + g_mutex_lock(self->start_part_mutex); + + if (elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; + } + + g_assert(self->device != NULL); /* have a device */ + g_assert(elt->output_listen_addrs != NULL); /* listening on it */ + g_assert(self->listen_ok); + + DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); + result = device_accept(self->device, &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { + xfer_cancel_with_error(elt, + _("error accepting DirectTCP connection: %s"), + device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; + } + DBG(2, "DirectTCP connection accepted"); + + return directtcp_common_thread(self); + send_done: xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); + return NULL; +} +static gpointer +directtcp_listen_thread( + gpointer data) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); + XferElement *elt = XFER_ELEMENT(self); + int result; + + DBG(1, "(this is directtcp_listen_thread)"); + + /* we need to make an outgoing connection to downstream; we do this while + * holding the start_part_mutex, so that a part doesn't get started until + * we're finished with the device */ + g_mutex_lock(self->start_part_mutex); + + if (elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; + } + + g_assert(self->device != NULL); /* have a device */ + g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */ + + DBG(2, "making DirectTCP connection on device %s", self->device->device_name); + result = device_connect(self->device, FALSE, + elt->downstream->input_listen_addrs, + &self->conn, &elt->cancelled, + self->start_part_mutex, self->abort_cond); + if (result == 1 && !elt->cancelled) { + xfer_cancel_with_error(elt, + _("error making DirectTCP connection: %s"), + device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; + } else if (result == 2 || elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; + } + DBG(2, "DirectTCP connect succeeded"); + + return directtcp_common_thread(self); + +send_done: + xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); return NULL; } @@ -251,6 +340,8 @@ setup_impl( } self->listen_ok = TRUE; } else { + /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or + * XFER_MECH_PULL_BUFFER */ elt->output_listen_addrs = NULL; } @@ -265,7 +356,11 @@ start_impl( if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) { g_assert(elt->output_listen_addrs != NULL); - self->thread = g_thread_create(directtcp_thread, (gpointer)self, FALSE, NULL); + self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL); + return TRUE; /* we'll send XMSG_DONE */ + } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) { + g_assert(elt->output_listen_addrs == NULL); + self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL); return TRUE; /* we'll send XMSG_DONE */ } else { /* nothing to prepare for - we're ready already! */ @@ -339,8 +434,9 @@ pull_buffer_impl( _("error reading from %s: %s"), self->device->device_name, device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); wait_until_xfer_cancelled(elt->xfer); - goto error; + goto error_unlocked; } /* the device has signalled EOF (really end-of-part), so clean up instance @@ -357,6 +453,7 @@ pull_buffer_impl( self->paused = TRUE; g_object_unref(self->device); self->device = NULL; + self->bytes_read += self->part_size; self->part_size = 0; self->block_size = 0; if (self->part_timer) { @@ -372,9 +469,30 @@ pull_buffer_impl( g_mutex_unlock(self->start_part_mutex); + if (elt->size > 0) { + /* initialize on first pass */ + if (self->size == 0) + self->size = elt->size; + + if (self->size == -1) { + *size = 0; + amfree(buf); + return NULL; + } + + if (*size > (guint64)self->size) { + /* return only self->size bytes */ + *size = self->size; + self->size = -1; + } else { + self->size -= *size; + } + } + return buf; error: g_mutex_unlock(self->start_part_mutex); +error_unlocked: *size = 0; return NULL; } @@ -390,6 +508,7 @@ cancel_impl( /* trigger the condition variable, in case the thread is waiting on it */ g_mutex_lock(self->start_part_mutex); g_cond_broadcast(self->start_part_cond); + g_cond_broadcast(self->abort_cond); g_mutex_unlock(self->start_part_mutex); return TRUE; @@ -414,7 +533,8 @@ start_part_impl( /* make sure we're ready to go */ g_assert(self->paused); - if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT) { + if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT + || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) { g_assert(self->conn != NULL); } @@ -480,6 +600,7 @@ get_mech_pairs_impl( }; static xfer_element_mech_pair_t directtcp_mech_pairs[] = { { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1}, + { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1}, /* devices which support DirectTCP are usually not very efficient * at delivering data via device_read_block, so this counts an extra * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs). @@ -504,6 +625,7 @@ finalize_impl( g_object_unref(self->device); g_cond_free(self->start_part_cond); + g_cond_free(self->abort_cond); g_mutex_free(self->start_part_mutex); } @@ -515,6 +637,7 @@ instance_init( self->paused = TRUE; self->start_part_cond = g_cond_new(); + self->abort_cond = g_cond_new(); self->start_part_mutex = g_mutex_new(); } @@ -608,3 +731,17 @@ xfer_source_recovery_use_device( klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt); klass->use_device(XFER_SOURCE_RECOVERY(elt), device); } + +guint64 +xfer_source_recovery_get_bytes_read( + XferElement *elt) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt); + guint64 bytes_read = self->bytes_read; + + if (self->device) + bytes_read += device_get_bytes_read(self->device); + + return bytes_read; +} +