b9b1800f8d2640aaf1de87d5d772e8cba3c45a13
[debian/amanda] / xfer-src / xfer.c
1 /*
2  * Copyright (c) 2008 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as
6  * published by the Free Software Foundation.
7  *
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  *
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  *
17  * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 #include "amxfer.h"
22 #include "element-glue.h"
23 #include "amanda.h"
24
25 /* XMsgSource objects are GSource "subclasses" which manage
26  * a queue of messages, delivering those messages via callback
27  * in the mainloop.  Messages can be *sent* from any thread without
28  * any concern for locking, but must only be received in the main
29  * thread, in the default GMainContext.
30  *
31  * An XMsgSource pointer can be cast to a GSource pointer as
32  * necessary.
33  */
34 typedef struct XMsgSource {
35     GSource source; /* must be the first element of the struct */
36     Xfer *xfer;
37 } XMsgSource;
38
39 /* forward prototypes */
40 static void xfer_set_status(Xfer *xfer, xfer_status status);
41 static XMsgSource *xmsgsource_new(Xfer *xfer);
42 static void link_elements(Xfer *xfer);
43
44 Xfer *
45 xfer_new(
46     XferElement **elements,
47     unsigned int nelements)
48 {
49     Xfer *xfer = g_new0(Xfer, 1);
50     unsigned int i;
51
52     g_assert(elements);
53     g_assert(nelements >= 2);
54
55     xfer->status = XFER_INIT;
56     xfer->status_mutex = g_mutex_new();
57     xfer->status_cond = g_cond_new();
58
59     xfer->refcount = 1;
60     xfer->repr = NULL;
61
62     /* Create our message source and corresponding queue */
63     xfer->msg_source = xmsgsource_new(xfer);
64     g_source_ref((GSource *)xfer->msg_source);
65     xfer->queue = g_async_queue_new();
66
67     /* copy the elements in, verifying that they're all XferElement objects */
68     xfer->elements = g_ptr_array_sized_new(nelements);
69     for (i = 0; i < nelements; i++) {
70         g_assert(elements[i] != NULL);
71         g_assert(IS_XFER_ELEMENT(elements[i]));
72         g_assert(elements[i]->xfer == NULL);
73
74         g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
75
76         g_object_ref(elements[i]);
77         elements[i]->xfer = xfer;
78     }
79
80     return xfer;
81 }
82
83 void
84 xfer_ref(
85     Xfer *xfer)
86 {
87     ++xfer->refcount;
88 }
89
90 void
91 xfer_unref(
92     Xfer *xfer)
93 {
94     unsigned int i;
95     XMsg *msg;
96
97     if (!xfer) return; /* be friendly to NULLs */
98
99     if (--xfer->refcount > 0) return;
100
101     g_assert(xfer != NULL);
102     g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
103
104     /* Divorce ourselves from the message source */
105     xfer->msg_source->xfer = NULL;
106     g_source_unref((GSource *)xfer->msg_source);
107     xfer->msg_source = NULL;
108
109     /* Try to empty the message queue */
110     while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
111         g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed", 
112             xfer_element_repr(msg->elt));
113         xmsg_free(msg);
114     }
115     g_async_queue_unref(xfer->queue);
116
117     g_mutex_free(xfer->status_mutex);
118     g_cond_free(xfer->status_cond);
119
120     /* Free our references to the elements, and also set the 'xfer'
121      * attribute of each to NULL, making them "unattached" (although 
122      * subsequent reuse of elements is untested). */
123     for (i = 0; i < xfer->elements->len; i++) {
124         XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
125
126         elt->xfer = NULL;
127         g_object_unref(elt);
128     }
129
130     g_free(xfer);
131 }
132
133 GSource *
134 xfer_get_source(
135     Xfer *xfer)
136 {
137     return (GSource *)xfer->msg_source;
138 }
139
140 void
141 xfer_queue_message(
142     Xfer *xfer,
143     XMsg *msg)
144 {
145     g_assert(xfer != NULL);
146     g_assert(msg != NULL);
147
148     g_async_queue_push(xfer->queue, (gpointer)msg);
149
150     /* TODO: don't do this if we're in the main thread */
151     g_main_context_wakeup(NULL);
152 }
153
154 char *
155 xfer_repr(
156     Xfer *xfer)
157 {
158     unsigned int i;
159
160     if (!xfer->repr) {
161         xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
162         for (i = 0; i < xfer->elements->len; i++) {
163             XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
164             xfer->repr = newvstralloc(xfer->repr,
165                 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
166         }
167         xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
168     }
169
170     return xfer->repr;
171 }
172
173 void
174 xfer_start(
175     Xfer *xfer)
176 {
177     unsigned int len;
178     unsigned int i;
179     XferElement *xe;
180
181     g_assert(xfer != NULL);
182     g_assert(xfer->status == XFER_INIT);
183     g_assert(xfer->elements->len >= 2);
184
185     g_debug("Starting %s", xfer_repr(xfer));
186     /* set the status to XFER_START and add a reference to our count, so that
187      * we are not freed while still in operation.  We'll drop this reference
188      * when the status becomes XFER_DONE. */
189     xfer_ref(xfer);
190     xfer->num_active_elements = 0;
191     xfer_set_status(xfer, XFER_START);
192
193     /* check that the first element is an XferSource and the last is an XferDest.
194      * A source is identified by having no input mechanisms. */
195     xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
196     if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].input_mech != XFER_MECH_NONE)
197         error("Transfer element 0 is not a transfer source");
198
199     /* Similarly, a destination has no output mechanisms. */
200     xe = (XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1);
201     if (XFER_ELEMENT_GET_CLASS(xe)->mech_pairs[0].output_mech != XFER_MECH_NONE)
202         error("Last transfer element is not a transfer destination");
203
204     /* Link the elements.  This calls error() on failure, and rewrites
205      * xfer->elements */
206     link_elements(xfer);
207
208     /* Tell all elements to set up.  This is done before upstream and downstream
209      * are set so that elements cannot interfere with one another before setup()
210      * is completed. */
211     for (i = 0; i < xfer->elements->len; i++) {
212         XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
213         xfer_element_setup(xe);
214     }
215
216     /* Set the upstream and downstream links between elements */
217     len = xfer->elements->len;
218     for (i = 0; i < len; i++) {
219         XferElement *elt = g_ptr_array_index(xfer->elements, i);
220
221         if (i > 0)
222             elt->upstream = g_ptr_array_index(xfer->elements, i-1);
223         if (i < len-1)
224             elt->downstream = g_ptr_array_index(xfer->elements, i+1);
225     }
226
227     /* now tell them all to start, in order from destination to source */
228     for (i = xfer->elements->len; i >= 1; i--) {
229         XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
230         if (xfer_element_start(xe))
231             xfer->num_active_elements++;
232     }
233
234     /* (note that status can only change in the main thread, so we can be
235      * certain that the status is still XFER_START and we have not yet been
236      * cancelled.  We may have an XMSG_CANCEL already queued up for us, though) */
237     xfer_set_status(xfer, XFER_RUNNING);
238
239     /* If this transfer involves no active processing, then we consider it to
240      * be done already.  We send a "fake" XMSG_DONE from the destination element,
241      * so that all of the usual processing will take place. */
242     if (xfer->num_active_elements == 0) {
243         g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
244         xfer->num_active_elements++;
245         xfer_queue_message(xfer,
246             xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
247                      XMSG_DONE, 0));
248     }
249 }
250
251 void
252 xfer_cancel(
253     Xfer *xfer)
254 {
255     /* Since xfer_cancel can be called from any thread, we just send a message.
256      * The action takes place when the message is received. */
257     XferElement *src = g_ptr_array_index(xfer->elements, 0);
258     xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
259 }
260
261 static void
262 xfer_set_status(
263     Xfer *xfer,
264     xfer_status status)
265 {
266     if (xfer->status == status) return;
267
268     g_mutex_lock(xfer->status_mutex);
269
270     /* check that this state transition is valid */
271     switch (status) {
272     case XFER_START:
273         g_assert(xfer->status == XFER_INIT);
274         break;
275     case XFER_RUNNING:
276         g_assert(xfer->status == XFER_START);
277         break;
278     case XFER_CANCELLING:
279         g_assert(xfer->status == XFER_RUNNING);
280         break;
281     case XFER_CANCELLED:
282         g_assert(xfer->status == XFER_CANCELLING);
283         break;
284     case XFER_DONE:
285         g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
286         break;
287     case XFER_INIT:
288     default:
289         g_assert_not_reached();
290     }
291
292     xfer->status = status;
293     g_cond_broadcast(xfer->status_cond);
294     g_mutex_unlock(xfer->status_mutex);
295 }
296
297 /*
298  * Element linking
299  */
300
301 /* How is ELT linked? link_recurse uses an array of these to track its progress
302  * and find the optimal overall linkage. */
303 typedef struct linkage {
304     XferElement *elt;
305     int elt_idx; /* index into elt's mech_pairs */
306     int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
307 } linkage;
308
309 /* Overall state of the recursive linking process */
310 typedef struct linking_state {
311     int nlinks; /* number of linkage objects in each array */
312     linkage *cur; /* "current" linkage */
313
314     linkage *best; /* best linkage so far */
315     gint32 best_cost; /* cost for best */
316 } linking_state;
317
318 /* used for debugging messages */
319 static char *
320 xfer_mech_name(
321     xfer_mech mech)
322 {
323     switch (mech) {
324         case XFER_MECH_NONE: return "NONE";
325         case XFER_MECH_READFD: return "READFD";
326         case XFER_MECH_WRITEFD: return "WRITEFD";
327         case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
328         case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
329         default: return "UNKNOWN";
330     }
331 }
332
333 /* calculate an integer representing the cost of a mech pair as a
334  * single integer.  OPS_PER_BYTE is the most important metric,
335  * followed by NTHREADS.
336  *
337  * PAIR will be evaluated multiple times.
338  */
339 #define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)
340
341 /* maximum cost */
342 #define MAX_COST 0xffffff
343
344 /* Generate all possible linkages of elements [idx:nlinks], where
345  * elements [0:idx-1] have cost 'cost' and end with mechanism
346  * 'input_mech'. */
347 static void
348 link_recurse(
349     linking_state *st,
350     int idx,
351     xfer_mech input_mech,
352     gint32 cost)
353 {
354     xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
355     linkage *my;
356
357     /* if we've overrun the previous best cost already, then bail out */
358     if (cost >= st->best_cost)
359         return;
360
361     /* have we linked everything? */
362     if (idx == st->nlinks) {
363         /* if we ended on other than XFER_MECH_NONE, then this is not a
364          * valid transfer */
365         if (input_mech != XFER_MECH_NONE) return;
366
367         /* we already know this has lower cost than the previous best */
368         memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
369         st->best_cost = cost;
370
371         return;
372     }
373
374     /* recurse for each linkage we can make that starts with input_mech */
375     my = &st->cur[idx];
376     elt_pairs = XFER_ELEMENT_GET_CLASS(my->elt)->mech_pairs;
377     glue_pairs = xfer_element_glue_mech_pairs;
378
379     for (my->elt_idx = 0;
380          elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
381          || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
382          my->elt_idx++) {
383          /* reject this pair if the input mech does not match */
384          if (elt_pairs[my->elt_idx].input_mech != input_mech)
385             continue;
386
387          /* recurse with no glue */
388          my->glue_idx = -1;
389          link_recurse(st, idx+1,
390                       elt_pairs[my->elt_idx].output_mech,
391                       cost + PAIR_COST(elt_pairs[my->elt_idx]));
392
393         /* and recurse with glue */
394         for (my->glue_idx = 0;
395              glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
396              || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
397              my->glue_idx++) {
398             /* reject this glue pair if it doesn't match with the element output */
399             if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
400                 continue;
401
402              /* and recurse with the glue */
403              link_recurse(st, idx+1,
404                           glue_pairs[my->glue_idx].output_mech,
405                           cost + PAIR_COST(elt_pairs[my->elt_idx])
406                                + PAIR_COST(glue_pairs[my->glue_idx]));
407         }
408     }
409 }
410
411 static void
412 link_elements(
413     Xfer *xfer)
414 {
415     GPtrArray *new_elements;
416     XferElement *elt;
417     XferElementClass *eltc;
418     char *linkage_str;
419     linking_state st;
420     gint i, len;
421
422     /* Note that this algorithm's running time is polynomial in the length of
423      * the transfer, with a fairly high order.  If Amanda is regularly assembling
424      * transfers with more than, say, 6 elements, then the algorithm should be
425      * redesigned. */
426
427     /* set up the state for recursion */
428     st.nlinks = xfer->elements->len;
429     st.cur = g_new0(linkage, st.nlinks);
430     st.best = g_new0(linkage, st.nlinks);
431     st.best_cost = MAX_COST;
432     for (i = 0; i < st.nlinks; i++) {
433         st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
434     }
435
436     /* start recursing with the first element, asserting that its input mech is NONE */
437     link_recurse(&st, 0, XFER_MECH_NONE, 0);
438
439     /* check that we got *some* solution */
440     if (st.best_cost == MAX_COST) {
441         error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
442     }
443
444     /* Now create a new list of elements, containing any glue elements
445      * that we need to add, and set their input_mech and output_mech fields */
446     new_elements = g_ptr_array_sized_new(xfer->elements->len);
447     for (i = 0; i < st.nlinks; i++) {
448         elt = st.best[i].elt;
449         eltc = XFER_ELEMENT_GET_CLASS(elt);
450         elt->input_mech = eltc->mech_pairs[st.best[i].elt_idx].input_mech;
451         elt->output_mech = eltc->mech_pairs[st.best[i].elt_idx].output_mech;
452         g_ptr_array_add(new_elements, elt);
453
454         if (st.best[i].glue_idx != -1) {
455             elt = xfer_element_glue();
456             eltc = XFER_ELEMENT_GET_CLASS(elt);
457             elt->xfer = xfer;
458             elt->input_mech = eltc->mech_pairs[st.best[i].glue_idx].input_mech;
459             elt->output_mech = eltc->mech_pairs[st.best[i].glue_idx].output_mech;
460             g_ptr_array_add(new_elements, elt);
461         }
462     }
463
464     /* install the new list of elements */
465     g_ptr_array_free(xfer->elements, FALSE);
466     xfer->elements = new_elements;
467     new_elements = NULL;
468
469     /* debug-log the xfer's linkage */
470     len = xfer->elements->len;
471     linkage_str = stralloc("Final linkage: ");
472     for (i = 0; i < len; i++) {
473         XferElement *elt = g_ptr_array_index(xfer->elements, i);
474
475         if (i == 0)
476             linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
477         else
478             linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
479                 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
480     }
481     g_debug("%s", linkage_str);
482     amfree(linkage_str);
483
484     amfree(st.cur);
485     amfree(st.best);
486 }
487
488 /*
489  * XMsgSource
490  */
491
492 static gboolean
493 xmsgsource_prepare(
494     GSource *source,
495     gint *timeout_)
496 {
497     XMsgSource *xms = (XMsgSource *)source;
498
499     *timeout_ = -1;
500     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
501 }
502
503 static gboolean
504 xmsgsource_check(
505     GSource *source)
506 {
507     XMsgSource *xms = (XMsgSource *)source;
508
509     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
510 }
511
512 static gboolean
513 xmsgsource_dispatch(
514     GSource *source G_GNUC_UNUSED,
515     GSourceFunc callback,
516     gpointer user_data)
517 {
518     XMsgSource *xms = (XMsgSource *)source;
519     Xfer *xfer = xms->xfer;
520     XMsgCallback my_cb = (XMsgCallback)callback;
521     XMsg *msg;
522     gboolean deliver_to_caller;
523     guint i;
524     gboolean xfer_done = FALSE;
525
526     /* we're potentially calling Perl code within this loop, so we have to
527      * check that everything is ok on each iteration of the loop. */
528     while (xfer
529         && xfer->status != XFER_DONE
530         && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
531
532         /* We get first crack at interpreting messages, before calling the
533          * designated callback. */
534         deliver_to_caller = TRUE;
535         switch (msg->type) {
536             /* Intercept and count DONE messages so that we can determine when
537              * the entire transfer is finished. */
538             case XMSG_DONE:
539                 if (--xfer->num_active_elements <= 0) {
540                     /* mark the transfer as done, and take a note to break out
541                      * of this loop after delivering the message to the user */
542                     xfer_set_status(xfer, XFER_DONE);
543                     xfer_done = TRUE;
544                 } else {
545                     /* eat this XMSG_DONE, since we expect more */
546                     deliver_to_caller = FALSE;
547                 }
548                 break;
549
550             case XMSG_CANCEL:
551                 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
552                     /* ignore duplicate cancel messages */
553                     deliver_to_caller = FALSE;
554                 } else {
555                     /* call cancel() on each child element */
556                     gboolean expect_eof;
557
558                     g_debug("Cancelling %s", xfer_repr(xfer));
559                     xfer_set_status(xfer, XFER_CANCELLING);
560
561                     expect_eof = FALSE;
562                     for (i = 0; i < xfer->elements->len; i++) {
563                         XferElement *elt = (XferElement *)
564                                 g_ptr_array_index(xfer->elements, i);
565                         expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
566                     }
567
568                     /* if nothing in the transfer can generate an EOF, then we
569                      * can't cancel this transfer, and we'll just have to wait
570                      * until it's finished.  This may happen, for example, if
571                      * the operating system is copying data for us
572                      * asynchronously */
573                     if (!expect_eof)
574                         g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
575
576                     /* and now we're done cancelling */
577                     xfer_set_status(xfer, XFER_CANCELLED);
578                 }
579                 break;
580
581             default:
582                 break;  /* nothing interesting to do */
583         }
584
585         if (deliver_to_caller) {
586             if (my_cb) {
587                 my_cb(user_data, msg, xfer);
588             } else {
589                 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
590             }
591         }
592
593         xmsg_free(msg);
594
595         /* This transfer is done, so kill it and exit the loop */
596         if (xfer_done) {
597             xfer_unref(xfer);
598             xfer = NULL;
599             break;
600         }
601     }
602
603     /* Never automatically un-queue the event source */
604     return TRUE;
605 }
606
607 XMsgSource *
608 xmsgsource_new(
609     Xfer *xfer)
610 {
611     static GSourceFuncs *xmsgsource_funcs = NULL;
612     GSource *src;
613     XMsgSource *xms;
614
615     /* initialize these here to avoid a compiler warning */
616     if (!xmsgsource_funcs) {
617         xmsgsource_funcs = g_new0(GSourceFuncs, 1);
618         xmsgsource_funcs->prepare = xmsgsource_prepare;
619         xmsgsource_funcs->check = xmsgsource_check;
620         xmsgsource_funcs->dispatch = xmsgsource_dispatch;
621     }
622
623     src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
624     xms = (XMsgSource *)src;
625     xms->xfer = xfer;
626
627     return xms;
628 }