425a83789218e3f8a3a248d5f7696e16583475a9
[debian/amanda] / device-src / xfer-source-recovery.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amxfer.h"
23 #include "amanda.h"
24 #include "device.h"
25 #include "property.h"
26 #include "xfer-device.h"
27 #include "arglist.h"
28 #include "conffile.h"
29
30 /*
31  * Class declaration
32  *
33  * This declaration is entirely private; nothing but xfer_source_recovery() references
34  * it directly.
35  */
36
37 GType xfer_source_recovery_get_type(void);
38 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
39 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
40 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
41 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
42 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
43 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 /*
48  * Main object structure
49  */
50
/*
 * Instance structure.  __parent__ is the first member so that the GObject
 * casts to XferElement work.
 */
typedef struct XferSourceRecovery {
    XferElement __parent__;

    /* thread for monitoring directtcp transfers (directtcp_thread); only
     * created by start_impl() when the output mech is
     * XFER_MECH_DIRECTTCP_CONNECT */
    GThread *thread;

    /* this mutex and this condition variable govern the variables below.
     * NOTE(review): setup_impl() sets listen_ok and use_device_impl()
     * replaces device without taking the mutex -- confirm that is safe */
    GCond *start_part_cond;
    GMutex *start_part_mutex;

    /* is this device currently paused and awaiting a new part?  TRUE from
     * instance_init() until start_part_impl() clears it; set again each
     * time a part finishes */
    gboolean paused;

    /* device to read from (refcounted); reset to NULL when a part finishes */
    Device *device;

    /* TRUE if use_device found the device unsuitable; this makes start_part
     * a no-op, allowing the cancellation to be handled normally */
    gboolean device_bad;

    /* directtcp connection (only valid after XMSG_READY) */
    DirectTCPConnection *conn;

    /* TRUE once setup_impl() has successfully called device_listen() */
    gboolean listen_ok;

    /* block size used by pull_buffer_impl(); taken lazily from the device
     * on the first read of a part, grown if a block turns out larger, and
     * reset to zero when each part finishes */
    size_t block_size;

    /* bytes read so far in the current part (potentially including any
     * zero-padding from the device); reset to zero when each part finishes */
    guint64 part_size;

    /* timer for the duration of the current part; created when reading of
     * a part begins and destroyed (set to NULL) when the part finishes */
    GTimer *part_timer;
} XferSourceRecovery;
86
87 /*
88  * Class definition
89  */
90
/*
 * Class structure: XferElementClass plus two virtual methods, reached via
 * xfer_source_recovery_start_part() and xfer_source_recovery_use_device().
 */
typedef struct {
    XferElementClass __parent__;

    /* start reading the part at which DEVICE is positioned, sending an
     * XMSG_PART_DONE when the part has been read; passing a NULL DEVICE
     * makes the reader report end-of-data instead of reading another part */
    void (*start_part)(XferSourceRecovery *self, Device *device);

    /* use the given device, much like the same method for xfer-dest-taper;
     * must be called while paused, and hands any existing DirectTCP
     * connection over to DEVICE */
    void (*use_device)(XferSourceRecovery *self, Device *device);
} XferSourceRecoveryClass;
101
102 /*
103  * Debug Logging
104  */
105
106 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
107 static void
108 _xsr_dbg(const char *fmt, ...)
109 {
110     va_list argp;
111     char msg[1024];
112
113     arglist_start(argp, fmt);
114     g_vsnprintf(msg, sizeof(msg), fmt, argp);
115     arglist_end(argp);
116     g_debug("XSR thd-%p: %s", g_thread_self(), msg);
117 }
118
119 /*
120  * Implementation
121  */
122
123 static gpointer
124 directtcp_thread(
125         gpointer data)
126 {
127     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
128     XferElement *elt = XFER_ELEMENT(self);
129     char *errmsg = NULL;
130
131     /* first, we need to accept the incoming connection; we do this while
132      * holding the start_part_mutex, so that a part doesn't get started until
133      * we're finished with the device */
134     g_mutex_lock(self->start_part_mutex);
135
136     if (elt->cancelled) {
137         g_mutex_unlock(self->start_part_mutex);
138         goto send_done;
139     }
140
141     g_assert(self->device != NULL); /* have a device */
142     g_assert(elt->output_listen_addrs != NULL); /* listening on it */
143     g_assert(self->listen_ok);
144
145     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
146     if (!device_accept(self->device, &self->conn, NULL, NULL)) {
147         xfer_cancel_with_error(elt,
148             _("error accepting DirectTCP connection: %s"),
149             device_error_or_status(self->device));
150         g_mutex_unlock(self->start_part_mutex);
151         wait_until_xfer_cancelled(elt->xfer);
152         goto send_done;
153     }
154
155     /* send XMSG_READY to indicate it's OK to call start_part now */
156     DBG(2, "connection accepted; sending XMSG_READY");
157     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
158
159     /* now we sit around waiting for signals to write a part */
160     while (1) {
161         guint64 actual_size;
162         XMsg *msg;
163
164         while (self->paused && !elt->cancelled) {
165             DBG(9, "waiting to be un-paused");
166             g_cond_wait(self->start_part_cond, self->start_part_mutex);
167         }
168         DBG(9, "done waiting");
169
170         if (elt->cancelled) {
171             g_mutex_unlock(self->start_part_mutex);
172             goto close_conn_and_send_done;
173         }
174
175         /* if the device is NULL, we're done */
176         if (!self->device)
177             break;
178
179         /* read the part */
180         self->part_timer = g_timer_new();
181
182         while (1) {
183             DBG(2, "reading part from %s", self->device->device_name);
184             if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
185                 xfer_cancel_with_error(elt, _("error reading from device: %s"),
186                     device_error_or_status(self->device));
187                 g_mutex_unlock(self->start_part_mutex);
188                 goto close_conn_and_send_done;
189             }
190
191             /* break on EOF; otherwise do another read_to_connection */
192             if (self->device->is_eof) {
193                 break;
194             }
195         }
196         DBG(2, "done reading part; sending XMSG_PART_DONE");
197
198         /* the device has signalled EOF (really end-of-part), so clean up instance
199          * variables and report the EOP to the caller in the form of an xmsg */
200         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
201         msg->size = actual_size;
202         msg->duration = g_timer_elapsed(self->part_timer, NULL);
203         msg->partnum = 0;
204         msg->fileno = self->device->file;
205         msg->successful = TRUE;
206         msg->eof = FALSE;
207
208         self->paused = TRUE;
209         g_object_unref(self->device);
210         self->device = NULL;
211         self->part_size = 0;
212         self->block_size = 0;
213         g_timer_destroy(self->part_timer);
214         self->part_timer = NULL;
215
216         xfer_queue_message(elt->xfer, msg);
217     }
218     g_mutex_unlock(self->start_part_mutex);
219
220 close_conn_and_send_done:
221     if (self->conn) {
222         errmsg = directtcp_connection_close(self->conn);
223         g_object_unref(self->conn);
224         self->conn = NULL;
225         if (errmsg) {
226             xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
227             wait_until_xfer_cancelled(elt->xfer);
228         }
229     }
230
231 send_done:
232     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
233
234     return NULL;
235 }
236
237 static gboolean
238 setup_impl(
239     XferElement *elt)
240 {
241     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
242
243     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
244         g_assert(self->device != NULL);
245         DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
246         if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
247             xfer_cancel_with_error(elt,
248                 _("error listening for DirectTCP connection: %s"),
249                 device_error_or_status(self->device));
250             return FALSE;
251         }
252         self->listen_ok = TRUE;
253     } else {
254         elt->output_listen_addrs = NULL;
255     }
256
257     return TRUE;
258 }
259
260 static gboolean
261 start_impl(
262     XferElement *elt)
263 {
264     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
265
266     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
267         g_assert(elt->output_listen_addrs != NULL);
268         self->thread = g_thread_create(directtcp_thread, (gpointer)self, FALSE, NULL);
269         return TRUE; /* we'll send XMSG_DONE */
270     } else {
271         /* nothing to prepare for - we're ready already! */
272         DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
273         xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
274
275         return FALSE; /* we won't send XMSG_DONE */
276     }
277 }
278
279 static gpointer
280 pull_buffer_impl(
281     XferElement *elt,
282     size_t *size)
283 {
284     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
285     gpointer buf = NULL;
286     int result;
287     int devsize;
288     XMsg *msg;
289
290     g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
291     g_mutex_lock(self->start_part_mutex);
292
293     while (1) {
294         /* make sure we have a device */
295         while (self->paused && !elt->cancelled)
296             g_cond_wait(self->start_part_cond, self->start_part_mutex);
297
298         /* indicate EOF on an cancel or when there are no more parts */
299         if (elt->cancelled || !self->device) {
300             goto error;
301         }
302
303         /* start the timer if this is the first pull_buffer of this part */
304         if (!self->part_timer) {
305             DBG(2, "first pull_buffer of new part");
306             self->part_timer = g_timer_new();
307         }
308
309         /* loop until we read a full block, in case the blocks are larger than
310          * expected */
311         if (self->block_size == 0)
312             self->block_size = (size_t)self->device->block_size;
313
314         do {
315             buf = g_malloc(self->block_size);
316             devsize = (int)self->block_size;
317             result = device_read_block(self->device, buf, &devsize);
318             *size = devsize;
319
320             if (result == 0) {
321                 g_assert(*size > self->block_size);
322                 self->block_size = devsize;
323                 amfree(buf);
324             }
325         } while (result == 0);
326
327         /* if this block was successful, return it */
328         if (result > 0) {
329             self->part_size += *size;
330             break;
331         }
332
333         if (result < 0) {
334             amfree(buf);
335
336             /* if we're not at EOF, it's an error */
337             if (!self->device->is_eof) {
338                 xfer_cancel_with_error(elt,
339                     _("error reading from %s: %s"),
340                     self->device->device_name,
341                     device_error_or_status(self->device));
342                 wait_until_xfer_cancelled(elt->xfer);
343                 goto error;
344             }
345
346             /* the device has signalled EOF (really end-of-part), so clean up instance
347              * variables and report the EOP to the caller in the form of an xmsg */
348             DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
349             msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
350             msg->size = self->part_size;
351             msg->duration = g_timer_elapsed(self->part_timer, NULL);
352             msg->partnum = 0;
353             msg->fileno = self->device->file;
354             msg->successful = TRUE;
355             msg->eof = FALSE;
356
357             self->paused = TRUE;
358             g_object_unref(self->device);
359             self->device = NULL;
360             self->part_size = 0;
361             self->block_size = 0;
362             if (self->part_timer) {
363                 g_timer_destroy(self->part_timer);
364                 self->part_timer = NULL;
365             }
366
367             /* don't queue the XMSG_PART_DONE until we've adjusted all of our
368              * instance variables appropriately */
369             xfer_queue_message(elt->xfer, msg);
370         }
371     }
372
373     g_mutex_unlock(self->start_part_mutex);
374
375     return buf;
376 error:
377     g_mutex_unlock(self->start_part_mutex);
378     *size = 0;
379     return NULL;
380 }
381
382 static gboolean
383 cancel_impl(
384     XferElement *elt,
385     gboolean expect_eof G_GNUC_UNUSED)
386 {
387     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
388     elt->cancelled = TRUE;
389
390     /* trigger the condition variable, in case the thread is waiting on it */
391     g_mutex_lock(self->start_part_mutex);
392     g_cond_broadcast(self->start_part_cond);
393     g_mutex_unlock(self->start_part_mutex);
394
395     return TRUE;
396 }
397
398 static void
399 start_part_impl(
400     XferSourceRecovery *self,
401     Device *device)
402 {
403     g_assert(!device || device->in_file);
404
405     DBG(2, "start_part called");
406
407     if (self->device_bad) {
408         /* use_device didn't like the device it got, but the xfer cancellation
409          * has not completed yet, so do nothing */
410         return;
411     }
412
413     g_mutex_lock(self->start_part_mutex);
414
415     /* make sure we're ready to go */
416     g_assert(self->paused);
417     if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
418         g_assert(self->conn != NULL);
419     }
420
421     /* if we already have a device, it should have been given to use_device */
422     if (device && self->device)
423         g_assert(self->device == device);
424
425     if (self->device)
426         g_object_unref(self->device);
427     if (device)
428         g_object_ref(device);
429     self->device = device;
430
431     self->paused = FALSE;
432
433     DBG(2, "triggering condition variable");
434     g_cond_broadcast(self->start_part_cond);
435     g_mutex_unlock(self->start_part_mutex);
436 }
437
438 static void
439 use_device_impl(
440     XferSourceRecovery *xdtself,
441     Device *device)
442 {
443     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
444
445     g_assert(self->paused);
446
447     /* short-circuit if nothing is changing */
448     if (self->device == device)
449         return;
450
451     if (self->device)
452         g_object_unref(self->device);
453     self->device = NULL;
454
455     /* if we already have a connection, then make this device use it */
456     if (self->conn) {
457         if (!device_use_connection(device, self->conn)) {
458             /* queue up an error for later, and set device_bad.
459              * start_part will see this and fail silently */
460             self->device_bad = TRUE;
461             xfer_cancel_with_error(XFER_ELEMENT(self),
462                 _("Cannot continue onto new volume: %s"),
463                 device_error_or_status(device));
464             return;
465         }
466     }
467
468     self->device = device;
469     g_object_ref(device);
470 }
471
472 static xfer_element_mech_pair_t *
473 get_mech_pairs_impl(
474     XferElement *elt)
475 {
476     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
477     static xfer_element_mech_pair_t basic_mech_pairs[] = {
478         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
479         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
480     };
481     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
482         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
483         /* devices which support DirectTCP are usually not very efficient
484          * at delivering data via device_read_block, so this counts an extra
485          * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
486          * This is a hack, but it will do for now. */
487         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
488         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
489     };
490
491     return device_directtcp_supported(self->device)?
492         directtcp_mech_pairs : basic_mech_pairs;
493 }
494
495 static void
496 finalize_impl(
497     GObject * obj_self)
498 {
499     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
500
501     if (self->conn)
502         g_object_unref(self->conn);
503     if (self->device)
504         g_object_unref(self->device);
505
506     g_cond_free(self->start_part_cond);
507     g_mutex_free(self->start_part_mutex);
508 }
509
510 static void
511 instance_init(
512     XferElement *elt)
513 {
514     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
515
516     self->paused = TRUE;
517     self->start_part_cond = g_cond_new();
518     self->start_part_mutex = g_mutex_new();
519 }
520
521 static void
522 class_init(
523     XferSourceRecoveryClass * xsr_klass)
524 {
525     XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
526     GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
527
528     klass->pull_buffer = pull_buffer_impl;
529     klass->cancel = cancel_impl;
530     klass->start = start_impl;
531     klass->setup = setup_impl;
532     klass->get_mech_pairs = get_mech_pairs_impl;
533
534     klass->perl_class = "Amanda::Xfer::Source::Recovery";
535     klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
536
537     xsr_klass->start_part = start_part_impl;
538     xsr_klass->use_device = use_device_impl;
539
540     gobject_klass->finalize = finalize_impl;
541
542     parent_class = g_type_class_peek_parent(xsr_klass);
543 }
544
545 GType
546 xfer_source_recovery_get_type (void)
547 {
548     static GType type = 0;
549
550     if G_UNLIKELY(type == 0) {
551         static const GTypeInfo info = {
552             sizeof (XferSourceRecoveryClass),
553             (GBaseInitFunc) NULL,
554             (GBaseFinalizeFunc) NULL,
555             (GClassInitFunc) class_init,
556             (GClassFinalizeFunc) NULL,
557             NULL /* class_data */,
558             sizeof (XferSourceRecovery),
559             0 /* n_preallocs */,
560             (GInstanceInitFunc) instance_init,
561             NULL
562         };
563
564         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
565     }
566
567     return type;
568 }
569
570 /*
571  * Public methods and stubs
572  */
573
574 void
575 xfer_source_recovery_start_part(
576     XferElement *elt,
577     Device *device)
578 {
579     XferSourceRecoveryClass *klass;
580     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
581
582     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
583     klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
584 }
585
586 /* create an element of this class; prototype is in xfer-device.h */
587 XferElement *
588 xfer_source_recovery(Device *first_device)
589 {
590     XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
591     XferElement *elt = XFER_ELEMENT(self);
592
593     g_assert(first_device != NULL);
594     g_object_ref(first_device);
595     self->device = first_device;
596
597     return elt;
598 }
599
600 void
601 xfer_source_recovery_use_device(
602     XferElement *elt,
603     Device *device)
604 {
605     XferSourceRecoveryClass *klass;
606     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
607
608     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
609     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
610 }