Imported Upstream version 3.3.3
[debian/amanda] / device-src / xfer-source-recovery.c
index f305f0cb7f1e23fd91b2a5f99a6416d78563bcf1..112301cd462c00597b61469f55d8540a0729b411 100644 (file)
@@ -2,9 +2,10 @@
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
  *
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
  *
  * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
@@ -55,7 +56,7 @@ typedef struct XferSourceRecovery {
     GThread *thread;
 
     /* this mutex in this condition variable governs all variables below */
-    GCond *start_part_cond;
+    GCond  *start_part_cond;
     GMutex *start_part_mutex;
 
     /* is this device currently paused and awaiting a new part? */
@@ -87,6 +88,8 @@ typedef struct XferSourceRecovery {
     GTimer *part_timer;
 
     gint64   size;
+
+    GCond *abort_cond; /* condition to trigger to abort ndmp command */
 } XferSourceRecovery;
 
 /*
@@ -135,6 +138,7 @@ directtcp_common_thread(
 {
     XferElement *elt = XFER_ELEMENT(self);
     char *errmsg = NULL;
+    int result;
 
     /* send XMSG_READY to indicate it's OK to call start_part now */
     DBG(2, "sending XMSG_READY");
@@ -165,11 +169,17 @@ directtcp_common_thread(
 
        while (1) {
            DBG(2, "reading part from %s", self->device->device_name);
-           if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
+           result = device_read_to_connection(self->device, G_MAXUINT64,
+                       &actual_size, &elt->cancelled,
+                       self->start_part_mutex, self->abort_cond);
+           if (result == 1 && !elt->cancelled) {
                xfer_cancel_with_error(elt, _("error reading from device: %s"),
                    device_error_or_status(self->device));
                g_mutex_unlock(self->start_part_mutex);
                goto close_conn_and_send_done;
+           } else if (result == 2 || elt->cancelled) {
+               g_mutex_unlock(self->start_part_mutex);
+               goto close_conn_and_send_done;
            }
 
            /* break on EOF; otherwise do another read_to_connection */
@@ -223,6 +233,7 @@ directtcp_connect_thread(
 {
     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
     XferElement *elt = XFER_ELEMENT(self);
+    int result;
 
     DBG(1, "(this is directtcp_connect_thread)")
 
@@ -241,13 +252,18 @@ directtcp_connect_thread(
     g_assert(self->listen_ok);
 
     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
-    if (!device_accept(self->device, &self->conn, NULL, NULL)) {
+    result = device_accept(self->device, &self->conn, &elt->cancelled,
+                          self->start_part_mutex, self->abort_cond);
+    if (result == 1 && !elt->cancelled) {
        xfer_cancel_with_error(elt,
            _("error accepting DirectTCP connection: %s"),
            device_error_or_status(self->device));
        g_mutex_unlock(self->start_part_mutex);
        wait_until_xfer_cancelled(elt->xfer);
        goto send_done;
+    } else if (result == 2 || elt->cancelled) {
+       g_mutex_unlock(self->start_part_mutex);
+       goto send_done;
     }
     DBG(2, "DirectTCP connection accepted");
 
@@ -264,6 +280,7 @@ directtcp_listen_thread(
 {
     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
     XferElement *elt = XFER_ELEMENT(self);
+    int result;
 
     DBG(1, "(this is directtcp_listen_thread)");
 
@@ -281,14 +298,21 @@ directtcp_listen_thread(
     g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
 
     DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
-    if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
-                       &self->conn, NULL, NULL)) {
+    result = device_connect(self->device, FALSE,
+                           elt->downstream->input_listen_addrs,
+                           &self->conn, &elt->cancelled,
+                           self->start_part_mutex, self->abort_cond);
+    if (result == 1 && !elt->cancelled) {
        xfer_cancel_with_error(elt,
            _("error making DirectTCP connection: %s"),
            device_error_or_status(self->device));
        g_mutex_unlock(self->start_part_mutex);
        wait_until_xfer_cancelled(elt->xfer);
        goto send_done;
+    } else if (result == 2 || elt->cancelled) {
+       g_mutex_unlock(self->start_part_mutex);
+       wait_until_xfer_cancelled(elt->xfer);
+       goto send_done;
     }
     DBG(2, "DirectTCP connect succeeded");
 
@@ -484,6 +508,7 @@ cancel_impl(
     /* trigger the condition variable, in case the thread is waiting on it */
     g_mutex_lock(self->start_part_mutex);
     g_cond_broadcast(self->start_part_cond);
+    g_cond_broadcast(self->abort_cond);
     g_mutex_unlock(self->start_part_mutex);
 
     return TRUE;
@@ -600,6 +625,7 @@ finalize_impl(
        g_object_unref(self->device);
 
     g_cond_free(self->start_part_cond);
+    g_cond_free(self->abort_cond);
     g_mutex_free(self->start_part_mutex);
 }
 
@@ -611,6 +637,7 @@ instance_init(
 
     self->paused = TRUE;
     self->start_part_cond = g_cond_new();
+    self->abort_cond = g_cond_new();
     self->start_part_mutex = g_mutex_new();
 }