/*
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
+#include "amanda.h"
#include "amxfer.h"
#include "element-glue.h"
-#include "amanda.h"
#include "arglist.h"
/* XMsgSource objects are GSource "subclasses" which manage
xfer->status = XFER_INIT;
xfer->status_mutex = g_mutex_new();
xfer->status_cond = g_cond_new();
+ xfer->fd_mutex = g_mutex_new();
xfer->refcount = 1;
xfer->repr = NULL;
g_mutex_free(xfer->status_mutex);
g_cond_free(xfer->status_cond);
+ g_mutex_free(xfer->fd_mutex);
/* Free our references to the elements, and also set the 'xfer'
* attribute of each to NULL, making them "unattached" (although
void
xfer_start(
- Xfer *xfer)
+ Xfer *xfer,
+ gint64 offset G_GNUC_UNUSED,
+ gint64 size)
{
unsigned int len;
unsigned int i;
g_assert(xfer != NULL);
g_assert(xfer->status == XFER_INIT);
g_assert(xfer->elements->len >= 2);
+ g_assert(offset == 0);
g_debug("Starting %s", xfer_repr(xfer));
/* set the status to XFER_START and add a reference to our count, so that
elt->downstream = g_ptr_array_index(xfer->elements, i+1);
}
+ /* Set size for first element */
+ if (size) {
+ XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
+ xfer_element_set_size(xe, size);
+ }
+
/* now tell them all to start, in order from destination to source */
for (i = xfer->elements->len; i >= 1; i--) {
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
{
/* Since xfer_cancel can be called from any thread, we just send a message.
* The action takes place when the message is received. */
- XferElement *src = g_ptr_array_index(xfer->elements, 0);
+ XferElement *src;
+ if (xfer->cancelled > 0) return;
+ xfer->cancelled++;
+ src = g_ptr_array_index(xfer->elements, 0);
xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
}
xfer_cancel(elt->xfer);
}
+gint
+xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
+{
+ gint rv;
+
+ if (xfer)
+ g_mutex_lock(xfer->fd_mutex);
+ rv = *fdp;
+ *fdp = newfd;
+ if (xfer)
+ g_mutex_unlock(xfer->fd_mutex);
+
+ return rv;
+}