* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
-#include "amxfer.h"
#include "amanda.h"
+#include "amxfer.h"
#include "device.h"
#include "property.h"
#include "xfer-device.h"
/* timer for the duration; NULL while paused or cancelled */
GTimer *part_timer;
+
+ gint64 size;
} XferSourceRecovery;
/*
* Implementation
*/
+/* common code for both directtcp_listen_thread and directtcp_connect_thread;
+ * this is called after self->conn is filled in and carries out the data
+ * transfer over that connection. NOTE: start_part_mutex is HELD when this
+ * function begins */
static gpointer
-directtcp_thread(
- gpointer data)
+directtcp_common_thread(
+ XferSourceRecovery *self)
{
- XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
XferElement *elt = XFER_ELEMENT(self);
char *errmsg = NULL;
- /* first, we need to accept the incoming connection; we do this while
- * holding the start_part_mutex, so that a part doesn't get started until
- * we're finished with the device */
- g_mutex_lock(self->start_part_mutex);
-
- if (elt->cancelled) {
- g_mutex_unlock(self->start_part_mutex);
- goto send_done;
- }
-
- g_assert(self->device != NULL); /* have a device */
- g_assert(elt->output_listen_addrs != NULL); /* listening on it */
- g_assert(self->listen_ok);
-
- DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
- if (!device_accept(self->device, &self->conn, NULL, NULL)) {
- xfer_cancel_with_error(elt,
- _("error accepting DirectTCP connection: %s"),
- device_error_or_status(self->device));
- g_mutex_unlock(self->start_part_mutex);
- wait_until_xfer_cancelled(elt->xfer);
- goto send_done;
- }
-
/* send XMSG_READY to indicate it's OK to call start_part now */
- DBG(2, "connection accepted; sending XMSG_READY");
+ DBG(2, "sending XMSG_READY");
xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
/* now we sit around waiting for signals to write a part */
}
}
+ xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
+
+ return NULL;
+}
+
+static gpointer
+directtcp_connect_thread(
+ gpointer data)
+{
+ XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
+ XferElement *elt = XFER_ELEMENT(self);
+
+ DBG(1, "(this is directtcp_connect_thread)")
+
+ /* first, we need to accept the incoming connection; we do this while
+ * holding the start_part_mutex, so that a part doesn't get started until
+ * we're finished with the device */
+ g_mutex_lock(self->start_part_mutex);
+
+ if (elt->cancelled) {
+ g_mutex_unlock(self->start_part_mutex);
+ goto send_done;
+ }
+
+ g_assert(self->device != NULL); /* have a device */
+ g_assert(elt->output_listen_addrs != NULL); /* listening on it */
+ g_assert(self->listen_ok);
+
+ DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
+ if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+ xfer_cancel_with_error(elt,
+ _("error accepting DirectTCP connection: %s"),
+ device_error_or_status(self->device));
+ g_mutex_unlock(self->start_part_mutex);
+ wait_until_xfer_cancelled(elt->xfer);
+ goto send_done;
+ }
+ DBG(2, "DirectTCP connection accepted");
+
+ return directtcp_common_thread(self);
+
send_done:
xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
+ return NULL;
+}
+static gpointer
+directtcp_listen_thread(
+ gpointer data)
+{
+ XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
+ XferElement *elt = XFER_ELEMENT(self);
+
+ DBG(1, "(this is directtcp_listen_thread)");
+
+ /* we need to make an outgoing connection to downstream; we do this while
+ * holding the start_part_mutex, so that a part doesn't get started until
+ * we're finished with the device */
+ g_mutex_lock(self->start_part_mutex);
+
+ if (elt->cancelled) {
+ g_mutex_unlock(self->start_part_mutex);
+ goto send_done;
+ }
+
+ g_assert(self->device != NULL); /* have a device */
+ g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
+
+ DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
+ if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
+ &self->conn, NULL, NULL)) {
+ xfer_cancel_with_error(elt,
+ _("error making DirectTCP connection: %s"),
+ device_error_or_status(self->device));
+ g_mutex_unlock(self->start_part_mutex);
+ wait_until_xfer_cancelled(elt->xfer);
+ goto send_done;
+ }
+ DBG(2, "DirectTCP connect succeeded");
+
+ return directtcp_common_thread(self);
+
+send_done:
+ xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
return NULL;
}
}
self->listen_ok = TRUE;
} else {
+ /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
+ * XFER_MECH_PULL_BUFFER */
elt->output_listen_addrs = NULL;
}
if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
g_assert(elt->output_listen_addrs != NULL);
- self->thread = g_thread_create(directtcp_thread, (gpointer)self, FALSE, NULL);
+ self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
+ return TRUE; /* we'll send XMSG_DONE */
+ } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
+ g_assert(elt->output_listen_addrs == NULL);
+ self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
return TRUE; /* we'll send XMSG_DONE */
} else {
/* nothing to prepare for - we're ready already! */
g_mutex_unlock(self->start_part_mutex);
+ if (elt->size > 0) {
+ /* initialize on first pass */
+ if (self->size == 0)
+ self->size = elt->size;
+
+ if (self->size == -1) {
+ *size = 0;
+ amfree(buf);
+ return NULL;
+ }
+
+ if (*size > (guint64)self->size) {
+ /* return only self->size bytes */
+ *size = self->size;
+ self->size = -1;
+ } else {
+ self->size -= *size;
+ }
+ }
+
return buf;
error:
g_mutex_unlock(self->start_part_mutex);
/* make sure we're ready to go */
g_assert(self->paused);
- if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
+ if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
+ || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
g_assert(self->conn != NULL);
}
};
static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
{ XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
+ { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
/* devices which support DirectTCP are usually not very efficient
* at delivering data via device_read_block, so this counts an extra
* byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).