X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-source-recovery.c;h=da2813473ebbdc9f3bb3efeae70c3b141f71b8ac;hb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;hp=425a83789218e3f8a3a248d5f7696e16583475a9;hpb=fd48f3e498442f0cbff5f3606c7c403d0566150e;p=debian%2Famanda diff --git a/device-src/xfer-source-recovery.c b/device-src/xfer-source-recovery.c index 425a837..da28134 100644 --- a/device-src/xfer-source-recovery.c +++ b/device-src/xfer-source-recovery.c @@ -19,8 +19,8 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" +#include "amxfer.h" #include "device.h" #include "property.h" #include "xfer-device.h" @@ -120,40 +120,19 @@ _xsr_dbg(const char *fmt, ...) * Implementation */ +/* common code for both directtcp_listen_thread and directtcp_connect_thread; + * this is called after self->conn is filled in and carries out the data + * transfer over that connection. NOTE: start_part_mutex is HELD when this + * function begins */ static gpointer -directtcp_thread( - gpointer data) +directtcp_common_thread( + XferSourceRecovery *self) { - XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); XferElement *elt = XFER_ELEMENT(self); char *errmsg = NULL; - /* first, we need to accept the incoming connection; we do this while - * holding the start_part_mutex, so that a part doesn't get started until - * we're finished with the device */ - g_mutex_lock(self->start_part_mutex); - - if (elt->cancelled) { - g_mutex_unlock(self->start_part_mutex); - goto send_done; - } - - g_assert(self->device != NULL); /* have a device */ - g_assert(elt->output_listen_addrs != NULL); /* listening on it */ - g_assert(self->listen_ok); - - DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); - if (!device_accept(self->device, &self->conn, NULL, NULL)) { - xfer_cancel_with_error(elt, - _("error accepting DirectTCP connection: %s"), - device_error_or_status(self->device)); - g_mutex_unlock(self->start_part_mutex); - wait_until_xfer_cancelled(elt->xfer); - goto send_done; - } - /* send XMSG_READY to indicate it's OK to call start_part now */ - DBG(2, "connection accepted; sending XMSG_READY"); + DBG(2, "sending XMSG_READY"); xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0)); /* now we sit around waiting for signals to write a part */ @@ -228,9 +207,90 @@ close_conn_and_send_done: } } + xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); + + return NULL; +} + +static gpointer +directtcp_connect_thread( + gpointer data) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); + XferElement *elt = XFER_ELEMENT(self); + + DBG(1, "(this is directtcp_connect_thread)") + + /* first, we need to accept the incoming connection; we do this while + * holding the start_part_mutex, so that a part doesn't get started until + * we're finished with the device */ + g_mutex_lock(self->start_part_mutex); + + if (elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; + } + + g_assert(self->device != NULL); /* have a device */ + g_assert(elt->output_listen_addrs != NULL); /* listening on it */ + g_assert(self->listen_ok); + + DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name); + if (!device_accept(self->device, &self->conn, NULL, NULL)) { + xfer_cancel_with_error(elt, + _("error accepting DirectTCP connection: %s"), + device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; + } + DBG(2, "DirectTCP connection accepted"); + + return directtcp_common_thread(self); + send_done: xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); + return NULL; +} + +static gpointer +directtcp_listen_thread( + gpointer data) +{ + XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data); + XferElement *elt = XFER_ELEMENT(self); + + DBG(1, "(this is directtcp_listen_thread)"); + + /* we need to make an outgoing connection to downstream; we do this while + * holding the start_part_mutex, so that a part doesn't get started until + * we're finished with the device */ + g_mutex_lock(self->start_part_mutex); + + if (elt->cancelled) { + g_mutex_unlock(self->start_part_mutex); + goto send_done; + } + + g_assert(self->device != NULL); /* have a device */ + g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */ + + DBG(2, "making DirectTCP connection on device %s", self->device->device_name); + if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs, + &self->conn, NULL, NULL)) { + xfer_cancel_with_error(elt, + _("error making DirectTCP connection: %s"), + device_error_or_status(self->device)); + g_mutex_unlock(self->start_part_mutex); + wait_until_xfer_cancelled(elt->xfer); + goto send_done; + } + DBG(2, "DirectTCP connect succeeded"); + + return directtcp_common_thread(self); +send_done: + xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0)); return NULL; } @@ -251,6 +311,8 @@ setup_impl( } self->listen_ok = TRUE; } else { + /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or + * XFER_MECH_PULL_BUFFER */ elt->output_listen_addrs = NULL; } @@ -265,7 +327,11 @@ start_impl( if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) { g_assert(elt->output_listen_addrs != NULL); - self->thread = g_thread_create(directtcp_thread, (gpointer)self, FALSE, NULL); + self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL); + return TRUE; /* we'll send XMSG_DONE */ + } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) { + g_assert(elt->output_listen_addrs == NULL); + self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL); return TRUE; /* we'll send XMSG_DONE */ } else { /* nothing to prepare for - we're ready already! */ @@ -414,7 +480,8 @@ start_part_impl( /* make sure we're ready to go */ g_assert(self->paused); - if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT) { + if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT + || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) { g_assert(self->conn != NULL); } @@ -480,6 +547,7 @@ get_mech_pairs_impl( }; static xfer_element_mech_pair_t directtcp_mech_pairs[] = { { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1}, + { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1}, /* devices which support DirectTCP are usually not very efficient * at delivering data via device_read_block, so this counts an extra * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).