2 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU General Public License
6 * as published by the Free Software Foundation; either version 2
7 * of the License, or (at your option) any later version.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "element-glue.h"
/* Custom GLib event source type for one transfer.  Instances are created by
 * g_source_new() with sizeof(XMsgSource), so GLib sees the leading GSource
 * while the transfer code reaches its own fields through an XMsgSource cast
 * (for example xms->xfer in the prepare, check and dispatch callbacks). */
27 /* XMsgSource objects are GSource "subclasses" which manage
28 * a queue of messages, delivering those messages via callback
29 * in the mainloop. Messages can be *sent* from any thread without
30 * any concern for locking, but must only be received in the main
31 * thread, in the default GMainContext.
33 * An XMsgSource pointer can be cast to a GSource pointer as
36 typedef struct XMsgSource {
37 GSource source; /* must be the first element of the struct */
41 /* forward prototypes */
/* checks that a status transition is legal, applies it and wakes any thread
 * waiting on the status condition variable */
42 static void xfer_set_status(Xfer *xfer, xfer_status status);
/* allocates the GSource-based message source used by a transfer */
43 static XMsgSource *xmsgsource_new(Xfer *xfer);
/* selects a mechanism pair for every element and splices in glue elements */
44 static void link_elements(Xfer *xfer);
/* Constructor: allocates a zeroed Xfer in state XFER_INIT, creates its
 * mutexes, condition variable, message source and async queue, then adopts
 * the NELEMENTS entries of ELEMENTS.
 *
 * elements  - array of transfer elements, in order
 * nelements - number of entries in ELEMENTS; asserted to be at least 2
 *
 * Each element must be non-NULL, must pass IS_XFER_ELEMENT and must not
 * already be attached to a transfer; all three are enforced with g_assert
 * only. */
48 XferElement **elements,
49 unsigned int nelements)
51 Xfer *xfer = g_new0(Xfer, 1);
55 g_assert(nelements >= 2);
57 xfer->status = XFER_INIT;
/* NOTE(review): g_mutex_new() and g_cond_new() are deprecated since GLib
 * 2.32 in favour of g_mutex_init() and g_cond_init(); migrating would also
 * touch the matching g_mutex_free() and g_cond_free() calls in the teardown
 * path. */
58 xfer->status_mutex = g_mutex_new();
59 xfer->status_cond = g_cond_new();
60 xfer->fd_mutex = g_mutex_new();
65 /* Create our message source and corresponding queue */
66 xfer->msg_source = xmsgsource_new(xfer);
67 xfer->queue = g_async_queue_new();
69 /* copy the elements in, verifying that they're all XferElement objects */
70 xfer->elements = g_ptr_array_sized_new(nelements);
71 for (i = 0; i < nelements; i++) {
72 g_assert(elements[i] != NULL);
73 g_assert(IS_XFER_ELEMENT(elements[i]));
74 g_assert(elements[i]->xfer == NULL);
76 g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
/* the transfer takes its own reference on each element and records itself
 * as the owner of that element */
78 g_object_ref(elements[i]);
79 elements[i]->xfer = xfer;
/* Drops one reference on XFER.  A NULL argument is ignored.  When the count
 * reaches zero the transfer is torn down: the message source is detached
 * and unreferenced, any messages still queued are drained with a warning,
 * the queue and the synchronization primitives are released, and the element
 * array is freed.  Teardown is only legal in XFER_INIT or XFER_DONE
 * (asserted). */
99 if (!xfer) return; /* be friendly to NULLs */
101 if (--xfer->refcount > 0) return;
/* NOTE(review): this assertion cannot fire - xfer was tested for NULL and
 * already dereferenced on the lines above. */
103 g_assert(xfer != NULL);
104 g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
106 /* Divorce ourselves from the message source */
/* clearing the back-pointer first makes the prepare and check callbacks
 * (which test xms->xfer) report the source as never ready, should the
 * source outlive this transfer */
107 xfer->msg_source->xfer = NULL;
108 g_source_unref((GSource *)xfer->msg_source);
109 xfer->msg_source = NULL;
111 /* Try to empty the message queue */
112 while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
113 g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed",
114 xfer_element_repr(msg->elt));
117 g_async_queue_unref(xfer->queue);
119 g_mutex_free(xfer->status_mutex);
120 g_cond_free(xfer->status_cond);
121 g_mutex_free(xfer->fd_mutex);
123 /* Free our references to the elements, and also set the 'xfer'
124 * attribute of each to NULL, making them "unattached" (although
125 * subsequent reuse of elements is untested). */
126 for (i = 0; i < xfer->elements->len; i++) {
127 XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
/* TRUE also releases the pointer segment; the array has no element free
 * function, so the elements themselves are not touched by this call */
132 g_ptr_array_free(xfer->elements, TRUE);
/* Exposes the message source of the transfer as a plain GSource; the cast is
 * valid because GSource is the first member of XMsgSource.
 * NOTE(review): no reference is taken on this line - confirm the ownership
 * contract with callers. */
144 return (GSource *)xfer->msg_source;
/* Queues MSG for delivery by the message source of XFER.  Both arguments
 * must be non-NULL (asserted).  The push goes through a GAsyncQueue, then
 * the default main context (NULL argument) is woken so that the source is
 * polled again. */
152 g_assert(xfer != NULL);
153 g_assert(msg != NULL);
155 g_async_queue_push(xfer->queue, (gpointer)msg);
157 /* TODO: don't do this if we're in the main thread */
158 g_main_context_wakeup(NULL);
/* Builds a printable description of the transfer into xfer->repr: the
 * address of the Xfer, followed by the repr of every element joined with
 * arrows, all wrapped in angle brackets.  Each step passes the previous
 * xfer->repr as the first argument of the newvstralloc-family call and
 * stores the result back into xfer->repr.
 * NOTE(review): whether the string is cached and rebuilt only on first use
 * is decided outside the lines visible here - confirm. */
168 xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
169 for (i = 0; i < xfer->elements->len; i++) {
170 XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
171 xfer->repr = newvstralloc(xfer->repr,
172 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
174 xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
/* Starts a transfer that is still in XFER_INIT and has at least two
 * elements (both asserted).  The sequence visible here is: status becomes
 * XFER_START, every element is set up, upstream and downstream pointers are
 * wired between neighbours, the first element is told SIZE, the elements
 * are started from the last one back to the first, and the status becomes
 * XFER_RUNNING.  Elements whose start call returns true are counted in
 * num_active_elements; if none are active a substitute XMSG_DONE is queued
 * on behalf of the last element so the normal completion path still runs.
 *
 * offset - must be 0 (asserted); other values are not supported here */
183 gint64 offset G_GNUC_UNUSED,
190 g_assert(xfer != NULL);
191 g_assert(xfer->status == XFER_INIT);
192 g_assert(xfer->elements->len >= 2);
/* NOTE(review): offset is tagged G_GNUC_UNUSED in the parameter list yet is
 * read by this assertion; presumably the attribute only silences a warning
 * in builds where g_assert is compiled out - confirm. */
193 g_assert(offset == 0);
195 g_debug("Starting %s", xfer_repr(xfer));
196 /* set the status to XFER_START and add a reference to our count, so that
197 * we are not freed while still in operation. We'll drop this reference
198 * when the status becomes XFER_DONE. */
200 xfer->num_active_elements = 0;
201 xfer_set_status(xfer, XFER_START);
203 /* Link the elements. This calls error() on failure, and rewrites
207 /* Tell all elements to set up. This is done before upstream and downstream
208 * are set so that elements cannot interfere with one another before setup()
211 for (i = 0; i < xfer->elements->len; i++) {
212 XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
213 if (!xfer_element_setup(xe)) {
219 /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
220 * already, so skip calling start for any of the elements and send an
221 * XMSG_DONE, since none of the elements will do so. */
224 /* Set the upstream and downstream links between elements */
225 len = xfer->elements->len;
226 for (i = 0; i < len; i++) {
227 XferElement *elt = g_ptr_array_index(xfer->elements, i);
230 elt->upstream = g_ptr_array_index(xfer->elements, i-1);
232 elt->downstream = g_ptr_array_index(xfer->elements, i+1);
235 /* Set size for first element */
237 XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
238 xfer_element_set_size(xe, size);
241 /* now tell them all to start, in order from destination to source */
/* the counter runs from len down to 1 and indexes i-1; presumably i is
 * unsigned, in which case a loop testing i >= 0 would never terminate */
242 for (i = xfer->elements->len; i >= 1; i--) {
243 XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
244 if (xfer_element_start(xe))
245 xfer->num_active_elements++;
249 /* (note that status can only change in the main thread, so we can be
250 * certain that the status is still XFER_START and we have not yet been
251 * cancelled. We may have an XMSG_CANCEL already queued up for us, though) */
252 xfer_set_status(xfer, XFER_RUNNING);
254 /* If this transfer involves no active processing, then we consider it to
255 * be done already. We send a "fake" XMSG_DONE from the destination element,
256 * so that all of the usual processing will take place. */
257 if (xfer->num_active_elements == 0) {
259 g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
/* count the substitute sender, so that the decrement performed by the
 * dispatcher for this message brings the counter back to zero */
260 xfer->num_active_elements++;
261 xfer_queue_message(xfer,
262 xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
/* Requests cancellation of XFER by queueing an XMSG_CANCEL attributed to
 * the first element; nothing is cancelled synchronously.  Returns without
 * queueing anything when xfer->cancelled is already positive.
 * NOTE(review): the cancelled counter is read here without any lock; where
 * it is incremented and how that is synchronized is not visible on these
 * lines - confirm before relying on exactly-once behaviour. */
271 /* Since xfer_cancel can be called from any thread, we just send a message.
272 * The action takes place when the message is received. */
274 if (xfer->cancelled > 0) return;
276 src = g_ptr_array_index(xfer->elements, 0);
277 xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
/* Moves XFER to STATUS.  A request for the current status is a no-op.
 * Otherwise, with status_mutex held, the transition is checked by assertion
 * against the previous status, the new value is stored, and status_cond is
 * broadcast so that every waiting thread re-evaluates its predicate.
 * Judging by the assertions and the callers in this file the legal order is
 * INIT, START, RUNNING, optionally CANCELLING then CANCELLED, and finally
 * DONE; any other requested value hits g_assert_not_reached. */
/* this comparison is made before taking the lock; a comment in the start
 * routine states that status only changes in the main thread */
285 if (xfer->status == status) return;
287 g_mutex_lock(xfer->status_mutex);
289 /* check that this state transition is valid */
292 g_assert(xfer->status == XFER_INIT);
295 g_assert(xfer->status == XFER_START);
297 case XFER_CANCELLING:
298 g_assert(xfer->status == XFER_RUNNING);
301 g_assert(xfer->status == XFER_CANCELLING);
304 g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
308 g_assert_not_reached();
311 xfer->status = status;
/* broadcast rather than signal: more than one thread may be waiting, each
 * on a different target status */
312 g_cond_broadcast(xfer->status_cond);
313 g_mutex_unlock(xfer->status_mutex);
320 /* How is ELT linked? link_recurse uses an array of these to track its progress
321 * and find the optimal overall linkage. */
322 typedef struct linkage {
/* table obtained from xfer_element_get_mech_pairs(); iteration over it stops
 * at the entry whose input and output mechanisms are both XFER_MECH_NONE */
324 xfer_element_mech_pair_t *mech_pairs;
325 int elt_idx; /* index into elt's mech_pairs */
326 int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
329 /* Overall state of the recursive linking process */
330 typedef struct linking_state {
331 int nlinks; /* number of linkage objects in each array */
/* cur is the scratch array mutated while the search descends; whenever a
 * cheaper complete chain is found it is copied wholesale into best */
332 linkage *cur; /* "current" linkage */
334 linkage *best; /* best linkage so far */
/* initialised to MAX_COST, which therefore doubles as a marker meaning
 * that no complete linkage has been found */
335 gint32 best_cost; /* cost for best */
/* Maps a transfer mechanism constant to a constant string for log output;
 * values without a dedicated case yield UNKNOWN.  The returned strings are
 * literals and must not be freed. */
338 /* used for debugging messages */
344 case XFER_MECH_NONE: return "NONE";
345 case XFER_MECH_READFD: return "READFD";
346 case XFER_MECH_WRITEFD: return "WRITEFD";
347 case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
348 case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
349 case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
350 case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
351 default: return "UNKNOWN";
/* Cost model used by the linking search.  PAIR_COST places ops_per_byte
 * above the low 8 bits and adds nthreads below it, so ops_per_byte decides
 * the ranking and nthreads only breaks ties - assuming nthreads stays below
 * 256, TODO confirm.  MAX_COST is the initial value of best_cost and is
 * compared against afterwards to detect that no linkage was found. */
355 /* calculate an integer representing the cost of a mech pair as a
356 * single integer. OPS_PER_BYTE is the most important metric,
357 * followed by NTHREADS.
359 * PAIR will be evaluated multiple times.
361 #define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)
364 #define MAX_COST 0xffffff
366 /* Generate all possible linkages of elements [idx:nlinks], where
367 * elements [0:idx-1] have cost 'cost' and end with mechanism
373 xfer_mech input_mech,
376 xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
379 /* if we've overrun the previous best cost already, then bail out */
/* Pruning step of the exhaustive search: the test is >=, so a candidate
 * that merely ties the best known cost is also discarded and the first
 * linkage found at a given cost wins.  Because best_cost starts at
 * MAX_COST, a chain costing MAX_COST or more can never be accepted. */
380 if (cost >= st->best_cost)
383 /* have we linked everything? */
384 if (idx == st->nlinks) {
385 /* if we ended on other than XFER_MECH_NONE, then this is not a
387 if (input_mech != XFER_MECH_NONE) return;
389 /* we already know this has lower cost than the previous best */
/* snapshot every entry of cur, including the elt_idx and glue_idx chosen
 * at each position, as the new best linkage */
390 memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
391 st->best_cost = cost;
396 /* recurse for each linkage we can make that starts with input_mech */
/* NOTE(review): my is assigned outside the lines visible here; presumably
 * it points at st->cur[idx] - confirm. */
398 elt_pairs = my->mech_pairs;
399 glue_pairs = xfer_element_glue_mech_pairs;
/* both pair tables are walked until the terminating entry whose input and
 * output mechanisms are both XFER_MECH_NONE */
401 for (my->elt_idx = 0;
402 elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
403 || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
405 /* reject this pair if the input mech does not match */
406 if (elt_pairs[my->elt_idx].input_mech != input_mech)
409 /* recurse with no glue */
411 link_recurse(st, idx+1,
412 elt_pairs[my->elt_idx].output_mech,
413 cost + PAIR_COST(elt_pairs[my->elt_idx]));
415 /* and recurse with glue */
416 for (my->glue_idx = 0;
417 glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
418 || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
420 /* reject this glue pair if it doesn't match with the element output */
421 if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
424 /* and recurse with the glue */
/* the next element sees the output mechanism of the glue, and the running
 * cost includes both the element pair and the glue pair */
425 link_recurse(st, idx+1,
426 glue_pairs[my->glue_idx].output_mech,
427 cost + PAIR_COST(elt_pairs[my->elt_idx])
428 + PAIR_COST(glue_pairs[my->glue_idx]));
/* Decides how neighbouring elements exchange data.  Every combination of
 * per-element mechanism pairs, each optionally followed by one glue pair,
 * is explored through link_recurse, and the cheapest chain that begins and
 * ends with XFER_MECH_NONE is kept.  xfer->elements is then replaced by a
 * new array that contains the original elements plus any glue elements,
 * with input_mech and output_mech set on each, and the result is logged.
 * error() is called when the first element is not a source, the last is not
 * a destination, or no linkage exists. */
437 GPtrArray *new_elements;
443 /* Note that this algorithm's running time is polynomial in the length of
444 * the transfer, with a fairly high order. If Amanda is regularly assembling
445 * transfers with more than, say, 6 elements, then the algorithm should be
448 /* set up the state for recursion */
449 st.nlinks = xfer->elements->len;
450 st.cur = g_new0(linkage, st.nlinks);
451 st.best = g_new0(linkage, st.nlinks);
452 st.best_cost = MAX_COST;
453 for (i = 0; i < st.nlinks; i++) {
454 st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
455 st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
458 /* check that the first element is an XferSource and the last is an XferDest.
459 * A source is identified by having no input mechanisms. */
/* NOTE(review): only entry 0 of each table is inspected, here and in the
 * destination check below; this presumes that all pairs of a source share
 * input XFER_MECH_NONE and all pairs of a destination share output
 * XFER_MECH_NONE - confirm. */
460 if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
461 error("Transfer element 0 is not a transfer source");
463 /* Similarly, a destination has no output mechanisms. */
464 if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
465 error("Last transfer element is not a transfer destination");
467 /* start recursing with the first element, asserting that its input mech is NONE */
468 link_recurse(&st, 0, XFER_MECH_NONE, 0);
470 /* check that we got *some* solution */
471 if (st.best_cost == MAX_COST) {
472 error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
475 /* Now create a new list of elements, containing any glue elements
476 * that we need to add, and set their input_mech and output_mech fields */
477 new_elements = g_ptr_array_sized_new(xfer->elements->len);
478 for (i = 0; i < st.nlinks; i++) {
479 elt = st.best[i].elt;
480 elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
481 elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
482 g_ptr_array_add(new_elements, elt);
484 if (st.best[i].glue_idx != -1) {
485 elt = xfer_element_glue();
487 elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
488 elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
489 g_ptr_array_add(new_elements, elt);
493 /* install the new list of elements */
/* NOTE(review): with the second argument FALSE, g_ptr_array_free() releases
 * only the GPtrArray wrapper and returns the pointer segment for the caller
 * to g_free; the return value is discarded here, so the old segment appears
 * to leak.  The elements themselves live on in new_elements and the old
 * array has no element free function, so passing TRUE looks safe -
 * confirm and fix. */
494 g_ptr_array_free(xfer->elements, FALSE);
495 xfer->elements = new_elements;
498 /* debug-log the xfer's linkage */
499 len = xfer->elements->len;
500 linkage_str = stralloc("Final linkage: ");
501 for (i = 0; i < len; i++) {
502 XferElement *elt = g_ptr_array_index(xfer->elements, i);
505 linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
507 linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
508 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
510 g_debug("%s", linkage_str);
/* Source callback: reports the source as ready when it is still attached to
 * a transfer (xms->xfer is non-NULL) and that transfer has at least one
 * queued message. */
526 XMsgSource *xms = (XMsgSource *)source;
529 return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
/* Source callback: same readiness test as the prepare step - the source
 * must still be attached to a transfer whose queue is non-empty. */
536 XMsgSource *xms = (XMsgSource *)source;
538 return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
/* Source dispatch callback.  Pops queued messages for as long as the
 * transfer is not XFER_DONE and interprets them before the user callback
 * sees them: completion messages are counted down against
 * num_active_elements and only the last one is delivered, and a cancel
 * request runs xfer_element_cancel on every element.  CALLBACK is invoked
 * as an XMsgCallback with USER_DATA, the message and the transfer; without
 * a callback the message is dropped with a warning. */
543 GSource *source G_GNUC_UNUSED,
544 GSourceFunc callback,
/* NOTE(review): source carries G_GNUC_UNUSED in the parameter list but is
 * used on the next line; the attribute is harmless but misleading. */
547 XMsgSource *xms = (XMsgSource *)source;
548 Xfer *xfer = xms->xfer;
549 XMsgCallback my_cb = (XMsgCallback)callback;
551 gboolean deliver_to_caller;
553 gboolean xfer_done = FALSE;
555 /* we're potentially calling Perl code within this loop, so we have to
556 * check that everything is ok on each iteration of the loop. */
558 && xfer->status != XFER_DONE
559 && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
561 /* We get first crack at interpreting messages, before calling the
562 * designated callback. */
563 deliver_to_caller = TRUE;
565 /* Intercept and count DONE messages so that we can determine when
566 * the entire transfer is finished. */
/* the test is <= 0 rather than == 0, so a surplus completion message still
 * finishes the transfer instead of leaving it running */
568 if (--xfer->num_active_elements <= 0) {
569 /* mark the transfer as done, and take a note to break out
570 * of this loop after delivering the message to the user */
571 xfer_set_status(xfer, XFER_DONE);
574 /* eat this XMSG_DONE, since we expect more */
575 deliver_to_caller = FALSE;
580 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
581 /* ignore duplicate cancel messages */
582 deliver_to_caller = FALSE;
584 /* call cancel() on each child element */
587 g_debug("Cancelling %s", xfer_repr(xfer));
588 xfer_set_status(xfer, XFER_CANCELLING);
591 for (i = 0; i < xfer->elements->len; i++) {
592 XferElement *elt = (XferElement *)
593 g_ptr_array_index(xfer->elements, i);
/* expect_eof is sticky: once any element returns true it stays true, and
 * the accumulated value is passed on to each later element */
594 expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
597 /* if nothing in the transfer can generate an EOF, then we
598 * can't cancel this transfer, and we'll just have to wait
599 * until it's finished. This may happen, for example, if
600 * the operating system is copying data for us
603 g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
605 /* and now we're done cancelling */
606 xfer_set_status(xfer, XFER_CANCELLED);
611 break; /* nothing interesting to do */
614 if (deliver_to_caller) {
616 my_cb(user_data, msg, xfer);
618 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
624 /* This transfer is done, so kill it and exit the loop */
632 /* Never automatically un-queue the event source */
/* Creates the message source for a transfer.  The GSourceFuncs table is
 * allocated on first use, filled with the prepare, check and dispatch
 * callbacks of this file, and then shared by every later source; the new
 * source is allocated by g_source_new() at sizeof(XMsgSource) so that the
 * extra fields fit behind the GSource header.
 * NOTE(review): the first-use initialisation is not synchronized and the
 * table is never freed; presumably this runs in one thread only - confirm. */
640 static GSourceFuncs *xmsgsource_funcs = NULL;
644 /* initialize these here to avoid a compiler warning */
645 if (!xmsgsource_funcs) {
646 xmsgsource_funcs = g_new0(GSourceFuncs, 1);
647 xmsgsource_funcs->prepare = xmsgsource_prepare;
648 xmsgsource_funcs->check = xmsgsource_check;
649 xmsgsource_funcs->dispatch = xmsgsource_dispatch;
652 src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
653 xms = (XMsgSource *)src;
/* Blocks the calling thread until XFER is in XFER_CANCELLED or XFER_DONE.
 * The predicate is re-tested in a loop around g_cond_wait, so spurious
 * wake-ups are tolerated, and the status is sampled into seen_status while
 * the mutex is still held.  XFER must be non-NULL (asserted).
 * NOTE(review): presumably seen_status is the return value; the return
 * statement is not visible here - confirm. */
660 wait_until_xfer_cancelled(
663 xfer_status seen_status;
664 g_assert(xfer != NULL);
666 g_mutex_lock(xfer->status_mutex);
667 while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
668 g_cond_wait(xfer->status_cond, xfer->status_mutex);
669 seen_status = xfer->status;
670 g_mutex_unlock(xfer->status_mutex);
/* Blocks the calling thread for as long as XFER is in XFER_START, then
 * samples the status into seen_status with the mutex still held.  Any other
 * status, including XFER_INIT, makes the call return without waiting.
 * XFER must be non-NULL (asserted).
 * NOTE(review): presumably seen_status is the return value; the return
 * statement is not visible here - confirm. */
676 wait_until_xfer_running(
679 xfer_status seen_status;
680 g_assert(xfer != NULL);
682 g_mutex_lock(xfer->status_mutex);
683 while (xfer->status == XFER_START)
684 g_cond_wait(xfer->status_cond, xfer->status_mutex);
685 seen_status = xfer->status;
686 g_mutex_unlock(xfer->status_mutex);
/* Reports an error on behalf of ELT and cancels the transfer that owns it.
 * FMT and the following arguments are formatted printf-style into a newly
 * allocated string stored in the message field of an XMSG_ERROR; that
 * message is queued first and the cancel request second, so the error is
 * ahead of the cancel in the queue.  ELT and ELT->xfer must be non-NULL
 * (asserted).
 * NOTE(review): presumably the formatted string is owned by the message
 * from here on - confirm against the XMsg free routine. */
692 xfer_cancel_with_error(
700 g_assert(elt != NULL);
701 g_assert(elt->xfer != NULL);
703 msg = xmsg_new(elt, XMSG_ERROR, 0);
705 arglist_start(argp, fmt);
706 msg->message = g_strdup_vprintf(fmt, argp);
709 /* send the XMSG_ERROR */
710 xfer_queue_message(elt->xfer, msg);
712 /* cancel the transfer */
713 xfer_cancel(elt->xfer);
/* Critical section protected by xfer->fd_mutex.
 * NOTE(review): judging by the name and parameters this presumably stores
 * NEWFD into *FDP and hands back the previous descriptor while the lock is
 * held; the statements between lock and unlock are not visible here -
 * confirm. */
717 xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
722 g_mutex_lock(xfer->fd_mutex);
726 g_mutex_unlock(xfer->fd_mutex);