Imported Upstream version 3.3.3
[debian/amanda] / xfer-src / xfer.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "element-glue.h"
25 #include "arglist.h"
26
27 /* XMsgSource objects are GSource "subclasses" which manage
28  * a queue of messages, delivering those messages via callback
29  * in the mainloop.  Messages can be *sent* from any thread without
30  * any concern for locking, but must only be received in the main
31  * thread, in the default GMainContext.
32  *
33  * An XMsgSource pointer can be cast to a GSource pointer as
34  * necessary.
35  */
36 typedef struct XMsgSource {
37     GSource source; /* must be the first element of the struct */
38     Xfer *xfer;
39 } XMsgSource;
40
41 /* forward prototypes */
42 static void xfer_set_status(Xfer *xfer, xfer_status status);
43 static XMsgSource *xmsgsource_new(Xfer *xfer);
44 static void link_elements(Xfer *xfer);
45
46 Xfer *
47 xfer_new(
48     XferElement **elements,
49     unsigned int nelements)
50 {
51     Xfer *xfer = g_new0(Xfer, 1);
52     unsigned int i;
53
54     g_assert(elements);
55     g_assert(nelements >= 2);
56
57     xfer->status = XFER_INIT;
58     xfer->status_mutex = g_mutex_new();
59     xfer->status_cond = g_cond_new();
60     xfer->fd_mutex = g_mutex_new();
61
62     xfer->refcount = 1;
63     xfer->repr = NULL;
64
65     /* Create our message source and corresponding queue */
66     xfer->msg_source = xmsgsource_new(xfer);
67     xfer->queue = g_async_queue_new();
68
69     /* copy the elements in, verifying that they're all XferElement objects */
70     xfer->elements = g_ptr_array_sized_new(nelements);
71     for (i = 0; i < nelements; i++) {
72         g_assert(elements[i] != NULL);
73         g_assert(IS_XFER_ELEMENT(elements[i]));
74         g_assert(elements[i]->xfer == NULL);
75
76         g_ptr_array_add(xfer->elements, (gpointer)elements[i]);
77
78         g_object_ref(elements[i]);
79         elements[i]->xfer = xfer;
80     }
81
82     return xfer;
83 }
84
85 void
86 xfer_ref(
87     Xfer *xfer)
88 {
89     ++xfer->refcount;
90 }
91
92 void
93 xfer_unref(
94     Xfer *xfer)
95 {
96     unsigned int i;
97     XMsg *msg;
98
99     if (!xfer) return; /* be friendly to NULLs */
100
101     if (--xfer->refcount > 0) return;
102
103     g_assert(xfer != NULL);
104     g_assert(xfer->status == XFER_INIT || xfer->status == XFER_DONE);
105
106     /* Divorce ourselves from the message source */
107     xfer->msg_source->xfer = NULL;
108     g_source_unref((GSource *)xfer->msg_source);
109     xfer->msg_source = NULL;
110
111     /* Try to empty the message queue */
112     while ((msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
113         g_warning("Dropping XMsg from %s because the XMsgSource is being destroyed", 
114             xfer_element_repr(msg->elt));
115         xmsg_free(msg);
116     }
117     g_async_queue_unref(xfer->queue);
118
119     g_mutex_free(xfer->status_mutex);
120     g_cond_free(xfer->status_cond);
121     g_mutex_free(xfer->fd_mutex);
122
123     /* Free our references to the elements, and also set the 'xfer'
124      * attribute of each to NULL, making them "unattached" (although 
125      * subsequent reuse of elements is untested). */
126     for (i = 0; i < xfer->elements->len; i++) {
127         XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
128
129         elt->xfer = NULL;
130         g_object_unref(elt);
131     }
132     g_ptr_array_free(xfer->elements, TRUE);
133
134     if (xfer->repr)
135         g_free(xfer->repr);
136
137     g_free(xfer);
138 }
139
140 GSource *
141 xfer_get_source(
142     Xfer *xfer)
143 {
144     return (GSource *)xfer->msg_source;
145 }
146
147 void
148 xfer_queue_message(
149     Xfer *xfer,
150     XMsg *msg)
151 {
152     g_assert(xfer != NULL);
153     g_assert(msg != NULL);
154
155     g_async_queue_push(xfer->queue, (gpointer)msg);
156
157     /* TODO: don't do this if we're in the main thread */
158     g_main_context_wakeup(NULL);
159 }
160
161 char *
162 xfer_repr(
163     Xfer *xfer)
164 {
165     unsigned int i;
166
167     if (!xfer->repr) {
168         xfer->repr = newvstrallocf(xfer->repr, "<Xfer@%p (", xfer);
169         for (i = 0; i < xfer->elements->len; i++) {
170             XferElement *elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
171             xfer->repr = newvstralloc(xfer->repr,
172                 xfer->repr, (i==0)?"":" -> ", xfer_element_repr(elt), NULL);
173         }
174         xfer->repr = newvstralloc(xfer->repr, xfer->repr, ")>", NULL);
175     }
176
177     return xfer->repr;
178 }
179
180 void
181 xfer_start(
182     Xfer *xfer,
183     gint64 offset G_GNUC_UNUSED,
184     gint64 size)
185 {
186     unsigned int len;
187     unsigned int i;
188     gboolean setup_ok;
189
190     g_assert(xfer != NULL);
191     g_assert(xfer->status == XFER_INIT);
192     g_assert(xfer->elements->len >= 2);
193     g_assert(offset == 0);
194
195     g_debug("Starting %s", xfer_repr(xfer));
196     /* set the status to XFER_START and add a reference to our count, so that
197      * we are not freed while still in operation.  We'll drop this reference
198      * when the status becomes XFER_DONE. */
199     xfer_ref(xfer);
200     xfer->num_active_elements = 0;
201     xfer_set_status(xfer, XFER_START);
202
203     /* Link the elements.  This calls error() on failure, and rewrites
204      * xfer->elements */
205     link_elements(xfer);
206
207     /* Tell all elements to set up.  This is done before upstream and downstream
208      * are set so that elements cannot interfere with one another before setup()
209      * is completed. */
210     setup_ok = TRUE;
211     for (i = 0; i < xfer->elements->len; i++) {
212         XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i);
213         if (!xfer_element_setup(xe)) {
214             setup_ok = FALSE;
215             break;
216         }
217     }
218
219     /* If setup_ok is false, then there is an XMSG_CANCEL in the message queue
220      * already, so skip calling start for any of the elements and send an
221      * XMSG_DONE, since none of the elements will do so. */
222
223     if (setup_ok) {
224         /* Set the upstream and downstream links between elements */
225         len = xfer->elements->len;
226         for (i = 0; i < len; i++) {
227             XferElement *elt = g_ptr_array_index(xfer->elements, i);
228
229             if (i > 0)
230                 elt->upstream = g_ptr_array_index(xfer->elements, i-1);
231             if (i < len-1)
232                 elt->downstream = g_ptr_array_index(xfer->elements, i+1);
233         }
234
235         /* Set size for first element */
236         if (size) {
237             XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, 0);
238             xfer_element_set_size(xe, size);
239         }
240
241         /* now tell them all to start, in order from destination to source */
242         for (i = xfer->elements->len; i >= 1; i--) {
243             XferElement *xe = (XferElement *)g_ptr_array_index(xfer->elements, i-1);
244             if (xfer_element_start(xe))
245                 xfer->num_active_elements++;
246         }
247     }
248
249     /* (note that status can only change in the main thread, so we can be
250      * certain that the status is still XFER_START and we have not yet been
251      * cancelled.  We may have an XMSG_CANCEL already queued up for us, though) */
252     xfer_set_status(xfer, XFER_RUNNING);
253
254     /* If this transfer involves no active processing, then we consider it to
255      * be done already.  We send a "fake" XMSG_DONE from the destination element,
256      * so that all of the usual processing will take place. */
257     if (xfer->num_active_elements == 0) {
258         if (setup_ok)
259             g_debug("%s has no active elements; generating fake XMSG_DONE", xfer_repr(xfer));
260         xfer->num_active_elements++;
261         xfer_queue_message(xfer,
262             xmsg_new((XferElement *)g_ptr_array_index(xfer->elements, xfer->elements->len-1),
263                      XMSG_DONE, 0));
264     }
265 }
266
267 void
268 xfer_cancel(
269     Xfer *xfer)
270 {
271     /* Since xfer_cancel can be called from any thread, we just send a message.
272      * The action takes place when the message is received. */
273     XferElement *src;
274     if (xfer->cancelled > 0) return;
275     xfer->cancelled++;
276     src = g_ptr_array_index(xfer->elements, 0);
277     xfer_queue_message(xfer, xmsg_new(src, XMSG_CANCEL, 0));
278 }
279
280 static void
281 xfer_set_status(
282     Xfer *xfer,
283     xfer_status status)
284 {
285     if (xfer->status == status) return;
286
287     g_mutex_lock(xfer->status_mutex);
288
289     /* check that this state transition is valid */
290     switch (status) {
291     case XFER_START:
292         g_assert(xfer->status == XFER_INIT);
293         break;
294     case XFER_RUNNING:
295         g_assert(xfer->status == XFER_START);
296         break;
297     case XFER_CANCELLING:
298         g_assert(xfer->status == XFER_RUNNING);
299         break;
300     case XFER_CANCELLED:
301         g_assert(xfer->status == XFER_CANCELLING);
302         break;
303     case XFER_DONE:
304         g_assert(xfer->status == XFER_CANCELLED || xfer->status == XFER_RUNNING);
305         break;
306     case XFER_INIT:
307     default:
308         g_assert_not_reached();
309     }
310
311     xfer->status = status;
312     g_cond_broadcast(xfer->status_cond);
313     g_mutex_unlock(xfer->status_mutex);
314 }
315
316 /*
317  * Element linking
318  */
319
320 /* How is ELT linked? link_recurse uses an array of these to track its progress
321  * and find the optimal overall linkage. */
322 typedef struct linkage {
323     XferElement *elt;
324     xfer_element_mech_pair_t *mech_pairs;
325     int elt_idx; /* index into elt's mech_pairs */
326     int glue_idx; /* index into glue pairs for elt's output; -1 = no glue */
327 } linkage;
328
329 /* Overall state of the recursive linking process */
330 typedef struct linking_state {
331     int nlinks; /* number of linkage objects in each array */
332     linkage *cur; /* "current" linkage */
333
334     linkage *best; /* best linkage so far */
335     gint32 best_cost; /* cost for best */
336 } linking_state;
337
338 /* used for debugging messages */
339 static char *
340 xfer_mech_name(
341     xfer_mech mech)
342 {
343     switch (mech) {
344         case XFER_MECH_NONE: return "NONE";
345         case XFER_MECH_READFD: return "READFD";
346         case XFER_MECH_WRITEFD: return "WRITEFD";
347         case XFER_MECH_PULL_BUFFER: return "PULL_BUFFER";
348         case XFER_MECH_PUSH_BUFFER: return "PUSH_BUFFER";
349         case XFER_MECH_DIRECTTCP_LISTEN: return "DIRECTTCP_LISTEN";
350         case XFER_MECH_DIRECTTCP_CONNECT: return "DIRECTTCP_CONNECT";
351         default: return "UNKNOWN";
352     }
353 }
354
355 /* calculate an integer representing the cost of a mech pair as a
356  * single integer.  OPS_PER_BYTE is the most important metric,
357  * followed by NTHREADS.
358  *
359  * PAIR will be evaluated multiple times.
360  */
361 #define PAIR_COST(pair) (((pair).ops_per_byte << 8) + (pair).nthreads)
362
363 /* maximum cost */
364 #define MAX_COST 0xffffff
365
366 /* Generate all possible linkages of elements [idx:nlinks], where
367  * elements [0:idx-1] have cost 'cost' and end with mechanism
368  * 'input_mech'. */
369 static void
370 link_recurse(
371     linking_state *st,
372     int idx,
373     xfer_mech input_mech,
374     gint32 cost)
375 {
376     xfer_element_mech_pair_t *elt_pairs, *glue_pairs;
377     linkage *my;
378
379     /* if we've overrun the previous best cost already, then bail out */
380     if (cost >= st->best_cost)
381         return;
382
383     /* have we linked everything? */
384     if (idx == st->nlinks) {
385         /* if we ended on other than XFER_MECH_NONE, then this is not a
386          * valid transfer */
387         if (input_mech != XFER_MECH_NONE) return;
388
389         /* we already know this has lower cost than the previous best */
390         memcpy(st->best, st->cur, st->nlinks * sizeof(linkage));
391         st->best_cost = cost;
392
393         return;
394     }
395
396     /* recurse for each linkage we can make that starts with input_mech */
397     my = &st->cur[idx];
398     elt_pairs = my->mech_pairs;
399     glue_pairs = xfer_element_glue_mech_pairs;
400
401     for (my->elt_idx = 0;
402          elt_pairs[my->elt_idx].input_mech != XFER_MECH_NONE
403          || elt_pairs[my->elt_idx].output_mech != XFER_MECH_NONE;
404          my->elt_idx++) {
405          /* reject this pair if the input mech does not match */
406          if (elt_pairs[my->elt_idx].input_mech != input_mech)
407             continue;
408
409          /* recurse with no glue */
410          my->glue_idx = -1;
411          link_recurse(st, idx+1,
412                       elt_pairs[my->elt_idx].output_mech,
413                       cost + PAIR_COST(elt_pairs[my->elt_idx]));
414
415         /* and recurse with glue */
416         for (my->glue_idx = 0;
417              glue_pairs[my->glue_idx].input_mech != XFER_MECH_NONE
418              || glue_pairs[my->glue_idx].output_mech != XFER_MECH_NONE;
419              my->glue_idx++) {
420             /* reject this glue pair if it doesn't match with the element output */
421             if (glue_pairs[my->glue_idx].input_mech != elt_pairs[my->elt_idx].output_mech)
422                 continue;
423
424              /* and recurse with the glue */
425              link_recurse(st, idx+1,
426                           glue_pairs[my->glue_idx].output_mech,
427                           cost + PAIR_COST(elt_pairs[my->elt_idx])
428                                + PAIR_COST(glue_pairs[my->glue_idx]));
429         }
430     }
431 }
432
433 static void
434 link_elements(
435     Xfer *xfer)
436 {
437     GPtrArray *new_elements;
438     XferElement *elt;
439     char *linkage_str;
440     linking_state st;
441     gint i, len;
442
443     /* Note that this algorithm's running time is polynomial in the length of
444      * the transfer, with a fairly high order.  If Amanda is regularly assembling
445      * transfers with more than, say, 6 elements, then the algorithm should be
446      * redesigned. */
447
448     /* set up the state for recursion */
449     st.nlinks = xfer->elements->len;
450     st.cur = g_new0(linkage, st.nlinks);
451     st.best = g_new0(linkage, st.nlinks);
452     st.best_cost = MAX_COST;
453     for (i = 0; i < st.nlinks; i++) {
454         st.cur[i].elt = (XferElement *)g_ptr_array_index(xfer->elements, i);
455         st.cur[i].mech_pairs = xfer_element_get_mech_pairs(st.cur[i].elt);
456     }
457
458     /* check that the first element is an XferSource and the last is an XferDest.
459      * A source is identified by having no input mechanisms. */
460     if (st.cur[0].mech_pairs[0].input_mech != XFER_MECH_NONE)
461         error("Transfer element 0 is not a transfer source");
462
463     /* Similarly, a destination has no output mechanisms. */
464     if (st.cur[st.nlinks-1].mech_pairs[0].output_mech != XFER_MECH_NONE)
465         error("Last transfer element is not a transfer destination");
466
467     /* start recursing with the first element, asserting that its input mech is NONE */
468     link_recurse(&st, 0, XFER_MECH_NONE, 0);
469
470     /* check that we got *some* solution */
471     if (st.best_cost == MAX_COST) {
472         error(_("Xfer %s cannot be linked."), xfer_repr(xfer));
473     }
474
475     /* Now create a new list of elements, containing any glue elements
476      * that we need to add, and set their input_mech and output_mech fields */
477     new_elements = g_ptr_array_sized_new(xfer->elements->len);
478     for (i = 0; i < st.nlinks; i++) {
479         elt = st.best[i].elt;
480         elt->input_mech = st.best[i].mech_pairs[st.best[i].elt_idx].input_mech;
481         elt->output_mech = st.best[i].mech_pairs[st.best[i].elt_idx].output_mech;
482         g_ptr_array_add(new_elements, elt);
483
484         if (st.best[i].glue_idx != -1) {
485             elt = xfer_element_glue();
486             elt->xfer = xfer;
487             elt->input_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].input_mech;
488             elt->output_mech = xfer_element_glue_mech_pairs[st.best[i].glue_idx].output_mech;
489             g_ptr_array_add(new_elements, elt);
490         }
491     }
492
493     /* install the new list of elements */
494     g_ptr_array_free(xfer->elements, FALSE);
495     xfer->elements = new_elements;
496     new_elements = NULL;
497
498     /* debug-log the xfer's linkage */
499     len = xfer->elements->len;
500     linkage_str = stralloc("Final linkage: ");
501     for (i = 0; i < len; i++) {
502         XferElement *elt = g_ptr_array_index(xfer->elements, i);
503
504         if (i == 0)
505             linkage_str = newvstralloc(linkage_str, linkage_str, xfer_element_repr(elt), NULL);
506         else
507             linkage_str = newvstrallocf(linkage_str, "%s -(%s)-> %s",
508                 linkage_str, xfer_mech_name(elt->input_mech), xfer_element_repr(elt));
509     }
510     g_debug("%s", linkage_str);
511     amfree(linkage_str);
512
513     amfree(st.cur);
514     amfree(st.best);
515 }
516
517 /*
518  * XMsgSource
519  */
520
521 static gboolean
522 xmsgsource_prepare(
523     GSource *source,
524     gint *timeout_)
525 {
526     XMsgSource *xms = (XMsgSource *)source;
527
528     *timeout_ = -1;
529     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
530 }
531
532 static gboolean
533 xmsgsource_check(
534     GSource *source)
535 {
536     XMsgSource *xms = (XMsgSource *)source;
537
538     return xms->xfer && g_async_queue_length(xms->xfer->queue) > 0;
539 }
540
541 static gboolean
542 xmsgsource_dispatch(
543     GSource *source G_GNUC_UNUSED,
544     GSourceFunc callback,
545     gpointer user_data)
546 {
547     XMsgSource *xms = (XMsgSource *)source;
548     Xfer *xfer = xms->xfer;
549     XMsgCallback my_cb = (XMsgCallback)callback;
550     XMsg *msg;
551     gboolean deliver_to_caller;
552     guint i;
553     gboolean xfer_done = FALSE;
554
555     /* we're potentially calling Perl code within this loop, so we have to
556      * check that everything is ok on each iteration of the loop. */
557     while (xfer
558         && xfer->status != XFER_DONE
559         && (msg = (XMsg *)g_async_queue_try_pop(xfer->queue))) {
560
561         /* We get first crack at interpreting messages, before calling the
562          * designated callback. */
563         deliver_to_caller = TRUE;
564         switch (msg->type) {
565             /* Intercept and count DONE messages so that we can determine when
566              * the entire transfer is finished. */
567             case XMSG_DONE:
568                 if (--xfer->num_active_elements <= 0) {
569                     /* mark the transfer as done, and take a note to break out
570                      * of this loop after delivering the message to the user */
571                     xfer_set_status(xfer, XFER_DONE);
572                     xfer_done = TRUE;
573                 } else {
574                     /* eat this XMSG_DONE, since we expect more */
575                     deliver_to_caller = FALSE;
576                 }
577                 break;
578
579             case XMSG_CANCEL:
580                 if (xfer->status == XFER_CANCELLING || xfer->status == XFER_CANCELLED) {
581                     /* ignore duplicate cancel messages */
582                     deliver_to_caller = FALSE;
583                 } else {
584                     /* call cancel() on each child element */
585                     gboolean expect_eof;
586
587                     g_debug("Cancelling %s", xfer_repr(xfer));
588                     xfer_set_status(xfer, XFER_CANCELLING);
589
590                     expect_eof = FALSE;
591                     for (i = 0; i < xfer->elements->len; i++) {
592                         XferElement *elt = (XferElement *)
593                                 g_ptr_array_index(xfer->elements, i);
594                         expect_eof = xfer_element_cancel(elt, expect_eof) || expect_eof;
595                     }
596
597                     /* if nothing in the transfer can generate an EOF, then we
598                      * can't cancel this transfer, and we'll just have to wait
599                      * until it's finished.  This may happen, for example, if
600                      * the operating system is copying data for us
601                      * asynchronously */
602                     if (!expect_eof)
603                         g_warning("Transfer %s cannot be cancelled.", xfer_repr(xfer));
604
605                     /* and now we're done cancelling */
606                     xfer_set_status(xfer, XFER_CANCELLED);
607                 }
608                 break;
609
610             default:
611                 break;  /* nothing interesting to do */
612         }
613
614         if (deliver_to_caller) {
615             if (my_cb) {
616                 my_cb(user_data, msg, xfer);
617             } else {
618                 g_warning("Dropping %s because no callback is set", xmsg_repr(msg));
619             }
620         }
621
622         xmsg_free(msg);
623
624         /* This transfer is done, so kill it and exit the loop */
625         if (xfer_done) {
626             xfer_unref(xfer);
627             xfer = NULL;
628             break;
629         }
630     }
631
632     /* Never automatically un-queue the event source */
633     return TRUE;
634 }
635
636 XMsgSource *
637 xmsgsource_new(
638     Xfer *xfer)
639 {
640     static GSourceFuncs *xmsgsource_funcs = NULL;
641     GSource *src;
642     XMsgSource *xms;
643
644     /* initialize these here to avoid a compiler warning */
645     if (!xmsgsource_funcs) {
646         xmsgsource_funcs = g_new0(GSourceFuncs, 1);
647         xmsgsource_funcs->prepare = xmsgsource_prepare;
648         xmsgsource_funcs->check = xmsgsource_check;
649         xmsgsource_funcs->dispatch = xmsgsource_dispatch;
650     }
651
652     src = g_source_new(xmsgsource_funcs, sizeof(XMsgSource));
653     xms = (XMsgSource *)src;
654     xms->xfer = xfer;
655
656     return xms;
657 }
658
659 xfer_status
660 wait_until_xfer_cancelled(
661     Xfer *xfer)
662 {
663     xfer_status seen_status;
664     g_assert(xfer != NULL);
665
666     g_mutex_lock(xfer->status_mutex);
667     while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
668         g_cond_wait(xfer->status_cond, xfer->status_mutex);
669     seen_status = xfer->status;
670     g_mutex_unlock(xfer->status_mutex);
671
672     return seen_status;
673 }
674
675 xfer_status
676 wait_until_xfer_running(
677     Xfer *xfer)
678 {
679     xfer_status seen_status;
680     g_assert(xfer != NULL);
681
682     g_mutex_lock(xfer->status_mutex);
683     while (xfer->status == XFER_START)
684         g_cond_wait(xfer->status_cond, xfer->status_mutex);
685     seen_status = xfer->status;
686     g_mutex_unlock(xfer->status_mutex);
687
688     return seen_status;
689 }
690
691 void
692 xfer_cancel_with_error(
693     XferElement *elt,
694     const char *fmt,
695     ...)
696 {
697     va_list argp;
698     XMsg *msg;
699
700     g_assert(elt != NULL);
701     g_assert(elt->xfer != NULL);
702
703     msg = xmsg_new(elt, XMSG_ERROR, 0);
704
705     arglist_start(argp, fmt);
706     msg->message = g_strdup_vprintf(fmt, argp);
707     arglist_end(argp);
708
709     /* send the XMSG_ERROR */
710     xfer_queue_message(elt->xfer, msg);
711
712     /* cancel the transfer */
713     xfer_cancel(elt->xfer);
714 }
715
716 gint
717 xfer_atomic_swap_fd(Xfer *xfer, gint *fdp, gint newfd)
718 {
719     gint rv;
720
721     if (xfer)
722         g_mutex_lock(xfer->fd_mutex);
723     rv = *fdp;
724     *fdp = newfd;
725     if (xfer)
726         g_mutex_unlock(xfer->fd_mutex);
727
728     return rv;
729 }