Imported Upstream version 3.1.0
[debian/amanda] / xfer-src / xfer.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 #include "amxfer.h"
22 #include "element-glue.h"
23 #include "amanda.h"
24 #include "arglist.h"
25
26 /* XMsgSource objects are GSource "subclasses" which manage
27  * a queue of messages, delivering those messages via callback
28  * in the mainloop.  Messages can be *sent* from any thread without
29  * any concern for locking, but must only be received in the main
30  * thread, in the default GMainContext.
31  *
32  * An XMsgSource pointer can be cast to a GSource pointer as
33  * necessary.
34  */
35 typedef struct XMsgSource {
36     GSource source; /* must be the first element of the struct */
37     Xfer *xfer;
38 } XMsgSource;
39
40 /* forward prototypes */
41 static void xfer_set_status(Xfer *xfer, xfer_status status);
42 static XMsgSource *xmsgsource_new(Xfer *xfer);
43 static void link_elements(Xfer *xfer);
44
45 Xfer *
46 xfer_new(
47     XferElement **elements,
48     unsigned int nelements)
49 {
50     Xfer *xfer = g_new0(Xfer, 1);
51     unsigned int i;
52
53     g_assert(elements);
54     g_assert(nelements >= 2);
55
56     xfer->status = XFER_INIT;
57     xfer->status_mutex = g_mutex_new();
58     xfer->status_cond = g_cond_new();
59
60     xfer->refcount = 1;
61     xfer->repr = NULL;
62
63     /* Create our message source and corresponding queue */
64     xfer->msg_source = xmsgsource_new(xfer);
65     xfer->queue = g_async_queue_new();
66
67     /* copy the elements in, verifying that they're all XferElement objects */
68     xfer->elements = g_ptr_array_sized_new(nelements);
69     for (i = 0; i < nelements; i++) {
70         g_assert(elements[i] != NULL);
71         g_assert(IS_XFER_ELEMENT(elements[i]));
72         g_assert(elements[i]->xfer == NULL);
73
74         g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
75
76         g_object_ref(elements[i]);
77         elements[i]->xfer = xfer;
78     }
79
80     return xfer;
81 }
82
83 void
84 xfer_ref(
85     Xfer *xfer)
86 {
87     ++xfer->refcount;
88 }
89
90 void
91 xfer_unref(
92     Xfer *xfer)
93 {
94     unsigned int i;
95     XMsg *msg;
96
97     if (!xfer) return; /* be friendly to NULLs */
98
99     if (--xfer->refcount > 0) return;
100
101     g_assert(xfer != NULL);
102     g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
103
104     /* Divorce ourselves from the message source */
105     xfer->msg_source->xfer = NULL;
106     g_source_unref((GSource *)xfer->msg_source);
107     xfer->msg_source = NULL;
108
109     /* Try to empty the message queue */
110     while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
111         g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed", 
112             xfer_element_repr(msg->elt));
113         xmsg_free(msg);
114     }
115     g_async_queue_unref(xfer->queue);
116
117     g_mutex_free(xfer->status_mutex);
118     g_cond_free(xfer->status_cond);
119
120     /* Free our references to the elements, and also set the 'xfer'
121      * attribute of each to NULL, making them "unattached" (although 
122      * subsequent reuse of elements is untested). */
123     for (i = 0; i < xfer->elements->len; i++) {
124         XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
125
126         elt->xfer = NULL;
127         g_object_unref(elt);
128     }
129     g_ptr_array_free(xfer->elements, TRUE);
130
131     if (xfer->repr)
132         g_free(xfer->repr);
133
134     g_free(xfer);
135 }
136
137 GSource *
138 xfer_get_source(
139     Xfer *xfer)
140 {
141     return (GSource *)xfer->msg_source;
142 }
143
144 void
145 xfer_queue_message(
146     Xfer *xfer,
147     XMsg *msg)
148 {
149     g_assert(xfer != NULL);
150     g_assert(msg != NULL);
151
152     g_async_queue_push(xfer->queue, (gpointer)msg);
153
154     /* TODO: don't do this if we're in the main thread */
155     g_main_context_wakeup(NULL);
156 }
157
158 char *
159 xfer_repr(
160     Xfer *xfer)
161 {
162     unsigned int i;
163
164     if (!xfer->repr) {
165         xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
166         for (i = 0; i < xfer->elements->len; i++) {
167             XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
168             xfer->repr = newvstralloc(xfer->repr,
169                 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
170         }
171         xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
172     }
173
174     return xfer->repr;
175 }
176
177 void
178 xfer_start(
179     Xfer *xfer)
180 {
181     unsigned int len;
182     unsigned int i;
183     gboolean setup_ok;
184
185     g_assert(xfer != NULL);
186     g_assert(xfer->status == XFER_INIT);
187     g_assert(xfer->elements->len >= 2);
188
189     g_debug("Starting %s", xfer_repr(xfer));
190     /* set the status to XFER_START and add a reference to our count, so that
191      * we are not freed while still in operation.  We'll drop this reference
192      * when the status becomes XFER_DONE. */
193     xfer_ref(xfer);
194     xfer->num_active_elements = 0;
195     xfer_set_status(xfer, XFER_START);
196
197     /* Link the elements.  This calls error() on failure, and rewrites
198      * xfer->elements */
199     link_elements(xfer);
200
201     /* Tell all elements to set up.  This is done before upstream and downstream
202      * are set so that elements cannot interfere with one another before setup()
203      * is completed. */
204     setup_ok = TRUE;
205     for (i = 0; i < xfer->elements->len; i++) {
206         XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
207         if (!xfer_element_setup(xe)) {
208             setup_ok = FALSE;
209             break;
210         }
211     }
212
213     /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
214      * already, so skip calling start for any of the elements and send an
215      * XMSG_DONE, since none of the elements will do so. */
216
217     if (setup_ok) {
218         /* Set the upstream and downstream links between elements */
219         len = xfer->elements->len;
220         for (i = 0; i < len; i++) {
221             XferElement *elt = g_ptr_array_index(xfer->elements, i);
222
223             if (i > 0)
224                 elt->upstream = g_ptr_array_index(xfer->elements, i-1);
225             if (i < len-1)
226                 elt->downstream = g_ptr_array_index(xfer->elements, i+1);
227         }
228
229         /* now tell them all to start, in order from destination to source */
230         for (i = xfer->elements->len; i >= 1; i--) {
231             XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
232             if (xfer_element_start(xe))
233                 xfer->num_active_elements++;
234         }
235     }
236
237     /* (note that status can only change in the main thread, so we can be
238      * certain that the status is still XFER_START and we have not yet been
239      * cancelled.  We may have an XMSG_CANCEL already queued up for us, though) */
240     xfer_set_status(xfer, XFER_RUNNING);
241
242     /* If this transfer involves no active processing, then we consider it to
243      * be done already.  We send a "fake" XMSG_DONE from the destination element,
244      * so that all of the usual processing will take place. */
245     if (xfer->num_active_elements == 0) {
246         if (setup_ok)
247             g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
248         xfer->num_active_elements++;
249         xfer_queue_message(xfer,
250             xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
251                      XMSG_DONE, 0));
252     }
253 }
254
255 void
256 xfer_cancel(
257     Xfer *xfer)
258 {
259     /* Since xfer_cancel can be called from any thread, we just send a message.
260      * The action takes place when the message is received. */
261     XferElement *src = g_ptr_array_index(xfer->elements, 0);
262     xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
263 }
264
265 static void
266 xfer_set_status(
267     Xfer *xfer,
268     xfer_status status)
269 {
270     if (xfer->status == status) return;
271
272     g_mutex_lock(xfer->status_mutex);
273
274     /* check that this state transition is valid */
275     switch (status) {
276     case XFER_START:
277         g_assert(xfer->status == XFER_INIT);
278         break;
279     case XFER_RUNNING:
280         g_assert(xfer->status == XFER_START);
281         break;
282     case XFER_CANCELLING:
283         g_assert(xfer->status == XFER_RUNNING);
284         break;
285     case XFER_CANCELLED:
286         g_assert(xfer->status == XFER_CANCELLING);
287         break;
288     case XFER_DONE:
289         g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
290         break;
291     case XFER_INIT:
292     default:
293         g_assert_not_reached();
294     }
295
296     xfer->status = status;
297     g_cond_broadcast(xfer->status_cond);
298     g_mutex_unlock(xfer->status_mutex);
299 }
300
301 /*
302  * Element linking
303  */
304
305 /* How is ELT linked? link_recurse uses an array of these to track its progress
306  * and find the optimal overall linkage. */
307 typedef struct linkage {
308     XferElement *elt;
309     xfer_element_mech_pair_t *mech_pairs;
310     int elt_idx; /* index into elt's mech_pairs */
311     int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
312 } linkage;
313
314 /* Overall state of the recursive linking process */
315 typedef struct linking_state {
316     int nlinks; /* number of linkage objects in each array */
317     linkage *cur; /* "current" linkage */
318
319     linkage *best; /* best linkage so far */
320     gint32 best_cost; /* cost for best */
321 } linking_state;
322
323 /* used for debugging messages */
324 static char *
325 xfer_mech_name(
326     xfer_mech mech)
327 {
328     switch (mech) {
329         case XFER_MECH_NONE: return "NONE";
330         case XFER_MECH_READFD: return "READFD";
331         case XFER_MECH_WRITEFD: return "WRITEFD";
332         case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
333         case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
334         case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
335         case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
336         default: return "UNKNOWN";
337     }
338 }
339
340 /* calculate an integer representing the cost of a mech pair as a
341  * single integer.  OPS_PER_BYTE is the most important metric,
342  * followed by NTHREADS.
343  *
344  * PAIR will be evaluated multiple times.
345  */
346 #define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)
347
348 /* maximum cost */
349 #define MAX_COST 0xffffff
350
351 /* Generate all possible linkages of elements [idx:nlinks], where
352  * elements [0:idx-1] have cost 'cost' and end with mechanism
353  * 'input_mech'. */
354 static void
355 link_recurse(
356     linking_state *st,
357     int idx,
358     xfer_mech input_mech,
359     gint32 cost)
360 {
361     xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
362     linkage *my;
363
364     /* if we've overrun the previous best cost already, then bail out */
365     if (cost >= st->best_cost)
366         return;
367
368     /* have we linked everything? */
369     if (idx == st->nlinks) {
370         /* if we ended on other than XFER_MECH_NONE, then this is not a
371          * valid transfer */
372         if (input_mech != XFER_MECH_NONE) return;
373
374         /* we already know this has lower cost than the previous best */
375         memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
376         st->best_cost = cost;
377
378         return;
379     }
380
381     /* recurse for each linkage we can make that starts with input_mech */
382     my = &st->cur[idx];
383     elt_pairs = my->mech_pairs;
384     glue_pairs = xfer_element_glue_mech_pairs;
385
386     for (my->elt_idx = 0;
387          elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
388          || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
389          my->elt_idx++) {
390          /* reject this pair if the input mech does not match */
391          if (elt_pairs[my->elt_idx].input_mech != input_mech)
392             continue;
393
394          /* recurse with no glue */
395          my->glue_idx = -1;
396          link_recurse(st, idx+1,
397                       elt_pairs[my->elt_idx].output_mech,
398                       cost + PAIR_COST(elt_pairs[my->elt_idx]));
399
400         /* and recurse with glue */
401         for (my->glue_idx = 0;
402              glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
403              || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
404              my->glue_idx++) {
405             /* reject this glue pair if it doesn't match with the element output */
406             if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
407                 continue;
408
409              /* and recurse with the glue */
410              link_recurse(st, idx+1,
411                           glue_pairs[my->glue_idx].output_mech,
412                           cost + PAIR_COST(elt_pairs[my->elt_idx])
413                                + PAIR_COST(glue_pairs[my->glue_idx]));
414         }
415     }
416 }
417
418 static void
419 link_elements(
420     Xfer *xfer)
421 {
422     GPtrArray *new_elements;
423     XferElement *elt;
424     char *linkage_str;
425     linking_state st;
426     gint i, len;
427
428     /* Note that this algorithm's running time is polynomial in the length of
429      * the transfer, with a fairly high order.  If Amanda is regularly assembling
430      * transfers with more than, say, 6 elements, then the algorithm should be
431      * redesigned. */
432
433     /* set up the state for recursion */
434     st.nlinks = xfer->elements->len;
435     st.cur = g_new0(linkage, st.nlinks);
436     st.best = g_new0(linkage, st.nlinks);
437     st.best_cost = MAX_COST;
438     for (i = 0; i < st.nlinks; i++) {
439         st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
440         st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
441     }
442
443     /* check that the first element is an XferSource and the last is an XferDest.
444      * A source is identified by having no input mechanisms. */
445     if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
446         error("Transfer element 0 is not a transfer source");
447
448     /* Similarly, a destination has no output mechanisms. */
449     if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
450         error("Last transfer element is not a transfer destination");
451
452     /* start recursing with the first element, asserting that its input mech is NONE */
453     link_recurse(&st, 0, XFER_MECH_NONE, 0);
454
455     /* check that we got *some* solution */
456     if (st.best_cost == MAX_COST) {
457         error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
458     }
459
460     /* Now create a new list of elements, containing any glue elements
461      * that we need to add, and set their input_mech and output_mech fields */
462     new_elements = g_ptr_array_sized_new(xfer->elements->len);
463     for (i = 0; i < st.nlinks; i++) {
464         elt = st.best[i].elt;
465         elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
466         elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
467         g_ptr_array_add(new_elements, elt);
468
469         if (st.best[i].glue_idx != -1) {
470             elt = xfer_element_glue();
471             elt->xfer = xfer;
472             elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
473             elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
474             g_ptr_array_add(new_elements, elt);
475         }
476     }
477
478     /* install the new list of elements */
479     g_ptr_array_free(xfer->elements, FALSE);
480     xfer->elements = new_elements;
481     new_elements = NULL;
482
483     /* debug-log the xfer's linkage */
484     len = xfer->elements->len;
485     linkage_str = stralloc("Final linkage: ");
486     for (i = 0; i < len; i++) {
487         XferElement *elt = g_ptr_array_index(xfer->elements, i);
488
489         if (i == 0)
490             linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
491         else
492             linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
493                 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
494     }
495     g_debug("%s", linkage_str);
496     amfree(linkage_str);
497
498     amfree(st.cur);
499     amfree(st.best);
500 }
501
502 /*
503  * XMsgSource
504  */
505
506 static gboolean
507 xmsgsource_prepare(
508     GSource *source,
509     gint *timeout_)
510 {
511     XMsgSource *xms = (XMsgSource *)source;
512
513     *timeout_ = -1;
514     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
515 }
516
517 static gboolean
518 xmsgsource_check(
519     GSource *source)
520 {
521     XMsgSource *xms = (XMsgSource *)source;
522
523     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
524 }
525
526 static gboolean
527 xmsgsource_dispatch(
528     GSource *source G_GNUC_UNUSED,
529     GSourceFunc callback,
530     gpointer user_data)
531 {
532     XMsgSource *xms = (XMsgSource *)source;
533     Xfer *xfer = xms->xfer;
534     XMsgCallback my_cb = (XMsgCallback)callback;
535     XMsg *msg;
536     gboolean deliver_to_caller;
537     guint i;
538     gboolean xfer_done = FALSE;
539
540     /* we're potentially calling Perl code within this loop, so we have to
541      * check that everything is ok on each iteration of the loop. */
542     while (xfer
543         && xfer->status != XFER_DONE
544         && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
545
546         /* We get first crack at interpreting messages, before calling the
547          * designated callback. */
548         deliver_to_caller = TRUE;
549         switch (msg->type) {
550             /* Intercept and count DONE messages so that we can determine when
551              * the entire transfer is finished. */
552             case XMSG_DONE:
553                 if (--xfer->num_active_elements <= 0) {
554                     /* mark the transfer as done, and take a note to break out
555                      * of this loop after delivering the message to the user */
556                     xfer_set_status(xfer, XFER_DONE);
557                     xfer_done = TRUE;
558                 } else {
559                     /* eat this XMSG_DONE, since we expect more */
560                     deliver_to_caller = FALSE;
561                 }
562                 break;
563
564             case XMSG_CANCEL:
565                 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
566                     /* ignore duplicate cancel messages */
567                     deliver_to_caller = FALSE;
568                 } else {
569                     /* call cancel() on each child element */
570                     gboolean expect_eof;
571
572                     g_debug("Cancelling %s", xfer_repr(xfer));
573                     xfer_set_status(xfer, XFER_CANCELLING);
574
575                     expect_eof = FALSE;
576                     for (i = 0; i < xfer->elements->len; i++) {
577                         XferElement *elt = (XferElement *)
578                                 g_ptr_array_index(xfer->elements, i);
579                         expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
580                     }
581
582                     /* if nothing in the transfer can generate an EOF, then we
583                      * can't cancel this transfer, and we'll just have to wait
584                      * until it's finished.  This may happen, for example, if
585                      * the operating system is copying data for us
586                      * asynchronously */
587                     if (!expect_eof)
588                         g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
589
590                     /* and now we're done cancelling */
591                     xfer_set_status(xfer, XFER_CANCELLED);
592                 }
593                 break;
594
595             default:
596                 break;  /* nothing interesting to do */
597         }
598
599         if (deliver_to_caller) {
600             if (my_cb) {
601                 my_cb(user_data, msg, xfer);
602             } else {
603                 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
604             }
605         }
606
607         xmsg_free(msg);
608
609         /* This transfer is done, so kill it and exit the loop */
610         if (xfer_done) {
611             xfer_unref(xfer);
612             xfer = NULL;
613             break;
614         }
615     }
616
617     /* Never automatically un-queue the event source */
618     return TRUE;
619 }
620
621 XMsgSource *
622 xmsgsource_new(
623     Xfer *xfer)
624 {
625     static GSourceFuncs *xmsgsource_funcs = NULL;
626     GSource *src;
627     XMsgSource *xms;
628
629     /* initialize these here to avoid a compiler warning */
630     if (!xmsgsource_funcs) {
631         xmsgsource_funcs = g_new0(GSourceFuncs, 1);
632         xmsgsource_funcs->prepare = xmsgsource_prepare;
633         xmsgsource_funcs->check = xmsgsource_check;
634         xmsgsource_funcs->dispatch = xmsgsource_dispatch;
635     }
636
637     src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
638     xms = (XMsgSource *)src;
639     xms->xfer = xfer;
640
641     return xms;
642 }
643
644 xfer_status
645 wait_until_xfer_cancelled(
646     Xfer *xfer)
647 {
648     xfer_status seen_status;
649     g_assert(xfer != NULL);
650
651     g_mutex_lock(xfer->status_mutex);
652     while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
653         g_cond_wait(xfer->status_cond, xfer->status_mutex);
654     seen_status = xfer->status;
655     g_mutex_unlock(xfer->status_mutex);
656
657     return seen_status;
658 }
659
660 xfer_status
661 wait_until_xfer_running(
662     Xfer *xfer)
663 {
664     xfer_status seen_status;
665     g_assert(xfer != NULL);
666
667     g_mutex_lock(xfer->status_mutex);
668     while (xfer->status == XFER_START)
669         g_cond_wait(xfer->status_cond, xfer->status_mutex);
670     seen_status = xfer->status;
671     g_mutex_unlock(xfer->status_mutex);
672
673     return seen_status;
674 }
675
676 void
677 xfer_cancel_with_error(
678     XferElement *elt,
679     const char *fmt,
680     ...)
681 {
682     va_list argp;
683     XMsg *msg;
684
685     g_assert(elt != NULL);
686     g_assert(elt->xfer != NULL);
687
688     msg = xmsg_new(elt, XMSG_ERROR, 0);
689
690     arglist_start(argp, fmt);
691     msg->message = g_strdup_vprintf(fmt, argp);
692     arglist_end(argp);
693
694     /* send the XMSG_ERROR */
695     xfer_queue_message(elt->xfer, msg);
696
697     /* cancel the transfer */
698     xfer_cancel(elt->xfer);
699 }
700