Imported Upstream version 3.3.2
[debian/amanda] / device-src / xfer-dest-taper-directtcp.c
index 9036d22382bdcf282f8c06461d4d37e1591cca3e..91738d012c725d4f40c25c524553463d8d4da422 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -19,8 +19,8 @@
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-#include "amxfer.h"
 #include "amanda.h"
+#include "amxfer.h"
 #include "xfer-device.h"
 #include "arglist.h"
 #include "conffile.h"
@@ -74,6 +74,7 @@ typedef struct XferDestTaperDirectTCP {
      * corresponding condition variable. */
     volatile gboolean paused;
     GCond *paused_cond;
+    GCond *abort_accept_cond; /* condition to trigger to abort an accept */
 
 } XferDestTaperDirectTCP;
 
@@ -95,7 +96,7 @@ _xdt_dbg(const char *fmt, ...)
     arglist_start(argp, fmt);
     g_vsnprintf(msg, sizeof(msg), fmt, argp);
     arglist_end(argp);
-    g_debug("XDT thd-%p: %s", g_thread_self(), msg);
+    g_debug("XDT: %s", msg);
 }
 /*
  * Worker Thread
@@ -108,6 +109,7 @@ worker_thread(
     XferElement *elt = (XferElement *)data;
     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
     GTimer *timer = g_timer_new();
+    int result;
 
     /* This thread's job is to accept() an incoming connection, then call
      * write_from_connection for each part, and then close the connection */
@@ -124,17 +126,29 @@ worker_thread(
 
     /* first, accept a new connection from the device */
     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
-    if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+    result = device_accept_with_cond(self->device, &self->conn,
+                                    self->state_mutex,
+                                    self->abort_accept_cond);
+    if (result == 2) {
        xfer_cancel_with_error(XFER_ELEMENT(self),
            "accepting DirectTCP connection: %s",
            device_error_or_status(self->device));
        g_mutex_unlock(self->state_mutex);
        return NULL;
+    } else if (result == 1) {
+       g_mutex_unlock(self->state_mutex);
+       return NULL;
     }
 
     DBG(2, "connection accepted; sending XMSG_READY");
     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
 
+    /* round the part size up to the next multiple of the block size */
+    if (self->part_size) {
+       self->part_size += self->device->block_size-1;
+       self->part_size -= self->part_size % self->device->block_size;
+    }
+
     /* now loop until we're out of parts */
     while (1) {
        guint64 size;
@@ -209,6 +223,10 @@ worker_thread(
        msg->successful = TRUE;
        msg->eom = eom;
        msg->eof = eof;
+
+       /* time runs backward on some test boxes, so make sure this is positive */
+       if (msg->duration < 0) msg->duration = 0;
+
        xfer_queue_message(elt->xfer, msg);
 
         self->partnum++;
@@ -317,6 +335,7 @@ cancel_impl(
      * longer paused */
     g_mutex_lock(self->state_mutex);
     g_cond_broadcast(self->paused_cond);
+    g_cond_broadcast(self->abort_accept_cond);
     g_mutex_unlock(self->state_mutex);
 
     return rv;
@@ -388,16 +407,6 @@ use_device_impl(
     g_mutex_unlock(self->state_mutex);
 }
 
-static void
-cache_inform_impl(
-    XferDestTaper *xdtself G_GNUC_UNUSED,
-    const char *filename G_GNUC_UNUSED,
-    off_t offset G_GNUC_UNUSED,
-    off_t length G_GNUC_UNUSED)
-{
-    /* do nothing */
-}
-
 static guint64
 get_part_bytes_written_impl(
     XferDestTaper *xdtself G_GNUC_UNUSED)
@@ -418,6 +427,7 @@ instance_init(
     self->conn = NULL;
     self->state_mutex = g_mutex_new();
     self->paused_cond = g_cond_new();
+    self->abort_accept_cond = g_cond_new();
 }
 
 static void
@@ -440,6 +450,7 @@ finalize_impl(
 
     g_mutex_free(self->state_mutex);
     g_cond_free(self->paused_cond);
+    g_cond_free(self->abort_accept_cond);
 
     if (self->part_header)
        dumpfile_free(self->part_header);
@@ -466,7 +477,6 @@ class_init(
     klass->cancel = cancel_impl;
     xdt_klass->start_part = start_part_impl;
     xdt_klass->use_device = use_device_impl;
-    xdt_klass->cache_inform = cache_inform_impl;
     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
     goc->finalize = finalize_impl;