16673e0d1932abecda5f1ba130320e81bf797b81
[debian/amanda] / device-src / xfer-source-recovery.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "device.h"
25 #include "property.h"
26 #include "xfer-device.h"
27 #include "arglist.h"
28 #include "conffile.h"
29
30 /*
31  * Class declaration
32  *
33  * This declaration is entirely private; nothing but xfer_source_recovery() references
34  * it directly.
35  */
36
37 GType xfer_source_recovery_get_type(void);
38 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
39 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
40 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
41 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
42 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
43 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 /*
48  * Main object structure
49  */
50
51 typedef struct XferSourceRecovery {
52     XferElement __parent__;
53
54     /* thread for monitoring directtcp transfers */
55     GThread *thread;
56
57     /* this mutex in this condition variable governs all variables below */
58     GCond *start_part_cond;
59     GMutex *start_part_mutex;
60
61     /* is this device currently paused and awaiting a new part? */
62     gboolean paused;
63
64     /* device to read from (refcounted) */
65     Device *device;
66
67     /* TRUE if use_device found the device unsuitable; this makes start_part
68      * a no-op, allowing the cancellation to be handled normally */
69     gboolean device_bad;
70
71     /* directtcp connection (only valid after XMSG_READY) */
72     DirectTCPConnection *conn;
73     gboolean listen_ok;
74
75     /* and the block size for that device (reset to zero at the start of each
76      * part) */
77     size_t block_size;
78
79     /* part size (potentially including any zero-padding from the
80      * device) */
81     guint64 part_size;
82
83     /* timer for the duration; NULL while paused or cancelled */
84     GTimer *part_timer;
85
86     gint64   size;
87 } XferSourceRecovery;
88
89 /*
90  * Class definition
91  */
92
93 typedef struct {
94     XferElementClass __parent__;
95
96     /* start reading the part at which DEVICE is positioned, sending an
97      * XMSG_PART_DONE when the part has been read */
98     void (*start_part)(XferSourceRecovery *self, Device *device);
99
100     /* use the given device, much like the same method for xfer-dest-taper */
101     void (*use_device)(XferSourceRecovery *self, Device *device);
102 } XferSourceRecoveryClass;
103
104 /*
105  * Debug Logging
106  */
107
108 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
109 static void
110 _xsr_dbg(const char *fmt, ...)
111 {
112     va_list argp;
113     char msg[1024];
114
115     arglist_start(argp, fmt);
116     g_vsnprintf(msg, sizeof(msg), fmt, argp);
117     arglist_end(argp);
118     g_debug("XSR: %s", msg);
119 }
120
121 /*
122  * Implementation
123  */
124
125 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
126  * this is called after self->conn is filled in and carries out the data
127  * transfer over that connection.  NOTE: start_part_mutex is HELD when this
128  * function begins */
129 static gpointer
130 directtcp_common_thread(
131         XferSourceRecovery *self)
132 {
133     XferElement *elt = XFER_ELEMENT(self);
134     char *errmsg = NULL;
135
136     /* send XMSG_READY to indicate it's OK to call start_part now */
137     DBG(2, "sending XMSG_READY");
138     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
139
140     /* now we sit around waiting for signals to write a part */
141     while (1) {
142         guint64 actual_size;
143         XMsg *msg;
144
145         while (self->paused && !elt->cancelled) {
146             DBG(9, "waiting to be un-paused");
147             g_cond_wait(self->start_part_cond, self->start_part_mutex);
148         }
149         DBG(9, "done waiting");
150
151         if (elt->cancelled) {
152             g_mutex_unlock(self->start_part_mutex);
153             goto close_conn_and_send_done;
154         }
155
156         /* if the device is NULL, we're done */
157         if (!self->device)
158             break;
159
160         /* read the part */
161         self->part_timer = g_timer_new();
162
163         while (1) {
164             DBG(2, "reading part from %s", self->device->device_name);
165             if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
166                 xfer_cancel_with_error(elt, _("error reading from device: %s"),
167                     device_error_or_status(self->device));
168                 g_mutex_unlock(self->start_part_mutex);
169                 goto close_conn_and_send_done;
170             }
171
172             /* break on EOF; otherwise do another read_to_connection */
173             if (self->device->is_eof) {
174                 break;
175             }
176         }
177         DBG(2, "done reading part; sending XMSG_PART_DONE");
178
179         /* the device has signalled EOF (really end-of-part), so clean up instance
180          * variables and report the EOP to the caller in the form of an xmsg */
181         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
182         msg->size = actual_size;
183         msg->duration = g_timer_elapsed(self->part_timer, NULL);
184         msg->partnum = 0;
185         msg->fileno = self->device->file;
186         msg->successful = TRUE;
187         msg->eof = FALSE;
188
189         self->paused = TRUE;
190         g_object_unref(self->device);
191         self->device = NULL;
192         self->part_size = 0;
193         self->block_size = 0;
194         g_timer_destroy(self->part_timer);
195         self->part_timer = NULL;
196
197         xfer_queue_message(elt->xfer, msg);
198     }
199     g_mutex_unlock(self->start_part_mutex);
200
201 close_conn_and_send_done:
202     if (self->conn) {
203         errmsg = directtcp_connection_close(self->conn);
204         g_object_unref(self->conn);
205         self->conn = NULL;
206         if (errmsg) {
207             xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
208             wait_until_xfer_cancelled(elt->xfer);
209         }
210     }
211
212     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
213
214     return NULL;
215 }
216
217 static gpointer
218 directtcp_connect_thread(
219         gpointer data)
220 {
221     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
222     XferElement *elt = XFER_ELEMENT(self);
223
224     DBG(1, "(this is directtcp_connect_thread)")
225
226     /* first, we need to accept the incoming connection; we do this while
227      * holding the start_part_mutex, so that a part doesn't get started until
228      * we're finished with the device */
229     g_mutex_lock(self->start_part_mutex);
230
231     if (elt->cancelled) {
232         g_mutex_unlock(self->start_part_mutex);
233         goto send_done;
234     }
235
236     g_assert(self->device != NULL); /* have a device */
237     g_assert(elt->output_listen_addrs != NULL); /* listening on it */
238     g_assert(self->listen_ok);
239
240     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
241     if (!device_accept(self->device, &self->conn, NULL, NULL)) {
242         xfer_cancel_with_error(elt,
243             _("error accepting DirectTCP connection: %s"),
244             device_error_or_status(self->device));
245         g_mutex_unlock(self->start_part_mutex);
246         wait_until_xfer_cancelled(elt->xfer);
247         goto send_done;
248     }
249     DBG(2, "DirectTCP connection accepted");
250
251     return directtcp_common_thread(self);
252
253 send_done:
254     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
255     return NULL;
256 }
257
258 static gpointer
259 directtcp_listen_thread(
260         gpointer data)
261 {
262     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
263     XferElement *elt = XFER_ELEMENT(self);
264
265     DBG(1, "(this is directtcp_listen_thread)");
266
267     /* we need to make an outgoing connection to downstream; we do this while
268      * holding the start_part_mutex, so that a part doesn't get started until
269      * we're finished with the device */
270     g_mutex_lock(self->start_part_mutex);
271
272     if (elt->cancelled) {
273         g_mutex_unlock(self->start_part_mutex);
274         goto send_done;
275     }
276
277     g_assert(self->device != NULL); /* have a device */
278     g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
279
280     DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
281     if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
282                         &self->conn, NULL, NULL)) {
283         xfer_cancel_with_error(elt,
284             _("error making DirectTCP connection: %s"),
285             device_error_or_status(self->device));
286         g_mutex_unlock(self->start_part_mutex);
287         wait_until_xfer_cancelled(elt->xfer);
288         goto send_done;
289     }
290     DBG(2, "DirectTCP connect succeeded");
291
292     return directtcp_common_thread(self);
293
294 send_done:
295     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
296     return NULL;
297 }
298
299 static gboolean
300 setup_impl(
301     XferElement *elt)
302 {
303     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
304
305     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
306         g_assert(self->device != NULL);
307         DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
308         if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
309             xfer_cancel_with_error(elt,
310                 _("error listening for DirectTCP connection: %s"),
311                 device_error_or_status(self->device));
312             return FALSE;
313         }
314         self->listen_ok = TRUE;
315     } else {
316         /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
317          * XFER_MECH_PULL_BUFFER */
318         elt->output_listen_addrs = NULL;
319     }
320
321     return TRUE;
322 }
323
324 static gboolean
325 start_impl(
326     XferElement *elt)
327 {
328     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
329
330     if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
331         g_assert(elt->output_listen_addrs != NULL);
332         self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
333         return TRUE; /* we'll send XMSG_DONE */
334     } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
335         g_assert(elt->output_listen_addrs == NULL);
336         self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
337         return TRUE; /* we'll send XMSG_DONE */
338     } else {
339         /* nothing to prepare for - we're ready already! */
340         DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
341         xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
342
343         return FALSE; /* we won't send XMSG_DONE */
344     }
345 }
346
347 static gpointer
348 pull_buffer_impl(
349     XferElement *elt,
350     size_t *size)
351 {
352     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
353     gpointer buf = NULL;
354     int result;
355     int devsize;
356     XMsg *msg;
357
358     g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
359     g_mutex_lock(self->start_part_mutex);
360
361     while (1) {
362         /* make sure we have a device */
363         while (self->paused && !elt->cancelled)
364             g_cond_wait(self->start_part_cond, self->start_part_mutex);
365
366         /* indicate EOF on an cancel or when there are no more parts */
367         if (elt->cancelled || !self->device) {
368             goto error;
369         }
370
371         /* start the timer if this is the first pull_buffer of this part */
372         if (!self->part_timer) {
373             DBG(2, "first pull_buffer of new part");
374             self->part_timer = g_timer_new();
375         }
376
377         /* loop until we read a full block, in case the blocks are larger than
378          * expected */
379         if (self->block_size == 0)
380             self->block_size = (size_t)self->device->block_size;
381
382         do {
383             buf = g_malloc(self->block_size);
384             devsize = (int)self->block_size;
385             result = device_read_block(self->device, buf, &devsize);
386             *size = devsize;
387
388             if (result == 0) {
389                 g_assert(*size > self->block_size);
390                 self->block_size = devsize;
391                 amfree(buf);
392             }
393         } while (result == 0);
394
395         /* if this block was successful, return it */
396         if (result > 0) {
397             self->part_size += *size;
398             break;
399         }
400
401         if (result < 0) {
402             amfree(buf);
403
404             /* if we're not at EOF, it's an error */
405             if (!self->device->is_eof) {
406                 xfer_cancel_with_error(elt,
407                     _("error reading from %s: %s"),
408                     self->device->device_name,
409                     device_error_or_status(self->device));
410                 g_mutex_unlock(self->start_part_mutex);
411                 wait_until_xfer_cancelled(elt->xfer);
412                 goto error_unlocked;
413             }
414
415             /* the device has signalled EOF (really end-of-part), so clean up instance
416              * variables and report the EOP to the caller in the form of an xmsg */
417             DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
418             msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
419             msg->size = self->part_size;
420             msg->duration = g_timer_elapsed(self->part_timer, NULL);
421             msg->partnum = 0;
422             msg->fileno = self->device->file;
423             msg->successful = TRUE;
424             msg->eof = FALSE;
425
426             self->paused = TRUE;
427             g_object_unref(self->device);
428             self->device = NULL;
429             self->part_size = 0;
430             self->block_size = 0;
431             if (self->part_timer) {
432                 g_timer_destroy(self->part_timer);
433                 self->part_timer = NULL;
434             }
435
436             /* don't queue the XMSG_PART_DONE until we've adjusted all of our
437              * instance variables appropriately */
438             xfer_queue_message(elt->xfer, msg);
439         }
440     }
441
442     g_mutex_unlock(self->start_part_mutex);
443
444     if (elt->size > 0) {
445         /* initialize on first pass */
446         if (self->size == 0)
447             self->size = elt->size;
448         
449         if (self->size == -1) {
450             *size = 0;
451             amfree(buf);
452             return NULL;
453         }
454
455         if (*size > (guint64)self->size) {
456             /* return only self->size bytes */
457             *size = self->size;
458             self->size = -1;
459         } else {
460             self->size -= *size;
461         }
462     }
463
464     return buf;
465 error:
466     g_mutex_unlock(self->start_part_mutex);
467 error_unlocked:
468     *size = 0;
469     return NULL;
470 }
471
472 static gboolean
473 cancel_impl(
474     XferElement *elt,
475     gboolean expect_eof G_GNUC_UNUSED)
476 {
477     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
478     elt->cancelled = TRUE;
479
480     /* trigger the condition variable, in case the thread is waiting on it */
481     g_mutex_lock(self->start_part_mutex);
482     g_cond_broadcast(self->start_part_cond);
483     g_mutex_unlock(self->start_part_mutex);
484
485     return TRUE;
486 }
487
488 static void
489 start_part_impl(
490     XferSourceRecovery *self,
491     Device *device)
492 {
493     g_assert(!device || device->in_file);
494
495     DBG(2, "start_part called");
496
497     if (self->device_bad) {
498         /* use_device didn't like the device it got, but the xfer cancellation
499          * has not completed yet, so do nothing */
500         return;
501     }
502
503     g_mutex_lock(self->start_part_mutex);
504
505     /* make sure we're ready to go */
506     g_assert(self->paused);
507     if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
508      || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
509         g_assert(self->conn != NULL);
510     }
511
512     /* if we already have a device, it should have been given to use_device */
513     if (device && self->device)
514         g_assert(self->device == device);
515
516     if (self->device)
517         g_object_unref(self->device);
518     if (device)
519         g_object_ref(device);
520     self->device = device;
521
522     self->paused = FALSE;
523
524     DBG(2, "triggering condition variable");
525     g_cond_broadcast(self->start_part_cond);
526     g_mutex_unlock(self->start_part_mutex);
527 }
528
529 static void
530 use_device_impl(
531     XferSourceRecovery *xdtself,
532     Device *device)
533 {
534     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
535
536     g_assert(self->paused);
537
538     /* short-circuit if nothing is changing */
539     if (self->device == device)
540         return;
541
542     if (self->device)
543         g_object_unref(self->device);
544     self->device = NULL;
545
546     /* if we already have a connection, then make this device use it */
547     if (self->conn) {
548         if (!device_use_connection(device, self->conn)) {
549             /* queue up an error for later, and set device_bad.
550              * start_part will see this and fail silently */
551             self->device_bad = TRUE;
552             xfer_cancel_with_error(XFER_ELEMENT(self),
553                 _("Cannot continue onto new volume: %s"),
554                 device_error_or_status(device));
555             return;
556         }
557     }
558
559     self->device = device;
560     g_object_ref(device);
561 }
562
563 static xfer_element_mech_pair_t *
564 get_mech_pairs_impl(
565     XferElement *elt)
566 {
567     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
568     static xfer_element_mech_pair_t basic_mech_pairs[] = {
569         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
570         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
571     };
572     static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
573         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
574         { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
575         /* devices which support DirectTCP are usually not very efficient
576          * at delivering data via device_read_block, so this counts an extra
577          * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
578          * This is a hack, but it will do for now. */
579         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
580         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
581     };
582
583     return device_directtcp_supported(self->device)?
584         directtcp_mech_pairs : basic_mech_pairs;
585 }
586
587 static void
588 finalize_impl(
589     GObject * obj_self)
590 {
591     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
592
593     if (self->conn)
594         g_object_unref(self->conn);
595     if (self->device)
596         g_object_unref(self->device);
597
598     g_cond_free(self->start_part_cond);
599     g_mutex_free(self->start_part_mutex);
600 }
601
602 static void
603 instance_init(
604     XferElement *elt)
605 {
606     XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
607
608     self->paused = TRUE;
609     self->start_part_cond = g_cond_new();
610     self->start_part_mutex = g_mutex_new();
611 }
612
613 static void
614 class_init(
615     XferSourceRecoveryClass * xsr_klass)
616 {
617     XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
618     GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
619
620     klass->pull_buffer = pull_buffer_impl;
621     klass->cancel = cancel_impl;
622     klass->start = start_impl;
623     klass->setup = setup_impl;
624     klass->get_mech_pairs = get_mech_pairs_impl;
625
626     klass->perl_class = "Amanda::Xfer::Source::Recovery";
627     klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
628
629     xsr_klass->start_part = start_part_impl;
630     xsr_klass->use_device = use_device_impl;
631
632     gobject_klass->finalize = finalize_impl;
633
634     parent_class = g_type_class_peek_parent(xsr_klass);
635 }
636
637 GType
638 xfer_source_recovery_get_type (void)
639 {
640     static GType type = 0;
641
642     if G_UNLIKELY(type == 0) {
643         static const GTypeInfo info = {
644             sizeof (XferSourceRecoveryClass),
645             (GBaseInitFunc) NULL,
646             (GBaseFinalizeFunc) NULL,
647             (GClassInitFunc) class_init,
648             (GClassFinalizeFunc) NULL,
649             NULL /* class_data */,
650             sizeof (XferSourceRecovery),
651             0 /* n_preallocs */,
652             (GInstanceInitFunc) instance_init,
653             NULL
654         };
655
656         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
657     }
658
659     return type;
660 }
661
662 /*
663  * Public methods and stubs
664  */
665
666 void
667 xfer_source_recovery_start_part(
668     XferElement *elt,
669     Device *device)
670 {
671     XferSourceRecoveryClass *klass;
672     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
673
674     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
675     klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
676 }
677
678 /* create an element of this class; prototype is in xfer-device.h */
679 XferElement *
680 xfer_source_recovery(Device *first_device)
681 {
682     XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
683     XferElement *elt = XFER_ELEMENT(self);
684
685     g_assert(first_device != NULL);
686     g_object_ref(first_device);
687     self->device = first_device;
688
689     return elt;
690 }
691
692 void
693 xfer_source_recovery_use_device(
694     XferElement *elt,
695     Device *device)
696 {
697     XferSourceRecoveryClass *klass;
698     g_assert(IS_XFER_SOURCE_RECOVERY(elt));
699
700     klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
701     klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
702 }