Imported Upstream version 3.3.2
[debian/amanda] / xfer-src / xfer.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 #include "amanda.h"
22 #include "amxfer.h"
23 #include "element-glue.h"
24 #include "arglist.h"
25
26 /* XMsgSource objects are GSource "subclasses" which manage
27  * a queue of messages, delivering those messages via callback
28  * in the mainloop.  Messages can be *sent* from any thread without
29  * any concern for locking, but must only be received in the main
30  * thread, in the default GMainContext.
31  *
32  * An XMsgSource pointer can be cast to a GSource pointer as
33  * necessary.
34  */
35 typedef struct XMsgSource {
36     GSource source; /* must be the first element of the struct */
37     Xfer *xfer;
38 } XMsgSource;
39
40 /* forward prototypes */
41 static void xfer_set_status(Xfer *xfer, xfer_status status);
42 static XMsgSource *xmsgsource_new(Xfer *xfer);
43 static void link_elements(Xfer *xfer);
44
45 Xfer *
46 xfer_new(
47     XferElement **elements,
48     unsigned int nelements)
49 {
50     Xfer *xfer = g_new0(Xfer, 1);
51     unsigned int i;
52
53     g_assert(elements);
54     g_assert(nelements >= 2);
55
56     xfer->status = XFER_INIT;
57     xfer->status_mutex = g_mutex_new();
58     xfer->status_cond = g_cond_new();
59     xfer->fd_mutex = g_mutex_new();
60
61     xfer->refcount = 1;
62     xfer->repr = NULL;
63
64     /* Create our message source and corresponding queue */
65     xfer->msg_source = xmsgsource_new(xfer);
66     xfer->queue = g_async_queue_new();
67
68     /* copy the elements in, verifying that they're all XferElement objects */
69     xfer->elements = g_ptr_array_sized_new(nelements);
70     for (i = 0; i < nelements; i++) {
71         g_assert(elements[i] != NULL);
72         g_assert(IS_XFER_ELEMENT(elements[i]));
73         g_assert(elements[i]->xfer == NULL);
74
75         g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
76
77         g_object_ref(elements[i]);
78         elements[i]->xfer = xfer;
79     }
80
81     return xfer;
82 }
83
84 void
85 xfer_ref(
86     Xfer *xfer)
87 {
88     ++xfer->refcount;
89 }
90
91 void
92 xfer_unref(
93     Xfer *xfer)
94 {
95     unsigned int i;
96     XMsg *msg;
97
98     if (!xfer) return; /* be friendly to NULLs */
99
100     if (--xfer->refcount > 0) return;
101
102     g_assert(xfer != NULL);
103     g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
104
105     /* Divorce ourselves from the message source */
106     xfer->msg_source->xfer = NULL;
107     g_source_unref((GSource *)xfer->msg_source);
108     xfer->msg_source = NULL;
109
110     /* Try to empty the message queue */
111     while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
112         g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed", 
113             xfer_element_repr(msg->elt));
114         xmsg_free(msg);
115     }
116     g_async_queue_unref(xfer->queue);
117
118     g_mutex_free(xfer->status_mutex);
119     g_cond_free(xfer->status_cond);
120     g_mutex_free(xfer->fd_mutex);
121
122     /* Free our references to the elements, and also set the 'xfer'
123      * attribute of each to NULL, making them "unattached" (although 
124      * subsequent reuse of elements is untested). */
125     for (i = 0; i < xfer->elements->len; i++) {
126         XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
127
128         elt->xfer = NULL;
129         g_object_unref(elt);
130     }
131     g_ptr_array_free(xfer->elements, TRUE);
132
133     if (xfer->repr)
134         g_free(xfer->repr);
135
136     g_free(xfer);
137 }
138
139 GSource *
140 xfer_get_source(
141     Xfer *xfer)
142 {
143     return (GSource *)xfer->msg_source;
144 }
145
146 void
147 xfer_queue_message(
148     Xfer *xfer,
149     XMsg *msg)
150 {
151     g_assert(xfer != NULL);
152     g_assert(msg != NULL);
153
154     g_async_queue_push(xfer->queue, (gpointer)msg);
155
156     /* TODO: don't do this if we're in the main thread */
157     g_main_context_wakeup(NULL);
158 }
159
160 char *
161 xfer_repr(
162     Xfer *xfer)
163 {
164     unsigned int i;
165
166     if (!xfer->repr) {
167         xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
168         for (i = 0; i < xfer->elements->len; i++) {
169             XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
170             xfer->repr = newvstralloc(xfer->repr,
171                 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
172         }
173         xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
174     }
175
176     return xfer->repr;
177 }
178
179 void
180 xfer_start(
181     Xfer *xfer,
182     gint64 offset G_GNUC_UNUSED,
183     gint64 size)
184 {
185     unsigned int len;
186     unsigned int i;
187     gboolean setup_ok;
188
189     g_assert(xfer != NULL);
190     g_assert(xfer->status == XFER_INIT);
191     g_assert(xfer->elements->len >= 2);
192     g_assert(offset == 0);
193
194     g_debug("Starting %s", xfer_repr(xfer));
195     /* set the status to XFER_START and add a reference to our count, so that
196      * we are not freed while still in operation.  We'll drop this reference
197      * when the status becomes XFER_DONE. */
198     xfer_ref(xfer);
199     xfer->num_active_elements = 0;
200     xfer_set_status(xfer, XFER_START);
201
202     /* Link the elements.  This calls error() on failure, and rewrites
203      * xfer->elements */
204     link_elements(xfer);
205
206     /* Tell all elements to set up.  This is done before upstream and downstream
207      * are set so that elements cannot interfere with one another before setup()
208      * is completed. */
209     setup_ok = TRUE;
210     for (i = 0; i < xfer->elements->len; i++) {
211         XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
212         if (!xfer_element_setup(xe)) {
213             setup_ok = FALSE;
214             break;
215         }
216     }
217
218     /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
219      * already, so skip calling start for any of the elements and send an
220      * XMSG_DONE, since none of the elements will do so. */
221
222     if (setup_ok) {
223         /* Set the upstream and downstream links between elements */
224         len = xfer->elements->len;
225         for (i = 0; i < len; i++) {
226             XferElement *elt = g_ptr_array_index(xfer->elements, i);
227
228             if (i > 0)
229                 elt->upstream = g_ptr_array_index(xfer->elements, i-1);
230             if (i < len-1)
231                 elt->downstream = g_ptr_array_index(xfer->elements, i+1);
232         }
233
234         /* Set size for first element */
235         if (size) {
236             XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
237             xfer_element_set_size(xe, size);
238         }
239
240         /* now tell them all to start, in order from destination to source */
241         for (i = xfer->elements->len; i >= 1; i--) {
242             XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
243             if (xfer_element_start(xe))
244                 xfer->num_active_elements++;
245         }
246     }
247
248     /* (note that status can only change in the main thread, so we can be
249      * certain that the status is still XFER_START and we have not yet been
250      * cancelled.  We may have an XMSG_CANCEL already queued up for us, though) */
251     xfer_set_status(xfer, XFER_RUNNING);
252
253     /* If this transfer involves no active processing, then we consider it to
254      * be done already.  We send a "fake" XMSG_DONE from the destination element,
255      * so that all of the usual processing will take place. */
256     if (xfer->num_active_elements == 0) {
257         if (setup_ok)
258             g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
259         xfer->num_active_elements++;
260         xfer_queue_message(xfer,
261             xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
262                      XMSG_DONE, 0));
263     }
264 }
265
266 void
267 xfer_cancel(
268     Xfer *xfer)
269 {
270     /* Since xfer_cancel can be called from any thread, we just send a message.
271      * The action takes place when the message is received. */
272     XferElement *src;
273     if (xfer->cancelled > 0) return;
274     xfer->cancelled++;
275     src = g_ptr_array_index(xfer->elements, 0);
276     xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
277 }
278
279 static void
280 xfer_set_status(
281     Xfer *xfer,
282     xfer_status status)
283 {
284     if (xfer->status == status) return;
285
286     g_mutex_lock(xfer->status_mutex);
287
288     /* check that this state transition is valid */
289     switch (status) {
290     case XFER_START:
291         g_assert(xfer->status == XFER_INIT);
292         break;
293     case XFER_RUNNING:
294         g_assert(xfer->status == XFER_START);
295         break;
296     case XFER_CANCELLING:
297         g_assert(xfer->status == XFER_RUNNING);
298         break;
299     case XFER_CANCELLED:
300         g_assert(xfer->status == XFER_CANCELLING);
301         break;
302     case XFER_DONE:
303         g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
304         break;
305     case XFER_INIT:
306     default:
307         g_assert_not_reached();
308     }
309
310     xfer->status = status;
311     g_cond_broadcast(xfer->status_cond);
312     g_mutex_unlock(xfer->status_mutex);
313 }
314
315 /*
316  * Element linking
317  */
318
319 /* How is ELT linked? link_recurse uses an array of these to track its progress
320  * and find the optimal overall linkage. */
321 typedef struct linkage {
322     XferElement *elt;
323     xfer_element_mech_pair_t *mech_pairs;
324     int elt_idx; /* index into elt's mech_pairs */
325     int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
326 } linkage;
327
328 /* Overall state of the recursive linking process */
329 typedef struct linking_state {
330     int nlinks; /* number of linkage objects in each array */
331     linkage *cur; /* "current" linkage */
332
333     linkage *best; /* best linkage so far */
334     gint32 best_cost; /* cost for best */
335 } linking_state;
336
337 /* used for debugging messages */
338 static char *
339 xfer_mech_name(
340     xfer_mech mech)
341 {
342     switch (mech) {
343         case XFER_MECH_NONE: return "NONE";
344         case XFER_MECH_READFD: return "READFD";
345         case XFER_MECH_WRITEFD: return "WRITEFD";
346         case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
347         case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
348         case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
349         case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
350         default: return "UNKNOWN";
351     }
352 }
353
354 /* calculate an integer representing the cost of a mech pair as a
355  * single integer.  OPS_PER_BYTE is the most important metric,
356  * followed by NTHREADS.
357  *
358  * PAIR will be evaluated multiple times.
359  */
360 #define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)
361
362 /* maximum cost */
363 #define MAX_COST 0xffffff
364
365 /* Generate all possible linkages of elements [idx:nlinks], where
366  * elements [0:idx-1] have cost 'cost' and end with mechanism
367  * 'input_mech'. */
368 static void
369 link_recurse(
370     linking_state *st,
371     int idx,
372     xfer_mech input_mech,
373     gint32 cost)
374 {
375     xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
376     linkage *my;
377
378     /* if we've overrun the previous best cost already, then bail out */
379     if (cost >= st->best_cost)
380         return;
381
382     /* have we linked everything? */
383     if (idx == st->nlinks) {
384         /* if we ended on other than XFER_MECH_NONE, then this is not a
385          * valid transfer */
386         if (input_mech != XFER_MECH_NONE) return;
387
388         /* we already know this has lower cost than the previous best */
389         memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
390         st->best_cost = cost;
391
392         return;
393     }
394
395     /* recurse for each linkage we can make that starts with input_mech */
396     my = &st->cur[idx];
397     elt_pairs = my->mech_pairs;
398     glue_pairs = xfer_element_glue_mech_pairs;
399
400     for (my->elt_idx = 0;
401          elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
402          || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
403          my->elt_idx++) {
404          /* reject this pair if the input mech does not match */
405          if (elt_pairs[my->elt_idx].input_mech != input_mech)
406             continue;
407
408          /* recurse with no glue */
409          my->glue_idx = -1;
410          link_recurse(st, idx+1,
411                       elt_pairs[my->elt_idx].output_mech,
412                       cost + PAIR_COST(elt_pairs[my->elt_idx]));
413
414         /* and recurse with glue */
415         for (my->glue_idx = 0;
416              glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
417              || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
418              my->glue_idx++) {
419             /* reject this glue pair if it doesn't match with the element output */
420             if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
421                 continue;
422
423              /* and recurse with the glue */
424              link_recurse(st, idx+1,
425                           glue_pairs[my->glue_idx].output_mech,
426                           cost + PAIR_COST(elt_pairs[my->elt_idx])
427                                + PAIR_COST(glue_pairs[my->glue_idx]));
428         }
429     }
430 }
431
432 static void
433 link_elements(
434     Xfer *xfer)
435 {
436     GPtrArray *new_elements;
437     XferElement *elt;
438     char *linkage_str;
439     linking_state st;
440     gint i, len;
441
442     /* Note that this algorithm's running time is polynomial in the length of
443      * the transfer, with a fairly high order.  If Amanda is regularly assembling
444      * transfers with more than, say, 6 elements, then the algorithm should be
445      * redesigned. */
446
447     /* set up the state for recursion */
448     st.nlinks = xfer->elements->len;
449     st.cur = g_new0(linkage, st.nlinks);
450     st.best = g_new0(linkage, st.nlinks);
451     st.best_cost = MAX_COST;
452     for (i = 0; i < st.nlinks; i++) {
453         st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
454         st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
455     }
456
457     /* check that the first element is an XferSource and the last is an XferDest.
458      * A source is identified by having no input mechanisms. */
459     if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
460         error("Transfer element 0 is not a transfer source");
461
462     /* Similarly, a destination has no output mechanisms. */
463     if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
464         error("Last transfer element is not a transfer destination");
465
466     /* start recursing with the first element, asserting that its input mech is NONE */
467     link_recurse(&st, 0, XFER_MECH_NONE, 0);
468
469     /* check that we got *some* solution */
470     if (st.best_cost == MAX_COST) {
471         error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
472     }
473
474     /* Now create a new list of elements, containing any glue elements
475      * that we need to add, and set their input_mech and output_mech fields */
476     new_elements = g_ptr_array_sized_new(xfer->elements->len);
477     for (i = 0; i < st.nlinks; i++) {
478         elt = st.best[i].elt;
479         elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
480         elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
481         g_ptr_array_add(new_elements, elt);
482
483         if (st.best[i].glue_idx != -1) {
484             elt = xfer_element_glue();
485             elt->xfer = xfer;
486             elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
487             elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
488             g_ptr_array_add(new_elements, elt);
489         }
490     }
491
492     /* install the new list of elements */
493     g_ptr_array_free(xfer->elements, FALSE);
494     xfer->elements = new_elements;
495     new_elements = NULL;
496
497     /* debug-log the xfer's linkage */
498     len = xfer->elements->len;
499     linkage_str = stralloc("Final linkage: ");
500     for (i = 0; i < len; i++) {
501         XferElement *elt = g_ptr_array_index(xfer->elements, i);
502
503         if (i == 0)
504             linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
505         else
506             linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
507                 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
508     }
509     g_debug("%s", linkage_str);
510     amfree(linkage_str);
511
512     amfree(st.cur);
513     amfree(st.best);
514 }
515
516 /*
517  * XMsgSource
518  */
519
520 static gboolean
521 xmsgsource_prepare(
522     GSource *source,
523     gint *timeout_)
524 {
525     XMsgSource *xms = (XMsgSource *)source;
526
527     *timeout_ = -1;
528     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
529 }
530
531 static gboolean
532 xmsgsource_check(
533     GSource *source)
534 {
535     XMsgSource *xms = (XMsgSource *)source;
536
537     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
538 }
539
540 static gboolean
541 xmsgsource_dispatch(
542     GSource *source G_GNUC_UNUSED,
543     GSourceFunc callback,
544     gpointer user_data)
545 {
546     XMsgSource *xms = (XMsgSource *)source;
547     Xfer *xfer = xms->xfer;
548     XMsgCallback my_cb = (XMsgCallback)callback;
549     XMsg *msg;
550     gboolean deliver_to_caller;
551     guint i;
552     gboolean xfer_done = FALSE;
553
554     /* we're potentially calling Perl code within this loop, so we have to
555      * check that everything is ok on each iteration of the loop. */
556     while (xfer
557         && xfer->status != XFER_DONE
558         && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
559
560         /* We get first crack at interpreting messages, before calling the
561          * designated callback. */
562         deliver_to_caller = TRUE;
563         switch (msg->type) {
564             /* Intercept and count DONE messages so that we can determine when
565              * the entire transfer is finished. */
566             case XMSG_DONE:
567                 if (--xfer->num_active_elements <= 0) {
568                     /* mark the transfer as done, and take a note to break out
569                      * of this loop after delivering the message to the user */
570                     xfer_set_status(xfer, XFER_DONE);
571                     xfer_done = TRUE;
572                 } else {
573                     /* eat this XMSG_DONE, since we expect more */
574                     deliver_to_caller = FALSE;
575                 }
576                 break;
577
578             case XMSG_CANCEL:
579                 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
580                     /* ignore duplicate cancel messages */
581                     deliver_to_caller = FALSE;
582                 } else {
583                     /* call cancel() on each child element */
584                     gboolean expect_eof;
585
586                     g_debug("Cancelling %s", xfer_repr(xfer));
587                     xfer_set_status(xfer, XFER_CANCELLING);
588
589                     expect_eof = FALSE;
590                     for (i = 0; i < xfer->elements->len; i++) {
591                         XferElement *elt = (XferElement *)
592                                 g_ptr_array_index(xfer->elements, i);
593                         expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
594                     }
595
596                     /* if nothing in the transfer can generate an EOF, then we
597                      * can't cancel this transfer, and we'll just have to wait
598                      * until it's finished.  This may happen, for example, if
599                      * the operating system is copying data for us
600                      * asynchronously */
601                     if (!expect_eof)
602                         g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
603
604                     /* and now we're done cancelling */
605                     xfer_set_status(xfer, XFER_CANCELLED);
606                 }
607                 break;
608
609             default:
610                 break;  /* nothing interesting to do */
611         }
612
613         if (deliver_to_caller) {
614             if (my_cb) {
615                 my_cb(user_data, msg, xfer);
616             } else {
617                 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
618             }
619         }
620
621         xmsg_free(msg);
622
623         /* This transfer is done, so kill it and exit the loop */
624         if (xfer_done) {
625             xfer_unref(xfer);
626             xfer = NULL;
627             break;
628         }
629     }
630
631     /* Never automatically un-queue the event source */
632     return TRUE;
633 }
634
635 XMsgSource *
636 xmsgsource_new(
637     Xfer *xfer)
638 {
639     static GSourceFuncs *xmsgsource_funcs = NULL;
640     GSource *src;
641     XMsgSource *xms;
642
643     /* initialize these here to avoid a compiler warning */
644     if (!xmsgsource_funcs) {
645         xmsgsource_funcs = g_new0(GSourceFuncs, 1);
646         xmsgsource_funcs->prepare = xmsgsource_prepare;
647         xmsgsource_funcs->check = xmsgsource_check;
648         xmsgsource_funcs->dispatch = xmsgsource_dispatch;
649     }
650
651     src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
652     xms = (XMsgSource *)src;
653     xms->xfer = xfer;
654
655     return xms;
656 }
657
658 xfer_status
659 wait_until_xfer_cancelled(
660     Xfer *xfer)
661 {
662     xfer_status seen_status;
663     g_assert(xfer != NULL);
664
665     g_mutex_lock(xfer->status_mutex);
666     while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
667         g_cond_wait(xfer->status_cond, xfer->status_mutex);
668     seen_status = xfer->status;
669     g_mutex_unlock(xfer->status_mutex);
670
671     return seen_status;
672 }
673
674 xfer_status
675 wait_until_xfer_running(
676     Xfer *xfer)
677 {
678     xfer_status seen_status;
679     g_assert(xfer != NULL);
680
681     g_mutex_lock(xfer->status_mutex);
682     while (xfer->status == XFER_START)
683         g_cond_wait(xfer->status_cond, xfer->status_mutex);
684     seen_status = xfer->status;
685     g_mutex_unlock(xfer->status_mutex);
686
687     return seen_status;
688 }
689
690 void
691 xfer_cancel_with_error(
692     XferElement *elt,
693     const char *fmt,
694     ...)
695 {
696     va_list argp;
697     XMsg *msg;
698
699     g_assert(elt != NULL);
700     g_assert(elt->xfer != NULL);
701
702     msg = xmsg_new(elt, XMSG_ERROR, 0);
703
704     arglist_start(argp, fmt);
705     msg->message = g_strdup_vprintf(fmt, argp);
706     arglist_end(argp);
707
708     /* send the XMSG_ERROR */
709     xfer_queue_message(elt->xfer, msg);
710
711     /* cancel the transfer */
712     xfer_cancel(elt->xfer);
713 }
714
715 gint
716 xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
717 {
718     gint rv;
719
720     if (xfer)
721         g_mutex_lock(xfer->fd_mutex);
722     rv = *fdp;
723     *fdp = newfd;
724     if (xfer)
725         g_mutex_unlock(xfer->fd_mutex);
726
727     return rv;
728 }