Imported Upstream version 3.3.2
[debian/amanda] / device-src / xfer-source-recovery.c
index 425a83789218e3f8a3a248d5f7696e16583475a9..f305f0cb7f1e23fd91b2a5f99a6416d78563bcf1 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -19,8 +19,8 @@
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-#include "amxfer.h"
 #include "amanda.h"
+#include "amxfer.h"
 #include "device.h"
 #include "property.h"
 #include "xfer-device.h"
@@ -76,12 +76,17 @@ typedef struct XferSourceRecovery {
      * part) */
     size_t block_size;
 
+    /* bytes read for this image */
+    guint64 bytes_read;
+
     /* part size (potentially including any zero-padding from the
      * device) */
     guint64 part_size;
 
     /* timer for the duration; NULL while paused or cancelled */
     GTimer *part_timer;
+
+    gint64   size;
 } XferSourceRecovery;
 
 /*
@@ -113,47 +118,26 @@ _xsr_dbg(const char *fmt, ...)
     arglist_start(argp, fmt);
     g_vsnprintf(msg, sizeof(msg), fmt, argp);
     arglist_end(argp);
-    g_debug("XSR thd-%p: %s", g_thread_self(), msg);
+    g_debug("XSR: %s", msg);
 }
 
 /*
  * Implementation
  */
 
+/* common code for both directtcp_listen_thread and directtcp_connect_thread;
+ * this is called after self->conn is filled in and carries out the data
+ * transfer over that connection.  NOTE: start_part_mutex is HELD when this
+ * function begins */
 static gpointer
-directtcp_thread(
-       gpointer data)
+directtcp_common_thread(
+       XferSourceRecovery *self)
 {
-    XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
     XferElement *elt = XFER_ELEMENT(self);
     char *errmsg = NULL;
 
-    /* first, we need to accept the incoming connection; we do this while
-     * holding the start_part_mutex, so that a part doesn't get started until
-     * we're finished with the device */
-    g_mutex_lock(self->start_part_mutex);
-
-    if (elt->cancelled) {
-       g_mutex_unlock(self->start_part_mutex);
-       goto send_done;
-    }
-
-    g_assert(self->device != NULL); /* have a device */
-    g_assert(elt->output_listen_addrs != NULL); /* listening on it */
-    g_assert(self->listen_ok);
-
-    DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
-    if (!device_accept(self->device, &self->conn, NULL, NULL)) {
-       xfer_cancel_with_error(elt,
-           _("error accepting DirectTCP connection: %s"),
-           device_error_or_status(self->device));
-       g_mutex_unlock(self->start_part_mutex);
-       wait_until_xfer_cancelled(elt->xfer);
-       goto send_done;
-    }
-
     /* send XMSG_READY to indicate it's OK to call start_part now */
-    DBG(2, "connection accepted; sending XMSG_READY");
+    DBG(2, "sending XMSG_READY");
     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
 
     /* now we sit around waiting for signals to write a part */
@@ -228,9 +212,90 @@ close_conn_and_send_done:
        }
     }
 
+    xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
+
+    return NULL;
+}
+
+static gpointer
+directtcp_connect_thread(
+       gpointer data)
+{
+    XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
+    XferElement *elt = XFER_ELEMENT(self);
+
+    DBG(1, "(this is directtcp_connect_thread)")
+
+    /* first, we need to accept the incoming connection; we do this while
+     * holding the start_part_mutex, so that a part doesn't get started until
+     * we're finished with the device */
+    g_mutex_lock(self->start_part_mutex);
+
+    if (elt->cancelled) {
+       g_mutex_unlock(self->start_part_mutex);
+       goto send_done;
+    }
+
+    g_assert(self->device != NULL); /* have a device */
+    g_assert(elt->output_listen_addrs != NULL); /* listening on it */
+    g_assert(self->listen_ok);
+
+    DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
+    if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+       xfer_cancel_with_error(elt,
+           _("error accepting DirectTCP connection: %s"),
+           device_error_or_status(self->device));
+       g_mutex_unlock(self->start_part_mutex);
+       wait_until_xfer_cancelled(elt->xfer);
+       goto send_done;
+    }
+    DBG(2, "DirectTCP connection accepted");
+
+    return directtcp_common_thread(self);
+
 send_done:
     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
+    return NULL;
+}
 
+static gpointer
+directtcp_listen_thread(
+       gpointer data)
+{
+    XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
+    XferElement *elt = XFER_ELEMENT(self);
+
+    DBG(1, "(this is directtcp_listen_thread)");
+
+    /* we need to make an outgoing connection to downstream; we do this while
+     * holding the start_part_mutex, so that a part doesn't get started until
+     * we're finished with the device */
+    g_mutex_lock(self->start_part_mutex);
+
+    if (elt->cancelled) {
+       g_mutex_unlock(self->start_part_mutex);
+       goto send_done;
+    }
+
+    g_assert(self->device != NULL); /* have a device */
+    g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
+
+    DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
+    if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
+                       &self->conn, NULL, NULL)) {
+       xfer_cancel_with_error(elt,
+           _("error making DirectTCP connection: %s"),
+           device_error_or_status(self->device));
+       g_mutex_unlock(self->start_part_mutex);
+       wait_until_xfer_cancelled(elt->xfer);
+       goto send_done;
+    }
+    DBG(2, "DirectTCP connect succeeded");
+
+    return directtcp_common_thread(self);
+
+send_done:
+    xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
     return NULL;
 }
 
@@ -251,6 +316,8 @@ setup_impl(
        }
        self->listen_ok = TRUE;
     } else {
+       /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
+        * XFER_MECH_PULL_BUFFER */
        elt->output_listen_addrs = NULL;
     }
 
@@ -265,7 +332,11 @@ start_impl(
 
     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
        g_assert(elt->output_listen_addrs != NULL);
-       self->thread = g_thread_create(directtcp_thread, (gpointer)self, FALSE, NULL);
+       self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
+       return TRUE; /* we'll send XMSG_DONE */
+    } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
+       g_assert(elt->output_listen_addrs == NULL);
+       self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
        return TRUE; /* we'll send XMSG_DONE */
     } else {
        /* nothing to prepare for - we're ready already! */
@@ -339,8 +410,9 @@ pull_buffer_impl(
                    _("error reading from %s: %s"),
                    self->device->device_name,
                    device_error_or_status(self->device));
+               g_mutex_unlock(self->start_part_mutex);
                wait_until_xfer_cancelled(elt->xfer);
-                goto error;
+                goto error_unlocked;
            }
 
            /* the device has signalled EOF (really end-of-part), so clean up instance
@@ -357,6 +429,7 @@ pull_buffer_impl(
            self->paused = TRUE;
            g_object_unref(self->device);
            self->device = NULL;
+           self->bytes_read += self->part_size;
            self->part_size = 0;
            self->block_size = 0;
            if (self->part_timer) {
@@ -372,9 +445,30 @@ pull_buffer_impl(
 
     g_mutex_unlock(self->start_part_mutex);
 
+    if (elt->size > 0) {
+       /* initialize on first pass */
+       if (self->size == 0)
+           self->size = elt->size;
+       
+       if (self->size == -1) {
+           *size = 0;
+           amfree(buf);
+           return NULL;
+       }
+
+       if (*size > (guint64)self->size) {
+           /* return only self->size bytes */
+           *size = self->size;
+           self->size = -1;
+       } else {
+           self->size -= *size;
+       }
+    }
+
     return buf;
 error:
     g_mutex_unlock(self->start_part_mutex);
+error_unlocked:
     *size = 0;
     return NULL;
 }
@@ -414,7 +508,8 @@ start_part_impl(
 
     /* make sure we're ready to go */
     g_assert(self->paused);
-    if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
+    if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
+     || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
        g_assert(self->conn != NULL);
     }
 
@@ -480,6 +575,7 @@ get_mech_pairs_impl(
     };
     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
        { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
+       { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
        /* devices which support DirectTCP are usually not very efficient
         * at delivering data via device_read_block, so this counts an extra
         * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
@@ -608,3 +704,17 @@ xfer_source_recovery_use_device(
     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
 }
+
+guint64
+xfer_source_recovery_get_bytes_read(
+    XferElement *elt)
+{
+    XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
+    guint64 bytes_read = self->bytes_read;
+
+    if (self->device)
+       bytes_read += device_get_bytes_read(self->device);
+
+    return bytes_read;
+}
+