Imported Upstream version 3.3.2
[debian/amanda] / device-src / xfer-source-recovery.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "device.h"
25 #include "property.h"
26 #include "xfer-device.h"
27 #include "arglist.h"
28 #include "conffile.h"
29
30 /*
31  * Class declaration
32  *
33  * This declaration is entirely private; nothing but xfer_source_recovery() references
34  * it directly.
35  */
36
37 GType xfer_source_recovery_get_type(void);
38 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
39 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
40 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
41 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
42 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
43 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 /*
48  * Main object structure
49  */
50
51 typedef struct XferSourceRecovery {
52     XferElement __parent__;
53
54     /* thread for monitoring directtcp transfers */
55     GThread *thread;
56
57     /* this mutex in this condition variable governs all variables below */
58     GCond *start_part_cond;
59     GMutex *start_part_mutex;
60
61     /* is this device currently paused and awaiting a new part? */
62     gboolean paused;
63
64     /* device to read from (refcounted) */
65     Device *device;
66
67     /* TRUE if use_device found the device unsuitable; this makes start_part
68      * a no-op, allowing the cancellation to be handled normally */
69     gboolean device_bad;
70
71     /* directtcp connection (only valid after XMSG_READY) */
72     DirectTCPConnection *conn;
73     gboolean listen_ok;
74
75     /* and the block size for that device (reset to zero at the start of each
76      * part) */
77     size_t block_size;
78
79     /* bytes read for this image */
80     guint64 bytes_read;
81
82     /* part size (potentially including any zero-padding from the
83      * device) */
84     guint64 part_size;
85
86     /* timer for the duration; NULL while paused or cancelled */
87     GTimer *part_timer;
88
89     gint64   size;
90 } XferSourceRecovery;
91
92 /*
93  * Class definition
94  */
95
96 typedef struct {
97     XferElementClass __parent__;
98
99     /* start reading the part at which DEVICE is positioned, sending an
100      * XMSG_PART_DONE when the part has been read */
101     void (*start_part)(XferSourceRecovery *self, Device *device);
102
103     /* use the given device, much like the same method for xfer-dest-taper */
104     void (*use_device)(XferSourceRecovery *self, Device *device);
105 } XferSourceRecoveryClass;
106
107 /*
108  * Debug Logging
109  */
110
111 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
112 static void
113 _xsr_dbg(const char *fmt, ...)
114 {
115     va_list argp;
116     char msg[1024];
117
118     arglist_start(argp, fmt);
119     g_vsnprintf(msg, sizeof(msg), fmt, argp);
120     arglist_end(argp);
121     g_debug("XSR: %s", msg);
122 }
123
124 /*
125  * Implementation
126  */
127
128 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
129  * this is called after self->conn is filled in and carries out the data
130  * transfer over that connection.  NOTE: start_part_mutex is HELD when this
131  * function begins */
132 static gpointer
133 directtcp_common_thread(
134         XferSourceRecovery *self)
135 {
136     XferElement *elt = XFER_ELEMENT(self);
137     char *errmsg = NULL;
138
139     /* send XMSG_READY to indicate it's OK to call start_part now */
140     DBG(2, "sending XMSG_READY");
141     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
142
143     /* now we sit around waiting for signals to write a part */
144     while (1) {
145         guint64 actual_size;
146         XMsg *msg;
147
148         while (self->paused && !elt->cancelled) {
149             DBG(9, "waiting to be un-paused");
150             g_cond_wait(self->start_part_cond, self->start_part_mutex);
151         }
152         DBG(9, "done waiting");
153
154         if (elt->cancelled) {
155             g_mutex_unlock(self->start_part_mutex);
156             goto close_conn_and_send_done;
157         }
158
159         /* if the device is NULL, we're done */
160         if (!self->device)
161             break;
162
163         /* read the part */
164         self->part_timer = g_timer_new();
165
166         while (1) {
167             DBG(2, "reading part from %s", self->device->device_name);
168             if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
169                 xfer_cancel_with_error(elt, _("error reading from device: %s"),
170                     device_error_or_status(self->device));
171                 g_mutex_unlock(self->start_part_mutex);
172                 goto close_conn_and_send_done;
173             }
174
175             /* break on EOF; otherwise do another read_to_connection */
176             if (self->device->is_eof) {
177                 break;
178             }
179         }
180         DBG(2, "done reading part; sending XMSG_PART_DONE");
181
182         /* the device has signalled EOF (really end-of-part), so clean up instance
183          * variables and report the EOP to the caller in the form of an xmsg */
184         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
185         msg->size = actual_size;
186         msg->duration = g_timer_elapsed(self->part_timer, NULL);
187         msg->partnum = 0;
188         msg->fileno = self->device->file;
189         msg->successful = TRUE;
190         msg->eof = FALSE;
191
192         self->paused = TRUE;
193         g_object_unref(self->device);
194         self->device = NULL;
195         self->part_size = 0;
196         self->block_size = 0;
197         g_timer_destroy(self->part_timer);
198         self->part_timer = NULL;
199
200         xfer_queue_message(elt->xfer, msg);
201     }
202     g_mutex_unlock(self->start_part_mutex);
203
204 close_conn_and_send_done:
205     if (self->conn) {
206         errmsg = directtcp_connection_close(self->conn);
207         g_object_unref(self->conn);
208         self->conn = NULL;
209         if (errmsg) {
210             xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
211             wait_until_xfer_cancelled(elt->xfer);
212         }
213     }
214
215     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
216
217     return NULL;
218 }
219
220 static gpointer
221 directtcp_connect_thread(
222         gpointer data)
223 {
224     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
225     XferElement *elt = XFER_ELEMENT(self);
226
227     DBG(1, "(this is directtcp_connect_thread)")
228
229     /* first, we need to accept the incoming connection; we do this while
230      * holding the start_part_mutex, so that a part doesn't get started until
231      * we're finished with the device */
232     g_mutex_lock(self->start_part_mutex);
233
234     if (elt->cancelled) {
235         g_mutex_unlock(self->start_part_mutex);
236         goto send_done;
237     }
238
239     g_assert(self->device != NULL); /* have a device */
240     g_assert(elt->output_listen_addrs != NULL); /* listening on it */
241     g_assert(self->listen_ok);
242
243     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
244     if (!device_accept(self->device, &self->conn, NULL, NULL)) {
245         xfer_cancel_with_error(elt,
246             _("error accepting DirectTCP connection: %s"),
247             device_error_or_status(self->device));
248         g_mutex_unlock(self->start_part_mutex);
249         wait_until_xfer_cancelled(elt->xfer);
250         goto send_done;
251     }
252     DBG(2, "DirectTCP connection accepted");
253
254     return directtcp_common_thread(self);
255
256 send_done:
257     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
258     return NULL;
259 }
260
261 static gpointer
262 directtcp_listen_thread(
263         gpointer data)
264 {
265     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
266     XferElement *elt = XFER_ELEMENT(self);
267
268     DBG(1, "(this is directtcp_listen_thread)");
269
270     /* we need to make an outgoing connection to downstream; we do this while
271      * holding the start_part_mutex, so that a part doesn't get started until
272      * we're finished with the device */
273     g_mutex_lock(self->start_part_mutex);
274
275     if (elt->cancelled) {
276         g_mutex_unlock(self->start_part_mutex);
277         goto send_done;
278     }
279
280     g_assert(self->device != NULL); /* have a device */
281     g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
282
283     DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
284     if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
285                         &self->conn, NULL, NULL)) {
286         xfer_cancel_with_error(elt,
287             _("error making DirectTCP connection: %s"),
288             device_error_or_status(self->device));
289         g_mutex_unlock(self->start_part_mutex);
290         wait_until_xfer_cancelled(elt->xfer);
291         goto send_done;
292     }
293     DBG(2, "DirectTCP connect succeeded");
294
295     return directtcp_common_thread(self);
296
297 send_done:
298     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
299     return NULL;
300 }
301
302 static gboolean
303 setup_impl(
304     XferElement *elt)
305 {
306     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
307
308     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
309         g_assert(self->device != NULL);
310         DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
311         if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
312             xfer_cancel_with_error(elt,
313                 _("error listening for DirectTCP connection: %s"),
314                 device_error_or_status(self->device));
315             return FALSE;
316         }
317         self->listen_ok = TRUE;
318     } else {
319         /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
320          * XFER_MECH_PULL_BUFFER */
321         elt->output_listen_addrs = NULL;
322     }
323
324     return TRUE;
325 }
326
327 static gboolean
328 start_impl(
329     XferElement *elt)
330 {
331     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
332
333     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
334         g_assert(elt->output_listen_addrs != NULL);
335         self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
336         return TRUE; /* we'll send XMSG_DONE */
337     } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
338         g_assert(elt->output_listen_addrs == NULL);
339         self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
340         return TRUE; /* we'll send XMSG_DONE */
341     } else {
342         /* nothing to prepare for - we're ready already! */
343         DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
344         xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
345
346         return FALSE; /* we won't send XMSG_DONE */
347     }
348 }
349
350 static gpointer
351 pull_buffer_impl(
352     XferElement *elt,
353     size_t *size)
354 {
355     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
356     gpointer buf = NULL;
357     int result;
358     int devsize;
359     XMsg *msg;
360
361     g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
362     g_mutex_lock(self->start_part_mutex);
363
364     while (1) {
365         /* make sure we have a device */
366         while (self->paused && !elt->cancelled)
367             g_cond_wait(self->start_part_cond, self->start_part_mutex);
368
369         /* indicate EOF on an cancel or when there are no more parts */
370         if (elt->cancelled || !self->device) {
371             goto error;
372         }
373
374         /* start the timer if this is the first pull_buffer of this part */
375         if (!self->part_timer) {
376             DBG(2, "first pull_buffer of new part");
377             self->part_timer = g_timer_new();
378         }
379
380         /* loop until we read a full block, in case the blocks are larger than
381          * expected */
382         if (self->block_size == 0)
383             self->block_size = (size_t)self->device->block_size;
384
385         do {
386             buf = g_malloc(self->block_size);
387             devsize = (int)self->block_size;
388             result = device_read_block(self->device, buf, &devsize);
389             *size = devsize;
390
391             if (result == 0) {
392                 g_assert(*size > self->block_size);
393                 self->block_size = devsize;
394                 amfree(buf);
395             }
396         } while (result == 0);
397
398         /* if this block was successful, return it */
399         if (result > 0) {
400             self->part_size += *size;
401             break;
402         }
403
404         if (result < 0) {
405             amfree(buf);
406
407             /* if we're not at EOF, it's an error */
408             if (!self->device->is_eof) {
409                 xfer_cancel_with_error(elt,
410                     _("error reading from %s: %s"),
411                     self->device->device_name,
412                     device_error_or_status(self->device));
413                 g_mutex_unlock(self->start_part_mutex);
414                 wait_until_xfer_cancelled(elt->xfer);
415                 goto error_unlocked;
416             }
417
418             /* the device has signalled EOF (really end-of-part), so clean up instance
419              * variables and report the EOP to the caller in the form of an xmsg */
420             DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
421             msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
422             msg->size = self->part_size;
423             msg->duration = g_timer_elapsed(self->part_timer, NULL);
424             msg->partnum = 0;
425             msg->fileno = self->device->file;
426             msg->successful = TRUE;
427             msg->eof = FALSE;
428
429             self->paused = TRUE;
430             g_object_unref(self->device);
431             self->device = NULL;
432             self->bytes_read += self->part_size;
433             self->part_size = 0;
434             self->block_size = 0;
435             if (self->part_timer) {
436                 g_timer_destroy(self->part_timer);
437                 self->part_timer = NULL;
438             }
439
440             /* don't queue the XMSG_PART_DONE until we've adjusted all of our
441              * instance variables appropriately */
442             xfer_queue_message(elt->xfer, msg);
443         }
444     }
445
446     g_mutex_unlock(self->start_part_mutex);
447
448     if (elt->size > 0) {
449         /* initialize on first pass */
450         if (self->size == 0)
451             self->size = elt->size;
452         
453         if (self->size == -1) {
454             *size = 0;
455             amfree(buf);
456             return NULL;
457         }
458
459         if (*size > (guint64)self->size) {
460             /* return only self->size bytes */
461             *size = self->size;
462             self->size = -1;
463         } else {
464             self->size -= *size;
465         }
466     }
467
468     return buf;
469 error:
470     g_mutex_unlock(self->start_part_mutex);
471 error_unlocked:
472     *size = 0;
473     return NULL;
474 }
475
476 static gboolean
477 cancel_impl(
478     XferElement *elt,
479     gboolean expect_eof G_GNUC_UNUSED)
480 {
481     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
482     elt->cancelled = TRUE;
483
484     /* trigger the condition variable, in case the thread is waiting on it */
485     g_mutex_lock(self->start_part_mutex);
486     g_cond_broadcast(self->start_part_cond);
487     g_mutex_unlock(self->start_part_mutex);
488
489     return TRUE;
490 }
491
492 static void
493 start_part_impl(
494     XferSourceRecovery *self,
495     Device *device)
496 {
497     g_assert(!device || device->in_file);
498
499     DBG(2, "start_part called");
500
501     if (self->device_bad) {
502         /* use_device didn't like the device it got, but the xfer cancellation
503          * has not completed yet, so do nothing */
504         return;
505     }
506
507     g_mutex_lock(self->start_part_mutex);
508
509     /* make sure we're ready to go */
510     g_assert(self->paused);
511     if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
512      || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
513         g_assert(self->conn != NULL);
514     }
515
516     /* if we already have a device, it should have been given to use_device */
517     if (device && self->device)
518         g_assert(self->device == device);
519
520     if (self->device)
521         g_object_unref(self->device);
522     if (device)
523         g_object_ref(device);
524     self->device = device;
525
526     self->paused = FALSE;
527
528     DBG(2, "triggering condition variable");
529     g_cond_broadcast(self->start_part_cond);
530     g_mutex_unlock(self->start_part_mutex);
531 }
532
533 static void
534 use_device_impl(
535     XferSourceRecovery *xdtself,
536     Device *device)
537 {
538     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
539
540     g_assert(self->paused);
541
542     /* short-circuit if nothing is changing */
543     if (self->device == device)
544         return;
545
546     if (self->device)
547         g_object_unref(self->device);
548     self->device = NULL;
549
550     /* if we already have a connection, then make this device use it */
551     if (self->conn) {
552         if (!device_use_connection(device, self->conn)) {
553             /* queue up an error for later, and set device_bad.
554              * start_part will see this and fail silently */
555             self->device_bad = TRUE;
556             xfer_cancel_with_error(XFER_ELEMENT(self),
557                 _("Cannot continue onto new volume: %s"),
558                 device_error_or_status(device));
559             return;
560         }
561     }
562
563     self->device = device;
564     g_object_ref(device);
565 }
566
567 static xfer_element_mech_pair_t *
568 get_mech_pairs_impl(
569     XferElement *elt)
570 {
571     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
572     static xfer_element_mech_pair_t basic_mech_pairs[] = {
573         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
574         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
575     };
576     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
577         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
578         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
579         /* devices which support DirectTCP are usually not very efficient
580          * at delivering data via device_read_block, so this counts an extra
581          * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
582          * This is a hack, but it will do for now. */
583         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
584         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
585     };
586
587     return device_directtcp_supported(self->device)?
588         directtcp_mech_pairs : basic_mech_pairs;
589 }
590
591 static void
592 finalize_impl(
593     GObject * obj_self)
594 {
595     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
596
597     if (self->conn)
598         g_object_unref(self->conn);
599     if (self->device)
600         g_object_unref(self->device);
601
602     g_cond_free(self->start_part_cond);
603     g_mutex_free(self->start_part_mutex);
604 }
605
606 static void
607 instance_init(
608     XferElement *elt)
609 {
610     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
611
612     self->paused = TRUE;
613     self->start_part_cond = g_cond_new();
614     self->start_part_mutex = g_mutex_new();
615 }
616
617 static void
618 class_init(
619     XferSourceRecoveryClass * xsr_klass)
620 {
621     XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
622     GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
623
624     klass->pull_buffer = pull_buffer_impl;
625     klass->cancel = cancel_impl;
626     klass->start = start_impl;
627     klass->setup = setup_impl;
628     klass->get_mech_pairs = get_mech_pairs_impl;
629
630     klass->perl_class = "Amanda::Xfer::Source::Recovery";
631     klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
632
633     xsr_klass->start_part = start_part_impl;
634     xsr_klass->use_device = use_device_impl;
635
636     gobject_klass->finalize = finalize_impl;
637
638     parent_class = g_type_class_peek_parent(xsr_klass);
639 }
640
641 GType
642 xfer_source_recovery_get_type (void)
643 {
644     static GType type = 0;
645
646     if G_UNLIKELY(type == 0) {
647         static const GTypeInfo info = {
648             sizeof (XferSourceRecoveryClass),
649             (GBaseInitFunc) NULL,
650             (GBaseFinalizeFunc) NULL,
651             (GClassInitFunc) class_init,
652             (GClassFinalizeFunc) NULL,
653             NULL /* class_data */,
654             sizeof (XferSourceRecovery),
655             0 /* n_preallocs */,
656             (GInstanceInitFunc) instance_init,
657             NULL
658         };
659
660         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
661     }
662
663     return type;
664 }
665
666 /*
667  * Public methods and stubs
668  */
669
670 void
671 xfer_source_recovery_start_part(
672     XferElement *elt,
673     Device *device)
674 {
675     XferSourceRecoveryClass *klass;
676     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
677
678     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
679     klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
680 }
681
682 /* create an element of this class; prototype is in xfer-device.h */
683 XferElement *
684 xfer_source_recovery(Device *first_device)
685 {
686     XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
687     XferElement *elt = XFER_ELEMENT(self);
688
689     g_assert(first_device != NULL);
690     g_object_ref(first_device);
691     self->device = first_device;
692
693     return elt;
694 }
695
696 void
697 xfer_source_recovery_use_device(
698     XferElement *elt,
699     Device *device)
700 {
701     XferSourceRecoveryClass *klass;
702     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
703
704     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
705     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
706 }
707
708 guint64
709 xfer_source_recovery_get_bytes_read(
710     XferElement *elt)
711 {
712     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
713     guint64 bytes_read = self->bytes_read;
714
715     if (self->device)
716         bytes_read += device_get_bytes_read(self->device);
717
718     return bytes_read;
719 }
720