Imported Upstream version 3.3.2
[debian/amanda] / xfer-src / xfer.c
index 55dbd08729da71c4b783a6db3b81e237218ed352..97b71473642859dc6bcad874904bff4dcdc66171 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
  *
  * This program is free software; you can redistribute it and/or modify it
  * under the terms of the GNU General Public License version 2 as published
@@ -18,9 +18,9 @@
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
+#include "amanda.h"
 #include "amxfer.h"
 #include "element-glue.h"
-#include "amanda.h"
 #include "arglist.h"
 
 /* XMsgSource objects are GSource "subclasses" which manage
@@ -56,6 +56,7 @@ xfer_new(
     xfer->status = XFER_INIT;
     xfer->status_mutex = g_mutex_new();
     xfer->status_cond = g_cond_new();
+    xfer->fd_mutex = g_mutex_new();
 
     xfer->refcount = 1;
     xfer->repr = NULL;
@@ -116,6 +117,7 @@ xfer_unref(
 
     g_mutex_free(xfer->status_mutex);
     g_cond_free(xfer->status_cond);
+    g_mutex_free(xfer->fd_mutex);
 
     /* Free our references to the elements, and also set the 'xfer'
      * attribute of each to NULL, making them "unattached" (although 
@@ -176,7 +178,9 @@ xfer_repr(
 
 void
 xfer_start(
-    Xfer *xfer)
+    Xfer *xfer,
+    gint64 offset G_GNUC_UNUSED,
+    gint64 size)
 {
     unsigned int len;
     unsigned int i;
@@ -185,6 +189,7 @@ xfer_start(
     g_assert(xfer != NULL);
     g_assert(xfer->status == XFER_INIT);
     g_assert(xfer->elements->len >= 2);
+    g_assert(offset == 0);
 
     g_debug("Starting %s", xfer_repr(xfer));
     /* set the status to XFER_START and add a reference to our count, so that
@@ -226,6 +231,12 @@ xfer_start(
                elt->downstream = g_ptr_array_index(xfer->elements, i+1);
        }
 
+       /* Set size for first element */
+       if (size) {
+           XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
+           xfer_element_set_size(xe, size);
+       }
+
        /* now tell them all to start, in order from destination to source */
        for (i = xfer->elements->len; i >= 1; i--) {
            XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
@@ -258,7 +269,10 @@ xfer_cancel(
 {
     /* Since xfer_cancel can be called from any thread, we just send a message.
      * The action takes place when the message is received. */
-    XferElement *src = g_ptr_array_index(xfer->elements, 0);
+    XferElement *src;
+    if (xfer->cancelled > 0) return;
+    xfer->cancelled++;
+    src = g_ptr_array_index(xfer->elements, 0);
     xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
 }
 
@@ -698,3 +712,17 @@ xfer_cancel_with_error(
     xfer_cancel(elt->xfer);
 }
 
+gint
+xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
+{
+    gint rv;
+
+    if (xfer)
+       g_mutex_lock(xfer->fd_mutex);
+    rv = *fdp;
+    *fdp = newfd;
+    if (xfer)
+       g_mutex_unlock(xfer->fd_mutex);
+
+    return rv;
+}