/*
- * Copyright (c) 2008 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
*
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License version 2.1 as
- * published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
*
- * This library is distributed in the hope that it will be useful, but
+ * This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
- * License for more details.
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
+ * for more details.
*
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
- * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
* Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
*/
+#include "amanda.h"
#include "amxfer.h"
#include "element-glue.h"
-#include "amanda.h"
+#include "arglist.h"
/* XMsgSource objects are GSource "subclasses" which manage
* a queue of messages, delivering those messages via callback
xfer->status = XFER_INIT;
xfer->status_mutex = g_mutex_new();
xfer->status_cond = g_cond_new();
+ xfer->fd_mutex = g_mutex_new();
xfer->refcount = 1;
xfer->repr = NULL;
/* Create our message source and corresponding queue */
xfer->msg_source = xmsgsource_new(xfer);
- g_source_ref((GSource *)xfer->msg_source);
xfer->queue = g_async_queue_new();
/* copy the elements in, verifying that they're all XferElement objects */
g_mutex_free(xfer->status_mutex);
g_cond_free(xfer->status_cond);
+ g_mutex_free(xfer->fd_mutex);
/* Free our references to the elements, and also set the 'xfer'
* attribute of each to NULL, making them "unattached" (although
elt->xfer = NULL;
g_object_unref(elt);
}
+ g_ptr_array_free(xfer->elements, TRUE);
+
+ if (xfer->repr)
+ g_free(xfer->repr);
g_free(xfer);
}
void
xfer_start(
- Xfer *xfer)
+ Xfer *xfer,
+ gint64 offset G_GNUC_UNUSED,
+ gint64 size)
{
unsigned int len;
unsigned int i;
- XferElement *xe;
+ gboolean setup_ok;
g_assert(xfer != NULL);
g_assert(xfer->status == XFER_INIT);
g_assert(xfer->elements->len >= 2);
+ g_assert(offset == 0);
g_debug("Starting %s", xfer_repr(xfer));
/* set the status to XFER_START and add a reference to our count, so that
xfer->num_active_elements = 0;
xfer_set_status(xfer, XFER_START);
- /* check that the first element is an XferSource and the last is an XferDest.
- * A source is identified by having no input mechanisms. */
- xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
- if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].input_mech != XFER_MECH_NONE)
- error("Transfer element 0 is not a transfer source");
-
- /* Similarly, a destination has no output mechanisms. */
- xe = (XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1);
- if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].output_mech != XFER_MECH_NONE)
- error("Last transfer element is not a transfer destination");
-
/* Link the elements. This calls error() on failure, and rewrites
* xfer->elements */
link_elements(xfer);
/* Tell all elements to set up. This is done before upstream and downstream
* are set so that elements cannot interfere with one another before setup()
* is completed. */
+ setup_ok = TRUE;
for (i = 0; i < xfer->elements->len; i++) {
XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
- xfer_element_setup(xe);
+ if (!xfer_element_setup(xe)) {
+ setup_ok = FALSE;
+ break;
+ }
}
- /* Set the upstream and downstream links between elements */
- len = xfer->elements->len;
- for (i = 0; i < len; i++) {
- XferElement *elt = g_ptr_array_index(xfer->elements, i);
+ /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
+ * already, so skip calling start for any of the elements and send an
+ * XMSG_DONE, since none of the elements will do so. */
- if (i > 0)
- elt->upstream = g_ptr_array_index(xfer->elements, i-1);
- if (i < len-1)
- elt->downstream = g_ptr_array_index(xfer->elements, i+1);
- }
+ if (setup_ok) {
+ /* Set the upstream and downstream links between elements */
+ len = xfer->elements->len;
+ for (i = 0; i < len; i++) {
+ XferElement *elt = g_ptr_array_index(xfer->elements, i);
+
+ if (i > 0)
+ elt->upstream = g_ptr_array_index(xfer->elements, i-1);
+ if (i < len-1)
+ elt->downstream = g_ptr_array_index(xfer->elements, i+1);
+ }
- /* now tell them all to start, in order from destination to source */
- for (i = xfer->elements->len; i >= 1; i--) {
- XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
- if (xfer_element_start(xe))
- xfer->num_active_elements++;
+ /* Set size for first element */
+ if (size) {
+ XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
+ xfer_element_set_size(xe, size);
+ }
+
+ /* now tell them all to start, in order from destination to source */
+ for (i = xfer->elements->len; i >= 1; i--) {
+ XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
+ if (xfer_element_start(xe))
+ xfer->num_active_elements++;
+ }
}
/* (note that status can only change in the main thread, so we can be
* be done already. We send a "fake" XMSG_DONE from the destination element,
* so that all of the usual processing will take place. */
if (xfer->num_active_elements == 0) {
- g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
+ if (setup_ok)
+ g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
xfer->num_active_elements++;
xfer_queue_message(xfer,
xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
* and find the optimal overall linkage. */
typedef struct linkage {
XferElement *elt;
+ xfer_element_mech_pair_t *mech_pairs;
int elt_idx; /* index into elt's mech_pairs */
int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
} linkage;
case XFER_MECH_WRITEFD: return "WRITEFD";
case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
+ case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
+ case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
default: return "UNKNOWN";
}
}
/* recurse for each linkage we can make that starts with input_mech */
my = &st->cur[idx];
- elt_pairs = XFER_ELEMENT_GET_CLASS(my->elt)->mech_pairs;
+ elt_pairs = my->mech_pairs;
glue_pairs = xfer_element_glue_mech_pairs;
for (my->elt_idx = 0;
{
GPtrArray *new_elements;
XferElement *elt;
- XferElementClass *eltc;
char *linkage_str;
linking_state st;
gint i, len;
st.best_cost = MAX_COST;
for (i = 0; i < st.nlinks; i++) {
st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
+ st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
}
+ /* check that the first element is an XferSource and the last is an XferDest.
+ * A source is identified by having no input mechanisms. */
+ if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
+ error("Transfer element 0 is not a transfer source");
+
+ /* Similarly, a destination has no output mechanisms. */
+ if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
+ error("Last transfer element is not a transfer destination");
+
/* start recursing with the first element, asserting that its input mech is NONE */
link_recurse(&st, 0, XFER_MECH_NONE, 0);
new_elements = g_ptr_array_sized_new(xfer->elements->len);
for (i = 0; i < st.nlinks; i++) {
elt = st.best[i].elt;
- eltc = XFER_ELEMENT_GET_CLASS(elt);
- elt->input_mech = eltc->mech_pairs[st.best[i].elt_idx].input_mech;
- elt->output_mech = eltc->mech_pairs[st.best[i].elt_idx].output_mech;
+ elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
+ elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
g_ptr_array_add(new_elements, elt);
if (st.best[i].glue_idx != -1) {
elt = xfer_element_glue();
- eltc = XFER_ELEMENT_GET_CLASS(elt);
elt->xfer = xfer;
- elt->input_mech = eltc->mech_pairs[st.best[i].glue_idx].input_mech;
- elt->output_mech = eltc->mech_pairs[st.best[i].glue_idx].output_mech;
+ elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
+ elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
g_ptr_array_add(new_elements, elt);
}
}
return xms;
}
+
+xfer_status
+wait_until_xfer_cancelled(
+ Xfer *xfer)
+{
+ xfer_status seen_status;
+ g_assert(xfer != NULL);
+
+ g_mutex_lock(xfer->status_mutex);
+ while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
+ g_cond_wait(xfer->status_cond, xfer->status_mutex);
+ seen_status = xfer->status;
+ g_mutex_unlock(xfer->status_mutex);
+
+ return seen_status;
+}
+
+xfer_status
+wait_until_xfer_running(
+ Xfer *xfer)
+{
+ xfer_status seen_status;
+ g_assert(xfer != NULL);
+
+ g_mutex_lock(xfer->status_mutex);
+ while (xfer->status == XFER_START)
+ g_cond_wait(xfer->status_cond, xfer->status_mutex);
+ seen_status = xfer->status;
+ g_mutex_unlock(xfer->status_mutex);
+
+ return seen_status;
+}
+
+void
+xfer_cancel_with_error(
+ XferElement *elt,
+ const char *fmt,
+ ...)
+{
+ va_list argp;
+ XMsg *msg;
+
+ g_assert(elt != NULL);
+ g_assert(elt->xfer != NULL);
+
+ msg = xmsg_new(elt, XMSG_ERROR, 0);
+
+ arglist_start(argp, fmt);
+ msg->message = g_strdup_vprintf(fmt, argp);
+ arglist_end(argp);
+
+ /* send the XMSG_ERROR */
+ xfer_queue_message(elt->xfer, msg);
+
+ /* cancel the transfer */
+ xfer_cancel(elt->xfer);
+}
+
+gint
+xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
+{
+ gint rv;
+
+ if (xfer)
+ g_mutex_lock(xfer->fd_mutex);
+ rv = *fdp;
+ *fdp = newfd;
+ if (xfer)
+ g_mutex_unlock(xfer->fd_mutex);
+
+ return rv;
+}