Imported Upstream version 3.3.3
[debian/amanda] / device-src / xfer-source-recovery.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "device.h"
26 #include "property.h"
27 #include "xfer-device.h"
28 #include "arglist.h"
29 #include "conffile.h"
30
31 /*
32  * Class declaration
33  *
34  * This declaration is entirely private; nothing but xfer_source_recovery() references
35  * it directly.
36  */
37
/* Returns the GType for XferSourceRecovery; the type is registered on the
 * first call (see the definition near the end of this file). */
GType xfer_source_recovery_get_type(void);

/* GObject-style convenience macros for this class: the type id, checked
 * instance and class casts, an instance type test, and class lookup. */
#define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
#define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
#define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
#define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
#define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
#define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)

/* Parent class pointer; assigned in class_init() from
 * g_type_class_peek_parent(). */
static GObjectClass *parent_class = NULL;
47
48 /*
49  * Main object structure
50  */
51
/* Instance data for the recovery source element.  The element reads parts
 * from a Device and hands the data downstream, either block by block
 * (pull_buffer) or over a DirectTCP connection driven by a helper thread. */
typedef struct XferSourceRecovery {
    XferElement __parent__;

    /* thread for monitoring directtcp transfers (created in start_impl) */
    GThread *thread;

    /* this mutex and this condition variable govern all variables below */
    GCond  *start_part_cond;
    GMutex *start_part_mutex;

    /* is this device currently paused and awaiting a new part? */
    gboolean paused;

    /* device to read from (refcounted) */
    Device *device;

    /* TRUE if use_device found the device unsuitable; this makes start_part
     * a no-op, allowing the cancellation to be handled normally */
    gboolean device_bad;

    /* directtcp connection (only valid after XMSG_READY) */
    DirectTCPConnection *conn;
    /* set to TRUE by setup_impl once device_listen() has succeeded */
    gboolean listen_ok;

    /* and the block size for that device (reset to zero at the start of each
     * part) */
    size_t block_size;

    /* bytes read for this image */
    guint64 bytes_read;

    /* part size (potentially including any zero-padding from the
     * device) */
    guint64 part_size;

    /* timer for the duration; NULL while paused or cancelled */
    GTimer *part_timer;

    /* remaining byte budget used by pull_buffer_impl when the element's
     * `size` is positive: 0 means "not initialized yet", -1 means the budget
     * has been used up and further pulls report EOF */
    gint64   size;

    /* condition to trigger to abort ndmp command; handed to device_accept,
     * device_connect and device_read_to_connection, and broadcast by
     * cancel_impl */
    GCond *abort_cond;
} XferSourceRecovery;
94
95 /*
96  * Class definition
97  */
98
/* Class structure: extends XferElementClass with the two virtual methods
 * that the public xfer_source_recovery_start_part() and
 * xfer_source_recovery_use_device() wrappers dispatch to. */
typedef struct {
    XferElementClass __parent__;

    /* start reading the part at which DEVICE is positioned, sending an
     * XMSG_PART_DONE when the part has been read */
    void (*start_part)(XferSourceRecovery *self, Device *device);

    /* use the given device, much like the same method for xfer-dest-taper */
    void (*use_device)(XferSourceRecovery *self, Device *device);
} XferSourceRecoveryClass;
109
110 /*
111  * Debug Logging
112  */
113
114 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
115 static void
116 _xsr_dbg(const char *fmt, ...)
117 {
118     va_list argp;
119     char msg[1024];
120
121     arglist_start(argp, fmt);
122     g_vsnprintf(msg, sizeof(msg), fmt, argp);
123     arglist_end(argp);
124     g_debug("XSR: %s", msg);
125 }
126
127 /*
128  * Implementation
129  */
130
131 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
132  * this is called after self->conn is filled in and carries out the data
133  * transfer over that connection.  NOTE: start_part_mutex is HELD when this
134  * function begins */
135 static gpointer
136 directtcp_common_thread(
137         XferSourceRecovery *self)
138 {
139     XferElement *elt = XFER_ELEMENT(self);
140     char *errmsg = NULL;
141     int result;
142
143     /* send XMSG_READY to indicate it's OK to call start_part now */
144     DBG(2, "sending XMSG_READY");
145     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
146
147     /* now we sit around waiting for signals to write a part */
148     while (1) {
149         guint64 actual_size;
150         XMsg *msg;
151
152         while (self->paused && !elt->cancelled) {
153             DBG(9, "waiting to be un-paused");
154             g_cond_wait(self->start_part_cond, self->start_part_mutex);
155         }
156         DBG(9, "done waiting");
157
158         if (elt->cancelled) {
159             g_mutex_unlock(self->start_part_mutex);
160             goto close_conn_and_send_done;
161         }
162
163         /* if the device is NULL, we're done */
164         if (!self->device)
165             break;
166
167         /* read the part */
168         self->part_timer = g_timer_new();
169
170         while (1) {
171             DBG(2, "reading part from %s", self->device->device_name);
172             result = device_read_to_connection(self->device, G_MAXUINT64,
173                         &actual_size, &elt->cancelled,
174                         self->start_part_mutex, self->abort_cond);
175             if (result == 1 && !elt->cancelled) {
176                 xfer_cancel_with_error(elt, _("error reading from device: %s"),
177                     device_error_or_status(self->device));
178                 g_mutex_unlock(self->start_part_mutex);
179                 goto close_conn_and_send_done;
180             } else if (result == 2 || elt->cancelled) {
181                 g_mutex_unlock(self->start_part_mutex);
182                 goto close_conn_and_send_done;
183             }
184
185             /* break on EOF; otherwise do another read_to_connection */
186             if (self->device->is_eof) {
187                 break;
188             }
189         }
190         DBG(2, "done reading part; sending XMSG_PART_DONE");
191
192         /* the device has signalled EOF (really end-of-part), so clean up instance
193          * variables and report the EOP to the caller in the form of an xmsg */
194         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
195         msg->size = actual_size;
196         msg->duration = g_timer_elapsed(self->part_timer, NULL);
197         msg->partnum = 0;
198         msg->fileno = self->device->file;
199         msg->successful = TRUE;
200         msg->eof = FALSE;
201
202         self->paused = TRUE;
203         g_object_unref(self->device);
204         self->device = NULL;
205         self->part_size = 0;
206         self->block_size = 0;
207         g_timer_destroy(self->part_timer);
208         self->part_timer = NULL;
209
210         xfer_queue_message(elt->xfer, msg);
211     }
212     g_mutex_unlock(self->start_part_mutex);
213
214 close_conn_and_send_done:
215     if (self->conn) {
216         errmsg = directtcp_connection_close(self->conn);
217         g_object_unref(self->conn);
218         self->conn = NULL;
219         if (errmsg) {
220             xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
221             wait_until_xfer_cancelled(elt->xfer);
222         }
223     }
224
225     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
226
227     return NULL;
228 }
229
230 static gpointer
231 directtcp_connect_thread(
232         gpointer data)
233 {
234     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
235     XferElement *elt = XFER_ELEMENT(self);
236     int result;
237
238     DBG(1, "(this is directtcp_connect_thread)")
239
240     /* first, we need to accept the incoming connection; we do this while
241      * holding the start_part_mutex, so that a part doesn't get started until
242      * we're finished with the device */
243     g_mutex_lock(self->start_part_mutex);
244
245     if (elt->cancelled) {
246         g_mutex_unlock(self->start_part_mutex);
247         goto send_done;
248     }
249
250     g_assert(self->device != NULL); /* have a device */
251     g_assert(elt->output_listen_addrs != NULL); /* listening on it */
252     g_assert(self->listen_ok);
253
254     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
255     result = device_accept(self->device, &self->conn, &elt->cancelled,
256                            self->start_part_mutex, self->abort_cond);
257     if (result == 1 && !elt->cancelled) {
258         xfer_cancel_with_error(elt,
259             _("error accepting DirectTCP connection: %s"),
260             device_error_or_status(self->device));
261         g_mutex_unlock(self->start_part_mutex);
262         wait_until_xfer_cancelled(elt->xfer);
263         goto send_done;
264     } else if (result == 2 || elt->cancelled) {
265         g_mutex_unlock(self->start_part_mutex);
266         goto send_done;
267     }
268     DBG(2, "DirectTCP connection accepted");
269
270     return directtcp_common_thread(self);
271
272 send_done:
273     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
274     return NULL;
275 }
276
277 static gpointer
278 directtcp_listen_thread(
279         gpointer data)
280 {
281     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
282     XferElement *elt = XFER_ELEMENT(self);
283     int result;
284
285     DBG(1, "(this is directtcp_listen_thread)");
286
287     /* we need to make an outgoing connection to downstream; we do this while
288      * holding the start_part_mutex, so that a part doesn't get started until
289      * we're finished with the device */
290     g_mutex_lock(self->start_part_mutex);
291
292     if (elt->cancelled) {
293         g_mutex_unlock(self->start_part_mutex);
294         goto send_done;
295     }
296
297     g_assert(self->device != NULL); /* have a device */
298     g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
299
300     DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
301     result = device_connect(self->device, FALSE,
302                             elt->downstream->input_listen_addrs,
303                             &self->conn, &elt->cancelled,
304                             self->start_part_mutex, self->abort_cond);
305     if (result == 1 && !elt->cancelled) {
306         xfer_cancel_with_error(elt,
307             _("error making DirectTCP connection: %s"),
308             device_error_or_status(self->device));
309         g_mutex_unlock(self->start_part_mutex);
310         wait_until_xfer_cancelled(elt->xfer);
311         goto send_done;
312     } else if (result == 2 || elt->cancelled) {
313         g_mutex_unlock(self->start_part_mutex);
314         wait_until_xfer_cancelled(elt->xfer);
315         goto send_done;
316     }
317     DBG(2, "DirectTCP connect succeeded");
318
319     return directtcp_common_thread(self);
320
321 send_done:
322     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
323     return NULL;
324 }
325
326 static gboolean
327 setup_impl(
328     XferElement *elt)
329 {
330     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
331
332     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
333         g_assert(self->device != NULL);
334         DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
335         if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
336             xfer_cancel_with_error(elt,
337                 _("error listening for DirectTCP connection: %s"),
338                 device_error_or_status(self->device));
339             return FALSE;
340         }
341         self->listen_ok = TRUE;
342     } else {
343         /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
344          * XFER_MECH_PULL_BUFFER */
345         elt->output_listen_addrs = NULL;
346     }
347
348     return TRUE;
349 }
350
351 static gboolean
352 start_impl(
353     XferElement *elt)
354 {
355     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
356
357     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
358         g_assert(elt->output_listen_addrs != NULL);
359         self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
360         return TRUE; /* we'll send XMSG_DONE */
361     } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
362         g_assert(elt->output_listen_addrs == NULL);
363         self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
364         return TRUE; /* we'll send XMSG_DONE */
365     } else {
366         /* nothing to prepare for - we're ready already! */
367         DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
368         xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
369
370         return FALSE; /* we won't send XMSG_DONE */
371     }
372 }
373
374 static gpointer
375 pull_buffer_impl(
376     XferElement *elt,
377     size_t *size)
378 {
379     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
380     gpointer buf = NULL;
381     int result;
382     int devsize;
383     XMsg *msg;
384
385     g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
386     g_mutex_lock(self->start_part_mutex);
387
388     while (1) {
389         /* make sure we have a device */
390         while (self->paused && !elt->cancelled)
391             g_cond_wait(self->start_part_cond, self->start_part_mutex);
392
393         /* indicate EOF on an cancel or when there are no more parts */
394         if (elt->cancelled || !self->device) {
395             goto error;
396         }
397
398         /* start the timer if this is the first pull_buffer of this part */
399         if (!self->part_timer) {
400             DBG(2, "first pull_buffer of new part");
401             self->part_timer = g_timer_new();
402         }
403
404         /* loop until we read a full block, in case the blocks are larger than
405          * expected */
406         if (self->block_size == 0)
407             self->block_size = (size_t)self->device->block_size;
408
409         do {
410             buf = g_malloc(self->block_size);
411             devsize = (int)self->block_size;
412             result = device_read_block(self->device, buf, &devsize);
413             *size = devsize;
414
415             if (result == 0) {
416                 g_assert(*size > self->block_size);
417                 self->block_size = devsize;
418                 amfree(buf);
419             }
420         } while (result == 0);
421
422         /* if this block was successful, return it */
423         if (result > 0) {
424             self->part_size += *size;
425             break;
426         }
427
428         if (result < 0) {
429             amfree(buf);
430
431             /* if we're not at EOF, it's an error */
432             if (!self->device->is_eof) {
433                 xfer_cancel_with_error(elt,
434                     _("error reading from %s: %s"),
435                     self->device->device_name,
436                     device_error_or_status(self->device));
437                 g_mutex_unlock(self->start_part_mutex);
438                 wait_until_xfer_cancelled(elt->xfer);
439                 goto error_unlocked;
440             }
441
442             /* the device has signalled EOF (really end-of-part), so clean up instance
443              * variables and report the EOP to the caller in the form of an xmsg */
444             DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
445             msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
446             msg->size = self->part_size;
447             msg->duration = g_timer_elapsed(self->part_timer, NULL);
448             msg->partnum = 0;
449             msg->fileno = self->device->file;
450             msg->successful = TRUE;
451             msg->eof = FALSE;
452
453             self->paused = TRUE;
454             g_object_unref(self->device);
455             self->device = NULL;
456             self->bytes_read += self->part_size;
457             self->part_size = 0;
458             self->block_size = 0;
459             if (self->part_timer) {
460                 g_timer_destroy(self->part_timer);
461                 self->part_timer = NULL;
462             }
463
464             /* don't queue the XMSG_PART_DONE until we've adjusted all of our
465              * instance variables appropriately */
466             xfer_queue_message(elt->xfer, msg);
467         }
468     }
469
470     g_mutex_unlock(self->start_part_mutex);
471
472     if (elt->size > 0) {
473         /* initialize on first pass */
474         if (self->size == 0)
475             self->size = elt->size;
476         
477         if (self->size == -1) {
478             *size = 0;
479             amfree(buf);
480             return NULL;
481         }
482
483         if (*size > (guint64)self->size) {
484             /* return only self->size bytes */
485             *size = self->size;
486             self->size = -1;
487         } else {
488             self->size -= *size;
489         }
490     }
491
492     return buf;
493 error:
494     g_mutex_unlock(self->start_part_mutex);
495 error_unlocked:
496     *size = 0;
497     return NULL;
498 }
499
500 static gboolean
501 cancel_impl(
502     XferElement *elt,
503     gboolean expect_eof G_GNUC_UNUSED)
504 {
505     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
506     elt->cancelled = TRUE;
507
508     /* trigger the condition variable, in case the thread is waiting on it */
509     g_mutex_lock(self->start_part_mutex);
510     g_cond_broadcast(self->start_part_cond);
511     g_cond_broadcast(self->abort_cond);
512     g_mutex_unlock(self->start_part_mutex);
513
514     return TRUE;
515 }
516
517 static void
518 start_part_impl(
519     XferSourceRecovery *self,
520     Device *device)
521 {
522     g_assert(!device || device->in_file);
523
524     DBG(2, "start_part called");
525
526     if (self->device_bad) {
527         /* use_device didn't like the device it got, but the xfer cancellation
528          * has not completed yet, so do nothing */
529         return;
530     }
531
532     g_mutex_lock(self->start_part_mutex);
533
534     /* make sure we're ready to go */
535     g_assert(self->paused);
536     if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
537      || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
538         g_assert(self->conn != NULL);
539     }
540
541     /* if we already have a device, it should have been given to use_device */
542     if (device && self->device)
543         g_assert(self->device == device);
544
545     if (self->device)
546         g_object_unref(self->device);
547     if (device)
548         g_object_ref(device);
549     self->device = device;
550
551     self->paused = FALSE;
552
553     DBG(2, "triggering condition variable");
554     g_cond_broadcast(self->start_part_cond);
555     g_mutex_unlock(self->start_part_mutex);
556 }
557
558 static void
559 use_device_impl(
560     XferSourceRecovery *xdtself,
561     Device *device)
562 {
563     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
564
565     g_assert(self->paused);
566
567     /* short-circuit if nothing is changing */
568     if (self->device == device)
569         return;
570
571     if (self->device)
572         g_object_unref(self->device);
573     self->device = NULL;
574
575     /* if we already have a connection, then make this device use it */
576     if (self->conn) {
577         if (!device_use_connection(device, self->conn)) {
578             /* queue up an error for later, and set device_bad.
579              * start_part will see this and fail silently */
580             self->device_bad = TRUE;
581             xfer_cancel_with_error(XFER_ELEMENT(self),
582                 _("Cannot continue onto new volume: %s"),
583                 device_error_or_status(device));
584             return;
585         }
586     }
587
588     self->device = device;
589     g_object_ref(device);
590 }
591
592 static xfer_element_mech_pair_t *
593 get_mech_pairs_impl(
594     XferElement *elt)
595 {
596     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
597     static xfer_element_mech_pair_t basic_mech_pairs[] = {
598         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
599         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
600     };
601     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
602         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
603         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
604         /* devices which support DirectTCP are usually not very efficient
605          * at delivering data via device_read_block, so this counts an extra
606          * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
607          * This is a hack, but it will do for now. */
608         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
609         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
610     };
611
612     return device_directtcp_supported(self->device)?
613         directtcp_mech_pairs : basic_mech_pairs;
614 }
615
616 static void
617 finalize_impl(
618     GObject * obj_self)
619 {
620     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
621
622     if (self->conn)
623         g_object_unref(self->conn);
624     if (self->device)
625         g_object_unref(self->device);
626
627     g_cond_free(self->start_part_cond);
628     g_cond_free(self->abort_cond);
629     g_mutex_free(self->start_part_mutex);
630 }
631
632 static void
633 instance_init(
634     XferElement *elt)
635 {
636     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
637
638     self->paused = TRUE;
639     self->start_part_cond = g_cond_new();
640     self->abort_cond = g_cond_new();
641     self->start_part_mutex = g_mutex_new();
642 }
643
644 static void
645 class_init(
646     XferSourceRecoveryClass * xsr_klass)
647 {
648     XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
649     GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
650
651     klass->pull_buffer = pull_buffer_impl;
652     klass->cancel = cancel_impl;
653     klass->start = start_impl;
654     klass->setup = setup_impl;
655     klass->get_mech_pairs = get_mech_pairs_impl;
656
657     klass->perl_class = "Amanda::Xfer::Source::Recovery";
658     klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
659
660     xsr_klass->start_part = start_part_impl;
661     xsr_klass->use_device = use_device_impl;
662
663     gobject_klass->finalize = finalize_impl;
664
665     parent_class = g_type_class_peek_parent(xsr_klass);
666 }
667
668 GType
669 xfer_source_recovery_get_type (void)
670 {
671     static GType type = 0;
672
673     if G_UNLIKELY(type == 0) {
674         static const GTypeInfo info = {
675             sizeof (XferSourceRecoveryClass),
676             (GBaseInitFunc) NULL,
677             (GBaseFinalizeFunc) NULL,
678             (GClassInitFunc) class_init,
679             (GClassFinalizeFunc) NULL,
680             NULL /* class_data */,
681             sizeof (XferSourceRecovery),
682             0 /* n_preallocs */,
683             (GInstanceInitFunc) instance_init,
684             NULL
685         };
686
687         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
688     }
689
690     return type;
691 }
692
693 /*
694  * Public methods and stubs
695  */
696
697 void
698 xfer_source_recovery_start_part(
699     XferElement *elt,
700     Device *device)
701 {
702     XferSourceRecoveryClass *klass;
703     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
704
705     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
706     klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
707 }
708
709 /* create an element of this class; prototype is in xfer-device.h */
710 XferElement *
711 xfer_source_recovery(Device *first_device)
712 {
713     XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
714     XferElement *elt = XFER_ELEMENT(self);
715
716     g_assert(first_device != NULL);
717     g_object_ref(first_device);
718     self->device = first_device;
719
720     return elt;
721 }
722
723 void
724 xfer_source_recovery_use_device(
725     XferElement *elt,
726     Device *device)
727 {
728     XferSourceRecoveryClass *klass;
729     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
730
731     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
732     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
733 }
734
735 guint64
736 xfer_source_recovery_get_bytes_read(
737     XferElement *elt)
738 {
739     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
740     guint64 bytes_read = self->bytes_read;
741
742     if (self->device)
743         bytes_read += device_get_bytes_read(self->device);
744
745     return bytes_read;
746 }
747