Imported Upstream version 3.2.0
[debian/amanda] / device-src / xfer-source-recovery.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "device.h"
25 #include "property.h"
26 #include "xfer-device.h"
27 #include "arglist.h"
28 #include "conffile.h"
29
30 /*
31  * Class declaration
32  *
33  * This declaration is entirely private; nothing but xfer_source_recovery() references
34  * it directly.
35  */
36
37 GType xfer_source_recovery_get_type(void);
38 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
39 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
40 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
41 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
42 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
43 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 /*
48  * Main object structure
49  */
50
51 typedef struct XferSourceRecovery {
52     XferElement __parent__;
53
54     /* thread for monitoring directtcp transfers */
55     GThread *thread;
56
57     /* this mutex in this condition variable governs all variables below */
58     GCond *start_part_cond;
59     GMutex *start_part_mutex;
60
61     /* is this device currently paused and awaiting a new part? */
62     gboolean paused;
63
64     /* device to read from (refcounted) */
65     Device *device;
66
67     /* TRUE if use_device found the device unsuitable; this makes start_part
68      * a no-op, allowing the cancellation to be handled normally */
69     gboolean device_bad;
70
71     /* directtcp connection (only valid after XMSG_READY) */
72     DirectTCPConnection *conn;
73     gboolean listen_ok;
74
75     /* and the block size for that device (reset to zero at the start of each
76      * part) */
77     size_t block_size;
78
79     /* part size (potentially including any zero-padding from the
80      * device) */
81     guint64 part_size;
82
83     /* timer for the duration; NULL while paused or cancelled */
84     GTimer *part_timer;
85 } XferSourceRecovery;
86
87 /*
88  * Class definition
89  */
90
91 typedef struct {
92     XferElementClass __parent__;
93
94     /* start reading the part at which DEVICE is positioned, sending an
95      * XMSG_PART_DONE when the part has been read */
96     void (*start_part)(XferSourceRecovery *self, Device *device);
97
98     /* use the given device, much like the same method for xfer-dest-taper */
99     void (*use_device)(XferSourceRecovery *self, Device *device);
100 } XferSourceRecoveryClass;
101
102 /*
103  * Debug Logging
104  */
105
/* Log a debug message when debug_recovery is at least LEVEL.  The expansion is
 * a braced if-statement, so it works whether or not the call site adds a
 * trailing semicolon. */
#define DBG(LEVEL, ...) if ((LEVEL) <= debug_recovery) { xsr_debug_log(__VA_ARGS__); }

/* printf-style helper behind DBG(): formats the message (truncated to fit a
 * 1024-byte buffer) and emits it through g_debug(), tagged with the id of the
 * calling thread. */
static void
xsr_debug_log(const char *fmt, ...)
{
    char text[1024];
    va_list ap;

    arglist_start(ap, fmt);
    g_vsnprintf(text, sizeof(text), fmt, ap);
    arglist_end(ap);

    g_debug("XSR thd-%p: %s", g_thread_self(), text);
}
118
119 /*
120  * Implementation
121  */
122
123 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
124  * this is called after self->conn is filled in and carries out the data
125  * transfer over that connection.  NOTE: start_part_mutex is HELD when this
126  * function begins */
127 static gpointer
128 directtcp_common_thread(
129         XferSourceRecovery *self)
130 {
131     XferElement *elt = XFER_ELEMENT(self);
132     char *errmsg = NULL;
133
134     /* send XMSG_READY to indicate it's OK to call start_part now */
135     DBG(2, "sending XMSG_READY");
136     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
137
138     /* now we sit around waiting for signals to write a part */
139     while (1) {
140         guint64 actual_size;
141         XMsg *msg;
142
143         while (self->paused && !elt->cancelled) {
144             DBG(9, "waiting to be un-paused");
145             g_cond_wait(self->start_part_cond, self->start_part_mutex);
146         }
147         DBG(9, "done waiting");
148
149         if (elt->cancelled) {
150             g_mutex_unlock(self->start_part_mutex);
151             goto close_conn_and_send_done;
152         }
153
154         /* if the device is NULL, we're done */
155         if (!self->device)
156             break;
157
158         /* read the part */
159         self->part_timer = g_timer_new();
160
161         while (1) {
162             DBG(2, "reading part from %s", self->device->device_name);
163             if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
164                 xfer_cancel_with_error(elt, _("error reading from device: %s"),
165                     device_error_or_status(self->device));
166                 g_mutex_unlock(self->start_part_mutex);
167                 goto close_conn_and_send_done;
168             }
169
170             /* break on EOF; otherwise do another read_to_connection */
171             if (self->device->is_eof) {
172                 break;
173             }
174         }
175         DBG(2, "done reading part; sending XMSG_PART_DONE");
176
177         /* the device has signalled EOF (really end-of-part), so clean up instance
178          * variables and report the EOP to the caller in the form of an xmsg */
179         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
180         msg->size = actual_size;
181         msg->duration = g_timer_elapsed(self->part_timer, NULL);
182         msg->partnum = 0;
183         msg->fileno = self->device->file;
184         msg->successful = TRUE;
185         msg->eof = FALSE;
186
187         self->paused = TRUE;
188         g_object_unref(self->device);
189         self->device = NULL;
190         self->part_size = 0;
191         self->block_size = 0;
192         g_timer_destroy(self->part_timer);
193         self->part_timer = NULL;
194
195         xfer_queue_message(elt->xfer, msg);
196     }
197     g_mutex_unlock(self->start_part_mutex);
198
199 close_conn_and_send_done:
200     if (self->conn) {
201         errmsg = directtcp_connection_close(self->conn);
202         g_object_unref(self->conn);
203         self->conn = NULL;
204         if (errmsg) {
205             xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
206             wait_until_xfer_cancelled(elt->xfer);
207         }
208     }
209
210     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
211
212     return NULL;
213 }
214
215 static gpointer
216 directtcp_connect_thread(
217         gpointer data)
218 {
219     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
220     XferElement *elt = XFER_ELEMENT(self);
221
222     DBG(1, "(this is directtcp_connect_thread)")
223
224     /* first, we need to accept the incoming connection; we do this while
225      * holding the start_part_mutex, so that a part doesn't get started until
226      * we're finished with the device */
227     g_mutex_lock(self->start_part_mutex);
228
229     if (elt->cancelled) {
230         g_mutex_unlock(self->start_part_mutex);
231         goto send_done;
232     }
233
234     g_assert(self->device != NULL); /* have a device */
235     g_assert(elt->output_listen_addrs != NULL); /* listening on it */
236     g_assert(self->listen_ok);
237
238     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
239     if (!device_accept(self->device, &self->conn, NULL, NULL)) {
240         xfer_cancel_with_error(elt,
241             _("error accepting DirectTCP connection: %s"),
242             device_error_or_status(self->device));
243         g_mutex_unlock(self->start_part_mutex);
244         wait_until_xfer_cancelled(elt->xfer);
245         goto send_done;
246     }
247     DBG(2, "DirectTCP connection accepted");
248
249     return directtcp_common_thread(self);
250
251 send_done:
252     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
253     return NULL;
254 }
255
256 static gpointer
257 directtcp_listen_thread(
258         gpointer data)
259 {
260     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
261     XferElement *elt = XFER_ELEMENT(self);
262
263     DBG(1, "(this is directtcp_listen_thread)");
264
265     /* we need to make an outgoing connection to downstream; we do this while
266      * holding the start_part_mutex, so that a part doesn't get started until
267      * we're finished with the device */
268     g_mutex_lock(self->start_part_mutex);
269
270     if (elt->cancelled) {
271         g_mutex_unlock(self->start_part_mutex);
272         goto send_done;
273     }
274
275     g_assert(self->device != NULL); /* have a device */
276     g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
277
278     DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
279     if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
280                         &self->conn, NULL, NULL)) {
281         xfer_cancel_with_error(elt,
282             _("error making DirectTCP connection: %s"),
283             device_error_or_status(self->device));
284         g_mutex_unlock(self->start_part_mutex);
285         wait_until_xfer_cancelled(elt->xfer);
286         goto send_done;
287     }
288     DBG(2, "DirectTCP connect succeeded");
289
290     return directtcp_common_thread(self);
291
292 send_done:
293     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
294     return NULL;
295 }
296
297 static gboolean
298 setup_impl(
299     XferElement *elt)
300 {
301     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
302
303     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
304         g_assert(self->device != NULL);
305         DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
306         if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
307             xfer_cancel_with_error(elt,
308                 _("error listening for DirectTCP connection: %s"),
309                 device_error_or_status(self->device));
310             return FALSE;
311         }
312         self->listen_ok = TRUE;
313     } else {
314         /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
315          * XFER_MECH_PULL_BUFFER */
316         elt->output_listen_addrs = NULL;
317     }
318
319     return TRUE;
320 }
321
322 static gboolean
323 start_impl(
324     XferElement *elt)
325 {
326     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
327
328     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
329         g_assert(elt->output_listen_addrs != NULL);
330         self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
331         return TRUE; /* we'll send XMSG_DONE */
332     } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
333         g_assert(elt->output_listen_addrs == NULL);
334         self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
335         return TRUE; /* we'll send XMSG_DONE */
336     } else {
337         /* nothing to prepare for - we're ready already! */
338         DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
339         xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
340
341         return FALSE; /* we won't send XMSG_DONE */
342     }
343 }
344
345 static gpointer
346 pull_buffer_impl(
347     XferElement *elt,
348     size_t *size)
349 {
350     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
351     gpointer buf = NULL;
352     int result;
353     int devsize;
354     XMsg *msg;
355
356     g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
357     g_mutex_lock(self->start_part_mutex);
358
359     while (1) {
360         /* make sure we have a device */
361         while (self->paused && !elt->cancelled)
362             g_cond_wait(self->start_part_cond, self->start_part_mutex);
363
364         /* indicate EOF on an cancel or when there are no more parts */
365         if (elt->cancelled || !self->device) {
366             goto error;
367         }
368
369         /* start the timer if this is the first pull_buffer of this part */
370         if (!self->part_timer) {
371             DBG(2, "first pull_buffer of new part");
372             self->part_timer = g_timer_new();
373         }
374
375         /* loop until we read a full block, in case the blocks are larger than
376          * expected */
377         if (self->block_size == 0)
378             self->block_size = (size_t)self->device->block_size;
379
380         do {
381             buf = g_malloc(self->block_size);
382             devsize = (int)self->block_size;
383             result = device_read_block(self->device, buf, &devsize);
384             *size = devsize;
385
386             if (result == 0) {
387                 g_assert(*size > self->block_size);
388                 self->block_size = devsize;
389                 amfree(buf);
390             }
391         } while (result == 0);
392
393         /* if this block was successful, return it */
394         if (result > 0) {
395             self->part_size += *size;
396             break;
397         }
398
399         if (result < 0) {
400             amfree(buf);
401
402             /* if we're not at EOF, it's an error */
403             if (!self->device->is_eof) {
404                 xfer_cancel_with_error(elt,
405                     _("error reading from %s: %s"),
406                     self->device->device_name,
407                     device_error_or_status(self->device));
408                 wait_until_xfer_cancelled(elt->xfer);
409                 goto error;
410             }
411
412             /* the device has signalled EOF (really end-of-part), so clean up instance
413              * variables and report the EOP to the caller in the form of an xmsg */
414             DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
415             msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
416             msg->size = self->part_size;
417             msg->duration = g_timer_elapsed(self->part_timer, NULL);
418             msg->partnum = 0;
419             msg->fileno = self->device->file;
420             msg->successful = TRUE;
421             msg->eof = FALSE;
422
423             self->paused = TRUE;
424             g_object_unref(self->device);
425             self->device = NULL;
426             self->part_size = 0;
427             self->block_size = 0;
428             if (self->part_timer) {
429                 g_timer_destroy(self->part_timer);
430                 self->part_timer = NULL;
431             }
432
433             /* don't queue the XMSG_PART_DONE until we've adjusted all of our
434              * instance variables appropriately */
435             xfer_queue_message(elt->xfer, msg);
436         }
437     }
438
439     g_mutex_unlock(self->start_part_mutex);
440
441     return buf;
442 error:
443     g_mutex_unlock(self->start_part_mutex);
444     *size = 0;
445     return NULL;
446 }
447
448 static gboolean
449 cancel_impl(
450     XferElement *elt,
451     gboolean expect_eof G_GNUC_UNUSED)
452 {
453     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
454     elt->cancelled = TRUE;
455
456     /* trigger the condition variable, in case the thread is waiting on it */
457     g_mutex_lock(self->start_part_mutex);
458     g_cond_broadcast(self->start_part_cond);
459     g_mutex_unlock(self->start_part_mutex);
460
461     return TRUE;
462 }
463
464 static void
465 start_part_impl(
466     XferSourceRecovery *self,
467     Device *device)
468 {
469     g_assert(!device || device->in_file);
470
471     DBG(2, "start_part called");
472
473     if (self->device_bad) {
474         /* use_device didn't like the device it got, but the xfer cancellation
475          * has not completed yet, so do nothing */
476         return;
477     }
478
479     g_mutex_lock(self->start_part_mutex);
480
481     /* make sure we're ready to go */
482     g_assert(self->paused);
483     if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
484      || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
485         g_assert(self->conn != NULL);
486     }
487
488     /* if we already have a device, it should have been given to use_device */
489     if (device && self->device)
490         g_assert(self->device == device);
491
492     if (self->device)
493         g_object_unref(self->device);
494     if (device)
495         g_object_ref(device);
496     self->device = device;
497
498     self->paused = FALSE;
499
500     DBG(2, "triggering condition variable");
501     g_cond_broadcast(self->start_part_cond);
502     g_mutex_unlock(self->start_part_mutex);
503 }
504
505 static void
506 use_device_impl(
507     XferSourceRecovery *xdtself,
508     Device *device)
509 {
510     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
511
512     g_assert(self->paused);
513
514     /* short-circuit if nothing is changing */
515     if (self->device == device)
516         return;
517
518     if (self->device)
519         g_object_unref(self->device);
520     self->device = NULL;
521
522     /* if we already have a connection, then make this device use it */
523     if (self->conn) {
524         if (!device_use_connection(device, self->conn)) {
525             /* queue up an error for later, and set device_bad.
526              * start_part will see this and fail silently */
527             self->device_bad = TRUE;
528             xfer_cancel_with_error(XFER_ELEMENT(self),
529                 _("Cannot continue onto new volume: %s"),
530                 device_error_or_status(device));
531             return;
532         }
533     }
534
535     self->device = device;
536     g_object_ref(device);
537 }
538
539 static xfer_element_mech_pair_t *
540 get_mech_pairs_impl(
541     XferElement *elt)
542 {
543     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
544     static xfer_element_mech_pair_t basic_mech_pairs[] = {
545         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
546         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
547     };
548     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
549         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
550         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
551         /* devices which support DirectTCP are usually not very efficient
552          * at delivering data via device_read_block, so this counts an extra
553          * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
554          * This is a hack, but it will do for now. */
555         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
556         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
557     };
558
559     return device_directtcp_supported(self->device)?
560         directtcp_mech_pairs : basic_mech_pairs;
561 }
562
563 static void
564 finalize_impl(
565     GObject * obj_self)
566 {
567     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
568
569     if (self->conn)
570         g_object_unref(self->conn);
571     if (self->device)
572         g_object_unref(self->device);
573
574     g_cond_free(self->start_part_cond);
575     g_mutex_free(self->start_part_mutex);
576 }
577
578 static void
579 instance_init(
580     XferElement *elt)
581 {
582     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
583
584     self->paused = TRUE;
585     self->start_part_cond = g_cond_new();
586     self->start_part_mutex = g_mutex_new();
587 }
588
589 static void
590 class_init(
591     XferSourceRecoveryClass * xsr_klass)
592 {
593     XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
594     GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
595
596     klass->pull_buffer = pull_buffer_impl;
597     klass->cancel = cancel_impl;
598     klass->start = start_impl;
599     klass->setup = setup_impl;
600     klass->get_mech_pairs = get_mech_pairs_impl;
601
602     klass->perl_class = "Amanda::Xfer::Source::Recovery";
603     klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
604
605     xsr_klass->start_part = start_part_impl;
606     xsr_klass->use_device = use_device_impl;
607
608     gobject_klass->finalize = finalize_impl;
609
610     parent_class = g_type_class_peek_parent(xsr_klass);
611 }
612
613 GType
614 xfer_source_recovery_get_type (void)
615 {
616     static GType type = 0;
617
618     if G_UNLIKELY(type == 0) {
619         static const GTypeInfo info = {
620             sizeof (XferSourceRecoveryClass),
621             (GBaseInitFunc) NULL,
622             (GBaseFinalizeFunc) NULL,
623             (GClassInitFunc) class_init,
624             (GClassFinalizeFunc) NULL,
625             NULL /* class_data */,
626             sizeof (XferSourceRecovery),
627             0 /* n_preallocs */,
628             (GInstanceInitFunc) instance_init,
629             NULL
630         };
631
632         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
633     }
634
635     return type;
636 }
637
638 /*
639  * Public methods and stubs
640  */
641
642 void
643 xfer_source_recovery_start_part(
644     XferElement *elt,
645     Device *device)
646 {
647     XferSourceRecoveryClass *klass;
648     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
649
650     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
651     klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
652 }
653
654 /* create an element of this class; prototype is in xfer-device.h */
655 XferElement *
656 xfer_source_recovery(Device *first_device)
657 {
658     XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
659     XferElement *elt = XFER_ELEMENT(self);
660
661     g_assert(first_device != NULL);
662     g_object_ref(first_device);
663     self->device = first_device;
664
665     return elt;
666 }
667
668 void
669 xfer_source_recovery_use_device(
670     XferElement *elt,
671     Device *device)
672 {
673     XferSourceRecoveryClass *klass;
674     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
675
676     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
677     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
678 }