Imported Upstream version 3.1.0
[debian/amanda] / xfer-src / xfer.c
index b9b1800f8d2640aaf1de87d5d772e8cba3c45a13..55dbd08729da71c4b783a6db3b81e237218ed352 100644 (file)
@@ -1,26 +1,27 @@
 /*
- * Copyright (c) 2008 Zmanda, Inc.  All Rights Reserved.
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
  *
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License version 2.1 as
- * published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
  *
- * This library is distributed in the hope that it will be useful, but
+ * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
- * License for more details.
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
  *
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
  *
- * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
 #include "amxfer.h"
 #include "element-glue.h"
 #include "amanda.h"
+#include "arglist.h"
 
 /* XMsgSource objects are GSource "subclasses" which manage
  * a queue of messages, delivering those messages via callback
@@ -61,7 +62,6 @@ xfer_new(
 
     /* Create our message source and corresponding queue */
     xfer->msg_source = xmsgsource_new(xfer);
-    g_source_ref((GSource *)xfer->msg_source);
     xfer->queue = g_async_queue_new();
 
     /* copy the elements in, verifying that they're all XferElement objects */
@@ -126,6 +126,10 @@ xfer_unref(
        elt->xfer = NULL;
        g_object_unref(elt);
     }
+    g_ptr_array_free(xfer->elements, TRUE);
+
+    if (xfer->repr)
+       g_free(xfer->repr);
 
     g_free(xfer);
 }
@@ -176,7 +180,7 @@ xfer_start(
 {
     unsigned int len;
     unsigned int i;
-    XferElement *xe;
+    gboolean setup_ok;
 
     g_assert(xfer != NULL);
     g_assert(xfer->status == XFER_INIT);
@@ -190,17 +194,6 @@ xfer_start(
     xfer->num_active_elements = 0;
     xfer_set_status(xfer, XFER_START);
 
-    /* check that the first element is an XferSource and the last is an XferDest.
-     * A source is identified by having no input mechanisms. */
-    xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
-    if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].input_mech != XFER_MECH_NONE)
-       error("Transfer element 0 is not a transfer source");
-
-    /* Similarly, a destination has no output mechanisms. */
-    xe = (XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1);
-    if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].output_mech != XFER_MECH_NONE)
-       error("Last transfer element is not a transfer destination");
-
     /* Link the elements.  This calls error() on failure, and rewrites
      * xfer->elements */
     link_elements(xfer);
@@ -208,27 +201,37 @@ xfer_start(
     /* Tell all elements to set up.  This is done before upstream and downstream
      * are set so that elements cannot interfere with one another before setup()
      * is completed. */
+    setup_ok = TRUE;
     for (i = 0; i < xfer->elements->len; i++) {
        XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
-       xfer_element_setup(xe);
+       if (!xfer_element_setup(xe)) {
+           setup_ok = FALSE;
+           break;
+       }
     }
 
-    /* Set the upstream and downstream links between elements */
-    len = xfer->elements->len;
-    for (i = 0; i < len; i++) {
-       XferElement *elt = g_ptr_array_index(xfer->elements, i);
+    /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
+     * already, so skip calling start for any of the elements and send an
+     * XMSG_DONE, since none of the elements will do so. */
 
-       if (i > 0)
-           elt->upstream = g_ptr_array_index(xfer->elements, i-1);
-       if (i < len-1)
-           elt->downstream = g_ptr_array_index(xfer->elements, i+1);
-    }
+    if (setup_ok) {
+       /* Set the upstream and downstream links between elements */
+       len = xfer->elements->len;
+       for (i = 0; i < len; i++) {
+           XferElement *elt = g_ptr_array_index(xfer->elements, i);
 
-    /* now tell them all to start, in order from destination to source */
-    for (i = xfer->elements->len; i >= 1; i--) {
-       XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
-       if (xfer_element_start(xe))
-           xfer->num_active_elements++;
+           if (i > 0)
+               elt->upstream = g_ptr_array_index(xfer->elements, i-1);
+           if (i < len-1)
+               elt->downstream = g_ptr_array_index(xfer->elements, i+1);
+       }
+
+       /* now tell them all to start, in order from destination to source */
+       for (i = xfer->elements->len; i >= 1; i--) {
+           XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
+           if (xfer_element_start(xe))
+               xfer->num_active_elements++;
+       }
     }
 
     /* (note that status can only change in the main thread, so we can be
@@ -240,7 +243,8 @@ xfer_start(
      * be done already.  We send a "fake" XMSG_DONE from the destination element,
      * so that all of the usual processing will take place. */
     if (xfer->num_active_elements == 0) {
-       g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
+       if (setup_ok)
+           g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
        xfer->num_active_elements++;
        xfer_queue_message(xfer,
            xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
@@ -302,6 +306,7 @@ xfer_set_status(
  * and find the optimal overall linkage. */
 typedef struct linkage {
     XferElement *elt;
+    xfer_element_mech_pair_t *mech_pairs;
     int elt_idx; /* index into elt's mech_pairs */
     int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
 } linkage;
@@ -326,6 +331,8 @@ xfer_mech_name(
        case XFER_MECH_WRITEFD: return "WRITEFD";
        case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
        case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
+       case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
+       case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
        default: return "UNKNOWN";
     }
 }
@@ -373,7 +380,7 @@ link_recurse(
 
     /* recurse for each linkage we can make that starts with input_mech */
     my = &st->cur[idx];
-    elt_pairs = XFER_ELEMENT_GET_CLASS(my->elt)->mech_pairs;
+    elt_pairs = my->mech_pairs;
     glue_pairs = xfer_element_glue_mech_pairs;
 
     for (my->elt_idx = 0;
@@ -414,7 +421,6 @@ link_elements(
 {
     GPtrArray *new_elements;
     XferElement *elt;
-    XferElementClass *eltc;
     char *linkage_str;
     linking_state st;
     gint i, len;
@@ -431,8 +437,18 @@ link_elements(
     st.best_cost = MAX_COST;
     for (i = 0; i < st.nlinks; i++) {
        st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
+       st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
     }
 
+    /* check that the first element is an XferSource and the last is an XferDest.
+     * A source is identified by having no input mechanisms. */
+    if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
+       error("Transfer element 0 is not a transfer source");
+
+    /* Similarly, a destination has no output mechanisms. */
+    if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
+       error("Last transfer element is not a transfer destination");
+
     /* start recursing with the first element, asserting that its input mech is NONE */
     link_recurse(&st, 0, XFER_MECH_NONE, 0);
 
@@ -446,17 +462,15 @@ link_elements(
     new_elements = g_ptr_array_sized_new(xfer->elements->len);
     for (i = 0; i < st.nlinks; i++) {
        elt = st.best[i].elt;
-       eltc = XFER_ELEMENT_GET_CLASS(elt);
-       elt->input_mech = eltc->mech_pairs[st.best[i].elt_idx].input_mech;
-       elt->output_mech = eltc->mech_pairs[st.best[i].elt_idx].output_mech;
+       elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
+       elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
        g_ptr_array_add(new_elements, elt);
 
        if (st.best[i].glue_idx != -1) {
            elt = xfer_element_glue();
-           eltc = XFER_ELEMENT_GET_CLASS(elt);
            elt->xfer = xfer;
-           elt->input_mech = eltc->mech_pairs[st.best[i].glue_idx].input_mech;
-           elt->output_mech = eltc->mech_pairs[st.best[i].glue_idx].output_mech;
+           elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
+           elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
            g_ptr_array_add(new_elements, elt);
        }
     }
@@ -626,3 +640,61 @@ xmsgsource_new(
 
     return xms;
 }
+
+xfer_status
+wait_until_xfer_cancelled(
+    Xfer *xfer)
+{
+    xfer_status seen_status;
+    g_assert(xfer != NULL);
+
+    g_mutex_lock(xfer->status_mutex);
+    while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
+       g_cond_wait(xfer->status_cond, xfer->status_mutex);
+    seen_status = xfer->status;
+    g_mutex_unlock(xfer->status_mutex);
+
+    return seen_status;
+}
+
+xfer_status
+wait_until_xfer_running(
+    Xfer *xfer)
+{
+    xfer_status seen_status;
+    g_assert(xfer != NULL);
+
+    g_mutex_lock(xfer->status_mutex);
+    while (xfer->status == XFER_START)
+       g_cond_wait(xfer->status_cond, xfer->status_mutex);
+    seen_status = xfer->status;
+    g_mutex_unlock(xfer->status_mutex);
+
+    return seen_status;
+}
+
+void
+xfer_cancel_with_error(
+    XferElement *elt,
+    const char *fmt,
+    ...)
+{
+    va_list argp;
+    XMsg *msg;
+
+    g_assert(elt != NULL);
+    g_assert(elt->xfer != NULL);
+
+    msg = xmsg_new(elt, XMSG_ERROR, 0);
+
+    arglist_start(argp, fmt);
+    msg->message = g_strdup_vprintf(fmt, argp);
+    arglist_end(argp);
+
+    /* send the XMSG_ERROR */
+    xfer_queue_message(elt->xfer, msg);
+
+    /* cancel the transfer */
+    xfer_cancel(elt->xfer);
+}
+