lintian doesn't like orphan packages with uploaders...
[debian/amanda] / device-src / xfer-dest-taper-directtcp.c
index 9036d22382bdcf282f8c06461d4d37e1591cca3e..9257538013a1b5e991d9efa0fcb24a7ff91c1414 100644 (file)
@@ -1,10 +1,11 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
  *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -19,8 +20,8 @@
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-#include "amxfer.h"
 #include "amanda.h"
+#include "amxfer.h"
 #include "xfer-device.h"
 #include "arglist.h"
 #include "conffile.h"
@@ -74,6 +75,7 @@ typedef struct XferDestTaperDirectTCP {
      * corresponding condition variable. */
     volatile gboolean paused;
     GCond *paused_cond;
+    GCond *abort_cond; /* condition to trigger to abort an NDMP command */
 
 } XferDestTaperDirectTCP;
 
@@ -95,7 +97,7 @@ _xdt_dbg(const char *fmt, ...)
     arglist_start(argp, fmt);
     g_vsnprintf(msg, sizeof(msg), fmt, argp);
     arglist_end(argp);
-    g_debug("XDT thd-%p: %s", g_thread_self(), msg);
+    g_debug("XDT: %s", msg);
 }
 /*
  * Worker Thread
@@ -108,6 +110,7 @@ worker_thread(
     XferElement *elt = (XferElement *)data;
     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
     GTimer *timer = g_timer_new();
+    int result;
 
     /* This thread's job is to accept() an incoming connection, then call
      * write_from_connection for each part, and then close the connection */
@@ -124,17 +127,30 @@ worker_thread(
 
     /* first, accept a new connection from the device */
     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
-    if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+    result = device_accept(self->device, &self->conn, &elt->cancelled,
+                          self->state_mutex, self->abort_cond);
+    if (result == 1 && !elt->cancelled) {
        xfer_cancel_with_error(XFER_ELEMENT(self),
            "accepting DirectTCP connection: %s",
            device_error_or_status(self->device));
        g_mutex_unlock(self->state_mutex);
-       return NULL;
+       wait_until_xfer_cancelled(elt->xfer);
+       goto send_xmsg_done;
+    } else if (result == 2 || elt->cancelled) {
+       g_mutex_unlock(self->state_mutex);
+       wait_until_xfer_cancelled(elt->xfer);
+       goto send_xmsg_done;
     }
 
     DBG(2, "connection accepted; sending XMSG_READY");
     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
 
+    /* round the part size up to the next multiple of the block size */
+    if (self->part_size) {
+       self->part_size += self->device->block_size-1;
+       self->part_size -= self->part_size % self->device->block_size;
+    }
+
     /* now loop until we're out of parts */
     while (1) {
        guint64 size;
@@ -173,14 +189,18 @@ worker_thread(
 
        /* write the part */
        g_timer_start(timer);
-       if (!device_write_from_connection(self->device,
-               self->part_size, &size)) {
+       result = device_write_from_connection(self->device,
+               self->part_size, &size, &elt->cancelled,
+               self->state_mutex, self->abort_cond);
+       if (result == 1 && !elt->cancelled) {
            /* even if this is just a physical EOM, we may have lost data, so
             * the whole transfer is dead. */
            xfer_cancel_with_error(XFER_ELEMENT(self),
                "Error writing from DirectTCP connection: %s",
                device_error_or_status(self->device));
            goto cancelled;
+       } else if (result == 2 || elt->cancelled) {
+           goto cancelled;
        }
        g_timer_stop(timer);
 
@@ -209,6 +229,10 @@ worker_thread(
        msg->successful = TRUE;
        msg->eom = eom;
        msg->eof = eof;
+
+       /* time runs backward on some test boxes, so make sure this is positive */
+       if (msg->duration < 0) msg->duration = 0;
+
        xfer_queue_message(elt->xfer, msg);
 
         self->partnum++;
@@ -317,6 +341,7 @@ cancel_impl(
      * longer paused */
     g_mutex_lock(self->state_mutex);
     g_cond_broadcast(self->paused_cond);
+    g_cond_broadcast(self->abort_cond);
     g_mutex_unlock(self->state_mutex);
 
     return rv;
@@ -388,16 +413,6 @@ use_device_impl(
     g_mutex_unlock(self->state_mutex);
 }
 
-static void
-cache_inform_impl(
-    XferDestTaper *xdtself G_GNUC_UNUSED,
-    const char *filename G_GNUC_UNUSED,
-    off_t offset G_GNUC_UNUSED,
-    off_t length G_GNUC_UNUSED)
-{
-    /* do nothing */
-}
-
 static guint64
 get_part_bytes_written_impl(
     XferDestTaper *xdtself G_GNUC_UNUSED)
@@ -418,6 +433,7 @@ instance_init(
     self->conn = NULL;
     self->state_mutex = g_mutex_new();
     self->paused_cond = g_cond_new();
+    self->abort_cond = g_cond_new();
 }
 
 static void
@@ -440,6 +456,7 @@ finalize_impl(
 
     g_mutex_free(self->state_mutex);
     g_cond_free(self->paused_cond);
+    g_cond_free(self->abort_cond);
 
     if (self->part_header)
        dumpfile_free(self->part_header);
@@ -466,7 +483,6 @@ class_init(
     klass->cancel = cancel_impl;
     xdt_klass->start_part = start_part_impl;
     xdt_klass->use_device = use_device_impl;
-    xdt_klass->cache_inform = cache_inform_impl;
     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
     goc->finalize = finalize_impl;