2 * Copyright (c) 2008 Zmanda, Inc. All Rights Reserved.
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License version 2.1 as
6 * published by the Free Software Foundation.
8 * This library is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
11 * License for more details.
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
17 * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
22 #include "element-glue.h"
/* XMsgSource objects are GSource "subclasses" which manage
 * a queue of messages, delivering those messages via callback
 * in the main loop.  Messages can be *sent* from any thread without
 * any concern for locking, but must only be *received* in the main
 * thread, in the default GMainContext.
 *
 * An XMsgSource pointer can be cast to a GSource pointer as needed,
 * because the GSource is the first member of the struct.
 */
34 typedef struct XMsgSource {
35 GSource source; /* must be the first element of the struct */
39 /* forward prototypes */
40 static void xfer_set_status(Xfer *xfer, xfer_status status);
41 static XMsgSource *xmsgsource_new(Xfer *xfer);
42 static void link_elements(Xfer *xfer);
46 XferElement **elements,
47 unsigned int nelements)
49 Xfer *xfer = g_new0(Xfer, 1);
53 g_assert(nelements >= 2);
55 xfer->status = XFER_INIT;
56 xfer->status_mutex = g_mutex_new();
57 xfer->status_cond = g_cond_new();
62 /* Create our message source and corresponding queue */
63 xfer->msg_source = xmsgsource_new(xfer);
64 g_source_ref((GSource *)xfer->msg_source);
65 xfer->queue = g_async_queue_new();
67 /* copy the elements in, verifying that they're all XferElement objects */
68 xfer->elements = g_ptr_array_sized_new(nelements);
69 for (i = 0; i < nelements; i++) {
70 g_assert(elements[i] != NULL);
71 g_assert(IS_XFER_ELEMENT(elements[i]));
72 g_assert(elements[i]->xfer == NULL);
74 g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
76 g_object_ref(elements[i]);
77 elements[i]->xfer = xfer;
97 if (!xfer) return; /* be friendly to NULLs */
99 if (--xfer->refcount > 0) return;
101 g_assert(xfer != NULL);
102 g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
104 /* Divorce ourselves from the message source */
105 xfer->msg_source->xfer = NULL;
106 g_source_unref((GSource *)xfer->msg_source);
107 xfer->msg_source = NULL;
109 /* Try to empty the message queue */
110 while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
111 g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed",
112 xfer_element_repr(msg->elt));
115 g_async_queue_unref(xfer->queue);
117 g_mutex_free(xfer->status_mutex);
118 g_cond_free(xfer->status_cond);
120 /* Free our references to the elements, and also set the 'xfer'
121 * attribute of each to NULL, making them "unattached" (although
122 * subsequent reuse of elements is untested). */
123 for (i = 0; i < xfer->elements->len; i++) {
124 XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
137 return (GSource *)xfer->msg_source;
145 g_assert(xfer != NULL);
146 g_assert(msg != NULL);
148 g_async_queue_push(xfer->queue, (gpointer)msg);
150 /* TODO: don't do this if we're in the main thread */
151 g_main_context_wakeup(NULL);
161 xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
162 for (i = 0; i < xfer->elements->len; i++) {
163 XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
164 xfer->repr = newvstralloc(xfer->repr,
165 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
167 xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
181 g_assert(xfer != NULL);
182 g_assert(xfer->status == XFER_INIT);
183 g_assert(xfer->elements->len >= 2);
185 g_debug("Starting %s", xfer_repr(xfer));
186 /* set the status to XFER_START and add a reference to our count, so that
187 * we are not freed while still in operation. We'll drop this reference
188 * when the status becomes XFER_DONE. */
190 xfer->num_active_elements = 0;
191 xfer_set_status(xfer, XFER_START);
193 /* check that the first element is an XferSource and the last is an XferDest.
194 * A source is identified by having no input mechanisms. */
195 xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
196 if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].input_mech != XFER_MECH_NONE)
197 error("Transfer element 0 is not a transfer source");
199 /* Similarly, a destination has no output mechanisms. */
200 xe = (XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1);
201 if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].output_mech != XFER_MECH_NONE)
202 error("Last transfer element is not a transfer destination");
204 /* Link the elements. This calls error() on failure, and rewrites
208 /* Tell all elements to set up. This is done before upstream and downstream
209 * are set so that elements cannot interfere with one another before setup()
211 for (i = 0; i < xfer->elements->len; i++) {
212 XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
213 xfer_element_setup(xe);
216 /* Set the upstream and downstream links between elements */
217 len = xfer->elements->len;
218 for (i = 0; i < len; i++) {
219 XferElement *elt = g_ptr_array_index(xfer->elements, i);
222 elt->upstream = g_ptr_array_index(xfer->elements, i-1);
224 elt->downstream = g_ptr_array_index(xfer->elements, i+1);
227 /* now tell them all to start, in order from destination to source */
228 for (i = xfer->elements->len; i >= 1; i--) {
229 XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
230 if (xfer_element_start(xe))
231 xfer->num_active_elements++;
234 /* (note that status can only change in the main thread, so we can be
235 * certain that the status is still XFER_START and we have not yet been
236 * cancelled. We may have an XMSG_CANCEL already queued up for us, though) */
237 xfer_set_status(xfer, XFER_RUNNING);
239 /* If this transfer involves no active processing, then we consider it to
240 * be done already. We send a "fake" XMSG_DONE from the destination element,
241 * so that all of the usual processing will take place. */
242 if (xfer->num_active_elements == 0) {
243 g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
244 xfer->num_active_elements++;
245 xfer_queue_message(xfer,
246 xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
255 /* Since xfer_cancel can be called from any thread, we just send a message.
256 * The action takes place when the message is received. */
257 XferElement *src = g_ptr_array_index(xfer->elements, 0);
258 xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
266 if (xfer->status == status) return;
268 g_mutex_lock(xfer->status_mutex);
270 /* check that this state transition is valid */
273 g_assert(xfer->status == XFER_INIT);
276 g_assert(xfer->status == XFER_START);
278 case XFER_CANCELLING:
279 g_assert(xfer->status == XFER_RUNNING);
282 g_assert(xfer->status == XFER_CANCELLING);
285 g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
289 g_assert_not_reached();
292 xfer->status = status;
293 g_cond_broadcast(xfer->status_cond);
294 g_mutex_unlock(xfer->status_mutex);
301 /* How is ELT linked? link_recurse uses an array of these to track its progress
302 * and find the optimal overall linkage. */
303 typedef struct linkage {
305 int elt_idx; /* index into elt's mech_pairs */
306 int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
309 /* Overall state of the recursive linking process */
310 typedef struct linking_state {
311 int nlinks; /* number of linkage objects in each array */
312 linkage *cur; /* "current" linkage */
314 linkage *best; /* best linkage so far */
315 gint32 best_cost; /* cost for best */
318 /* used for debugging messages */
324 case XFER_MECH_NONE: return "NONE";
325 case XFER_MECH_READFD: return "READFD";
326 case XFER_MECH_WRITEFD: return "WRITEFD";
327 case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
328 case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
329 default: return "UNKNOWN";
/* Collapse the cost of a mech pair into a single integer, so that pairs
 * (and sums of pairs) can be ranked with an ordinary integer comparison.
 * OPS_PER_BYTE is the most important metric, followed by NTHREADS:
 * ops_per_byte is shifted above the low eight bits and nthreads is added
 * into the low eight bits.
 *
 * NOTE: that ordering only holds while nthreads is below 256; a larger
 * value would carry into the ops_per_byte part of the result.
 *
 * PAIR will be evaluated multiple times, so it must not have side effects.
 */
#define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)

/* Sentinel cost, larger than any cost the linking search will accept.
 * The search seeds best_cost with this value and only records a linkage
 * whose cost is strictly lower, so a best_cost that is still MAX_COST
 * afterwards means that no linkage was found. */
#define MAX_COST 0xffffff
/* Generate all possible linkages of elements [idx:nlinks], where
 * elements [0:idx-1] have cost 'cost' and end with mechanism
 * 'input_mech'. */
351 xfer_mech input_mech,
354 xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
357 /* if we've overrun the previous best cost already, then bail out */
358 if (cost >= st->best_cost)
361 /* have we linked everything? */
362 if (idx == st->nlinks) {
363 /* if we ended on other than XFER_MECH_NONE, then this is not a
365 if (input_mech != XFER_MECH_NONE) return;
367 /* we already know this has lower cost than the previous best */
368 memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
369 st->best_cost = cost;
374 /* recurse for each linkage we can make that starts with input_mech */
376 elt_pairs = XFER_ELEMENT_GET_CLASS(my->elt)->mech_pairs;
377 glue_pairs = xfer_element_glue_mech_pairs;
379 for (my->elt_idx = 0;
380 elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
381 || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
383 /* reject this pair if the input mech does not match */
384 if (elt_pairs[my->elt_idx].input_mech != input_mech)
387 /* recurse with no glue */
389 link_recurse(st, idx+1,
390 elt_pairs[my->elt_idx].output_mech,
391 cost + PAIR_COST(elt_pairs[my->elt_idx]));
393 /* and recurse with glue */
394 for (my->glue_idx = 0;
395 glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
396 || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
398 /* reject this glue pair if it doesn't match with the element output */
399 if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
402 /* and recurse with the glue */
403 link_recurse(st, idx+1,
404 glue_pairs[my->glue_idx].output_mech,
405 cost + PAIR_COST(elt_pairs[my->elt_idx])
406 + PAIR_COST(glue_pairs[my->glue_idx]));
415 GPtrArray *new_elements;
417 XferElementClass *eltc;
422 /* Note that this algorithm's running time is polynomial in the length of
423 * the transfer, with a fairly high order. If Amanda is regularly assembling
424 * transfers with more than, say, 6 elements, then the algorithm should be
427 /* set up the state for recursion */
428 st.nlinks = xfer->elements->len;
429 st.cur = g_new0(linkage, st.nlinks);
430 st.best = g_new0(linkage, st.nlinks);
431 st.best_cost = MAX_COST;
432 for (i = 0; i < st.nlinks; i++) {
433 st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
436 /* start recursing with the first element, asserting that its input mech is NONE */
437 link_recurse(&st, 0, XFER_MECH_NONE, 0);
439 /* check that we got *some* solution */
440 if (st.best_cost == MAX_COST) {
441 error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
444 /* Now create a new list of elements, containing any glue elements
445 * that we need to add, and set their input_mech and output_mech fields */
446 new_elements = g_ptr_array_sized_new(xfer->elements->len);
447 for (i = 0; i < st.nlinks; i++) {
448 elt = st.best[i].elt;
449 eltc = XFER_ELEMENT_GET_CLASS(elt);
450 elt->input_mech = eltc->mech_pairs[st.best[i].elt_idx].input_mech;
451 elt->output_mech = eltc->mech_pairs[st.best[i].elt_idx].output_mech;
452 g_ptr_array_add(new_elements, elt);
454 if (st.best[i].glue_idx != -1) {
455 elt = xfer_element_glue();
456 eltc = XFER_ELEMENT_GET_CLASS(elt);
458 elt->input_mech = eltc->mech_pairs[st.best[i].glue_idx].input_mech;
459 elt->output_mech = eltc->mech_pairs[st.best[i].glue_idx].output_mech;
460 g_ptr_array_add(new_elements, elt);
464 /* install the new list of elements */
465 g_ptr_array_free(xfer->elements, FALSE);
466 xfer->elements = new_elements;
469 /* debug-log the xfer's linkage */
470 len = xfer->elements->len;
471 linkage_str = stralloc("Final linkage: ");
472 for (i = 0; i < len; i++) {
473 XferElement *elt = g_ptr_array_index(xfer->elements, i);
476 linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
478 linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
479 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
481 g_debug("%s", linkage_str);
497 XMsgSource *xms = (XMsgSource *)source;
500 return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
507 XMsgSource *xms = (XMsgSource *)source;
509 return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
514 GSource *source G_GNUC_UNUSED,
515 GSourceFunc callback,
518 XMsgSource *xms = (XMsgSource *)source;
519 Xfer *xfer = xms->xfer;
520 XMsgCallback my_cb = (XMsgCallback)callback;
522 gboolean deliver_to_caller;
524 gboolean xfer_done = FALSE;
526 /* we're potentially calling Perl code within this loop, so we have to
527 * check that everything is ok on each iteration of the loop. */
529 && xfer->status != XFER_DONE
530 && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
532 /* We get first crack at interpreting messages, before calling the
533 * designated callback. */
534 deliver_to_caller = TRUE;
536 /* Intercept and count DONE messages so that we can determine when
537 * the entire transfer is finished. */
539 if (--xfer->num_active_elements <= 0) {
540 /* mark the transfer as done, and take a note to break out
541 * of this loop after delivering the message to the user */
542 xfer_set_status(xfer, XFER_DONE);
545 /* eat this XMSG_DONE, since we expect more */
546 deliver_to_caller = FALSE;
551 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
552 /* ignore duplicate cancel messages */
553 deliver_to_caller = FALSE;
555 /* call cancel() on each child element */
558 g_debug("Cancelling %s", xfer_repr(xfer));
559 xfer_set_status(xfer, XFER_CANCELLING);
562 for (i = 0; i < xfer->elements->len; i++) {
563 XferElement *elt = (XferElement *)
564 g_ptr_array_index(xfer->elements, i);
565 expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
568 /* if nothing in the transfer can generate an EOF, then we
569 * can't cancel this transfer, and we'll just have to wait
570 * until it's finished. This may happen, for example, if
571 * the operating system is copying data for us
574 g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
576 /* and now we're done cancelling */
577 xfer_set_status(xfer, XFER_CANCELLED);
582 break; /* nothing interesting to do */
585 if (deliver_to_caller) {
587 my_cb(user_data, msg, xfer);
589 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
595 /* This transfer is done, so kill it and exit the loop */
603 /* Never automatically un-queue the event source */
611 static GSourceFuncs *xmsgsource_funcs = NULL;
615 /* initialize these here to avoid a compiler warning */
616 if (!xmsgsource_funcs) {
617 xmsgsource_funcs = g_new0(GSourceFuncs, 1);
618 xmsgsource_funcs->prepare = xmsgsource_prepare;
619 xmsgsource_funcs->check = xmsgsource_check;
620 xmsgsource_funcs->dispatch = xmsgsource_dispatch;
623 src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
624 xms = (XMsgSource *)src;