X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=xfer-src%2Fxfer.c;h=55dbd08729da71c4b783a6db3b81e237218ed352;hb=fd48f3e498442f0cbff5f3606c7c403d0566150e;hp=b9b1800f8d2640aaf1de87d5d772e8cba3c45a13;hpb=96f35b20267e8b1a1c846d476f27fcd330e0b018;p=debian%2Famanda diff --git a/xfer-src/xfer.c b/xfer-src/xfer.c index b9b1800..55dbd08 100644 --- a/xfer-src/xfer.c +++ b/xfer-src/xfer.c @@ -1,26 +1,27 @@ /* - * Copyright (c) 2008 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License version 2.1 as - * published by the Free Software Foundation. + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. * - * This library is distributed in the hope that it will be useful, but + * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - * License for more details. + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA * - * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120 + * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ #include "amxfer.h" #include "element-glue.h" #include "amanda.h" +#include "arglist.h" /* XMsgSource objects are GSource "subclasses" which manage * a queue of messages, delivering those messages via callback @@ -61,7 +62,6 @@ xfer_new( /* Create our message source and corresponding queue */ xfer->msg_source = xmsgsource_new(xfer); - g_source_ref((GSource *)xfer->msg_source); xfer->queue = g_async_queue_new(); /* copy the elements in, verifying that they're all XferElement objects */ @@ -126,6 +126,10 @@ xfer_unref( elt->xfer = NULL; g_object_unref(elt); } + g_ptr_array_free(xfer->elements, TRUE); + + if (xfer->repr) + g_free(xfer->repr); g_free(xfer); } @@ -176,7 +180,7 @@ xfer_start( { unsigned int len; unsigned int i; - XferElement *xe; + gboolean setup_ok; g_assert(xfer != NULL); g_assert(xfer->status == XFER_INIT); @@ -190,17 +194,6 @@ xfer_start( xfer->num_active_elements = 0; xfer_set_status(xfer, XFER_START); - /* check that the first element is an XferSource and the last is an XferDest. - * A source is identified by having no input mechanisms. */ - xe = (XferElement *)g_ptr_array_index(xfer->elements, 0); - if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].input_mech != XFER_MECH_NONE) - error("Transfer element 0 is not a transfer source"); - - /* Similarly, a destination has no output mechanisms. */ - xe = (XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1); - if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].output_mech != XFER_MECH_NONE) - error("Last transfer element is not a transfer destination"); - /* Link the elements. This calls error() on failure, and rewrites * xfer->elements */ link_elements(xfer); @@ -208,27 +201,37 @@ xfer_start( /* Tell all elements to set up. This is done before upstream and downstream * are set so that elements cannot interfere with one another before setup() * is completed. */ + setup_ok = TRUE; for (i = 0; i < xfer->elements->len; i++) { XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i); - xfer_element_setup(xe); + if (!xfer_element_setup(xe)) { + setup_ok = FALSE; + break; + } } - /* Set the upstream and downstream links between elements */ - len = xfer->elements->len; - for (i = 0; i < len; i++) { - XferElement *elt = g_ptr_array_index(xfer->elements, i); + /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue + * already, so skip calling start for any of the elements and send an + * XMSG_DONE, since none of the elements will do so. */ - if (i > 0) - elt->upstream = g_ptr_array_index(xfer->elements, i-1); - if (i < len-1) - elt->downstream = g_ptr_array_index(xfer->elements, i+1); - } + if (setup_ok) { + /* Set the upstream and downstream links between elements */ + len = xfer->elements->len; + for (i = 0; i < len; i++) { + XferElement *elt = g_ptr_array_index(xfer->elements, i); - /* now tell them all to start, in order from destination to source */ - for (i = xfer->elements->len; i >= 1; i--) { - XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1); - if (xfer_element_start(xe)) - xfer->num_active_elements++; + if (i > 0) + elt->upstream = g_ptr_array_index(xfer->elements, i-1); + if (i < len-1) + elt->downstream = g_ptr_array_index(xfer->elements, i+1); + } + + /* now tell them all to start, in order from destination to source */ + for (i = xfer->elements->len; i >= 1; i--) { + XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1); + if (xfer_element_start(xe)) + xfer->num_active_elements++; + } } /* (note that status can only change in the main thread, so we can be @@ -240,7 +243,8 @@ xfer_start( * be done already. We send a "fake" XMSG_DONE from the destination element, * so that all of the usual processing will take place. */ if (xfer->num_active_elements == 0) { - g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer)); + if (setup_ok) + g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer)); xfer->num_active_elements++; xfer_queue_message(xfer, xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1), @@ -302,6 +306,7 @@ xfer_set_status( * and find the optimal overall linkage. */ typedef struct linkage { XferElement *elt; + xfer_element_mech_pair_t *mech_pairs; int elt_idx; /* index into elt's mech_pairs */ int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */ } linkage; @@ -326,6 +331,8 @@ xfer_mech_name( case XFER_MECH_WRITEFD: return "WRITEFD"; case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER"; case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER"; + case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN"; + case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT"; default: return "UNKNOWN"; } } @@ -373,7 +380,7 @@ link_recurse( /* recurse for each linkage we can make that starts with input_mech */ my = &st->cur[idx]; - elt_pairs = XFER_ELEMENT_GET_CLASS(my->elt)->mech_pairs; + elt_pairs = my->mech_pairs; glue_pairs = xfer_element_glue_mech_pairs; for (my->elt_idx = 0; @@ -414,7 +421,6 @@ link_elements( { GPtrArray *new_elements; XferElement *elt; - XferElementClass *eltc; char *linkage_str; linking_state st; gint i, len; @@ -431,8 +437,18 @@ link_elements( st.best_cost = MAX_COST; for (i = 0; i < st.nlinks; i++) { st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i); + st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt); } + /* check that the first element is an XferSource and the last is an XferDest. + * A source is identified by having no input mechanisms. */ + if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE) + error("Transfer element 0 is not a transfer source"); + + /* Similarly, a destination has no output mechanisms. */ + if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE) + error("Last transfer element is not a transfer destination"); + /* start recursing with the first element, asserting that its input mech is NONE */ link_recurse(&st, 0, XFER_MECH_NONE, 0); @@ -446,17 +462,15 @@ link_elements( new_elements = g_ptr_array_sized_new(xfer->elements->len); for (i = 0; i < st.nlinks; i++) { elt = st.best[i].elt; - eltc = XFER_ELEMENT_GET_CLASS(elt); - elt->input_mech = eltc->mech_pairs[st.best[i].elt_idx].input_mech; - elt->output_mech = eltc->mech_pairs[st.best[i].elt_idx].output_mech; + elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech; + elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech; g_ptr_array_add(new_elements, elt); if (st.best[i].glue_idx != -1) { elt = xfer_element_glue(); - eltc = XFER_ELEMENT_GET_CLASS(elt); elt->xfer = xfer; - elt->input_mech = eltc->mech_pairs[st.best[i].glue_idx].input_mech; - elt->output_mech = eltc->mech_pairs[st.best[i].glue_idx].output_mech; + elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech; + elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech; g_ptr_array_add(new_elements, elt); } } @@ -626,3 +640,61 @@ xmsgsource_new( return xms; } + +xfer_status +wait_until_xfer_cancelled( + Xfer *xfer) +{ + xfer_status seen_status; + g_assert(xfer != NULL); + + g_mutex_lock(xfer->status_mutex); + while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE) + g_cond_wait(xfer->status_cond, xfer->status_mutex); + seen_status = xfer->status; + g_mutex_unlock(xfer->status_mutex); + + return seen_status; +} + +xfer_status +wait_until_xfer_running( + Xfer *xfer) +{ + xfer_status seen_status; + g_assert(xfer != NULL); + + g_mutex_lock(xfer->status_mutex); + while (xfer->status == XFER_START) + g_cond_wait(xfer->status_cond, xfer->status_mutex); + seen_status = xfer->status; + g_mutex_unlock(xfer->status_mutex); + + return seen_status; +} + +void +xfer_cancel_with_error( + XferElement *elt, + const char *fmt, + ...) +{ + va_list argp; + XMsg *msg; + + g_assert(elt != NULL); + g_assert(elt->xfer != NULL); + + msg = xmsg_new(elt, XMSG_ERROR, 0); + + arglist_start(argp, fmt); + msg->message = g_strdup_vprintf(fmt, argp); + arglist_end(argp); + + /* send the XMSG_ERROR */ + xfer_queue_message(elt->xfer, msg); + + /* cancel the transfer */ + xfer_cancel(elt->xfer); +} +