2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
26 #include "xfer-device.h"
33 * This declaration is entirely private; nothing but xfer_source_recovery() references
37 GType xfer_source_recovery_get_type(void);
38 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
39 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
40 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
41 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
42 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
43 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
45 static GObjectClass *parent_class = NULL;
48 * Main object structure
51 typedef struct XferSourceRecovery {
52 XferElement __parent__;
54 /* thread for monitoring directtcp transfers */
57 /* this mutex in this condition variable governs all variables below */
58 GCond *start_part_cond;
59 GMutex *start_part_mutex;
61 /* is this device currently paused and awaiting a new part? */
64 /* device to read from (refcounted) */
67 /* TRUE if use_device found the device unsuitable; this makes start_part
68 * a no-op, allowing the cancellation to be handled normally */
71 /* directtcp connection (only valid after XMSG_READY) */
72 DirectTCPConnection *conn;
75 /* and the block size for that device (reset to zero at the start of each
79 /* bytes read for this image */
82 /* part size (potentially including any zero-padding from the
86 /* timer for the duration; NULL while paused or cancelled */
97 XferElementClass __parent__;
99 /* start reading the part at which DEVICE is positioned, sending an
100 * XMSG_PART_DONE when the part has been read */
101 void (*start_part)(XferSourceRecovery *self, Device *device);
103 /* use the given device, much like the same method for xfer-dest-taper */
104 void (*use_device)(XferSourceRecovery *self, Device *device);
105 } XferSourceRecoveryClass;
111 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
113 _xsr_dbg(const char *fmt, ...)
118 arglist_start(argp, fmt);
119 g_vsnprintf(msg, sizeof(msg), fmt, argp);
121 g_debug("XSR: %s", msg);
128 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
129 * this is called after self->conn is filled in and carries out the data
130 * transfer over that connection. NOTE: start_part_mutex is HELD when this
133 directtcp_common_thread(
134 XferSourceRecovery *self)
136 XferElement *elt = XFER_ELEMENT(self);
139 /* send XMSG_READY to indicate it's OK to call start_part now */
140 DBG(2, "sending XMSG_READY");
141 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
143 /* now we sit around waiting for signals to write a part */
148 while (self->paused && !elt->cancelled) {
149 DBG(9, "waiting to be un-paused");
150 g_cond_wait(self->start_part_cond, self->start_part_mutex);
152 DBG(9, "done waiting");
154 if (elt->cancelled) {
155 g_mutex_unlock(self->start_part_mutex);
156 goto close_conn_and_send_done;
159 /* if the device is NULL, we're done */
164 self->part_timer = g_timer_new();
167 DBG(2, "reading part from %s", self->device->device_name);
168 if (!device_read_to_connection(self->device, G_MAXUINT64, &actual_size)) {
169 xfer_cancel_with_error(elt, _("error reading from device: %s"),
170 device_error_or_status(self->device));
171 g_mutex_unlock(self->start_part_mutex);
172 goto close_conn_and_send_done;
175 /* break on EOF; otherwise do another read_to_connection */
176 if (self->device->is_eof) {
180 DBG(2, "done reading part; sending XMSG_PART_DONE");
182 /* the device has signalled EOF (really end-of-part), so clean up instance
183 * variables and report the EOP to the caller in the form of an xmsg */
184 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
185 msg->size = actual_size;
186 msg->duration = g_timer_elapsed(self->part_timer, NULL);
188 msg->fileno = self->device->file;
189 msg->successful = TRUE;
193 g_object_unref(self->device);
196 self->block_size = 0;
197 g_timer_destroy(self->part_timer);
198 self->part_timer = NULL;
200 xfer_queue_message(elt->xfer, msg);
202 g_mutex_unlock(self->start_part_mutex);
204 close_conn_and_send_done:
206 errmsg = directtcp_connection_close(self->conn);
207 g_object_unref(self->conn);
210 xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
211 wait_until_xfer_cancelled(elt->xfer);
215 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
221 directtcp_connect_thread(
224 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
225 XferElement *elt = XFER_ELEMENT(self);
227 DBG(1, "(this is directtcp_connect_thread)")
229 /* first, we need to accept the incoming connection; we do this while
230 * holding the start_part_mutex, so that a part doesn't get started until
231 * we're finished with the device */
232 g_mutex_lock(self->start_part_mutex);
234 if (elt->cancelled) {
235 g_mutex_unlock(self->start_part_mutex);
239 g_assert(self->device != NULL); /* have a device */
240 g_assert(elt->output_listen_addrs != NULL); /* listening on it */
241 g_assert(self->listen_ok);
243 DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
244 if (!device_accept(self->device, &self->conn, NULL, NULL)) {
245 xfer_cancel_with_error(elt,
246 _("error accepting DirectTCP connection: %s"),
247 device_error_or_status(self->device));
248 g_mutex_unlock(self->start_part_mutex);
249 wait_until_xfer_cancelled(elt->xfer);
252 DBG(2, "DirectTCP connection accepted");
254 return directtcp_common_thread(self);
257 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
262 directtcp_listen_thread(
265 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
266 XferElement *elt = XFER_ELEMENT(self);
268 DBG(1, "(this is directtcp_listen_thread)");
270 /* we need to make an outgoing connection to downstream; we do this while
271 * holding the start_part_mutex, so that a part doesn't get started until
272 * we're finished with the device */
273 g_mutex_lock(self->start_part_mutex);
275 if (elt->cancelled) {
276 g_mutex_unlock(self->start_part_mutex);
280 g_assert(self->device != NULL); /* have a device */
281 g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
283 DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
284 if (!device_connect(self->device, FALSE, elt->downstream->input_listen_addrs,
285 &self->conn, NULL, NULL)) {
286 xfer_cancel_with_error(elt,
287 _("error making DirectTCP connection: %s"),
288 device_error_or_status(self->device));
289 g_mutex_unlock(self->start_part_mutex);
290 wait_until_xfer_cancelled(elt->xfer);
293 DBG(2, "DirectTCP connect succeeded");
295 return directtcp_common_thread(self);
298 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
306 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
308 if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
309 g_assert(self->device != NULL);
310 DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
311 if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
312 xfer_cancel_with_error(elt,
313 _("error listening for DirectTCP connection: %s"),
314 device_error_or_status(self->device));
317 self->listen_ok = TRUE;
319 /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
320 * XFER_MECH_PULL_BUFFER */
321 elt->output_listen_addrs = NULL;
331 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
333 if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
334 g_assert(elt->output_listen_addrs != NULL);
335 self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
336 return TRUE; /* we'll send XMSG_DONE */
337 } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
338 g_assert(elt->output_listen_addrs == NULL);
339 self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
340 return TRUE; /* we'll send XMSG_DONE */
342 /* nothing to prepare for - we're ready already! */
343 DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
344 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
346 return FALSE; /* we won't send XMSG_DONE */
355 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
361 g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
362 g_mutex_lock(self->start_part_mutex);
365 /* make sure we have a device */
366 while (self->paused && !elt->cancelled)
367 g_cond_wait(self->start_part_cond, self->start_part_mutex);
369 /* indicate EOF on an cancel or when there are no more parts */
370 if (elt->cancelled || !self->device) {
374 /* start the timer if this is the first pull_buffer of this part */
375 if (!self->part_timer) {
376 DBG(2, "first pull_buffer of new part");
377 self->part_timer = g_timer_new();
380 /* loop until we read a full block, in case the blocks are larger than
382 if (self->block_size == 0)
383 self->block_size = (size_t)self->device->block_size;
386 buf = g_malloc(self->block_size);
387 devsize = (int)self->block_size;
388 result = device_read_block(self->device, buf, &devsize);
392 g_assert(*size > self->block_size);
393 self->block_size = devsize;
396 } while (result == 0);
398 /* if this block was successful, return it */
400 self->part_size += *size;
407 /* if we're not at EOF, it's an error */
408 if (!self->device->is_eof) {
409 xfer_cancel_with_error(elt,
410 _("error reading from %s: %s"),
411 self->device->device_name,
412 device_error_or_status(self->device));
413 g_mutex_unlock(self->start_part_mutex);
414 wait_until_xfer_cancelled(elt->xfer);
418 /* the device has signalled EOF (really end-of-part), so clean up instance
419 * variables and report the EOP to the caller in the form of an xmsg */
420 DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
421 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
422 msg->size = self->part_size;
423 msg->duration = g_timer_elapsed(self->part_timer, NULL);
425 msg->fileno = self->device->file;
426 msg->successful = TRUE;
430 g_object_unref(self->device);
432 self->bytes_read += self->part_size;
434 self->block_size = 0;
435 if (self->part_timer) {
436 g_timer_destroy(self->part_timer);
437 self->part_timer = NULL;
440 /* don't queue the XMSG_PART_DONE until we've adjusted all of our
441 * instance variables appropriately */
442 xfer_queue_message(elt->xfer, msg);
446 g_mutex_unlock(self->start_part_mutex);
449 /* initialize on first pass */
451 self->size = elt->size;
453 if (self->size == -1) {
459 if (*size > (guint64)self->size) {
460 /* return only self->size bytes */
470 g_mutex_unlock(self->start_part_mutex);
479 gboolean expect_eof G_GNUC_UNUSED)
481 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
482 elt->cancelled = TRUE;
484 /* trigger the condition variable, in case the thread is waiting on it */
485 g_mutex_lock(self->start_part_mutex);
486 g_cond_broadcast(self->start_part_cond);
487 g_mutex_unlock(self->start_part_mutex);
494 XferSourceRecovery *self,
497 g_assert(!device || device->in_file);
499 DBG(2, "start_part called");
501 if (self->device_bad) {
502 /* use_device didn't like the device it got, but the xfer cancellation
503 * has not completed yet, so do nothing */
507 g_mutex_lock(self->start_part_mutex);
509 /* make sure we're ready to go */
510 g_assert(self->paused);
511 if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
512 || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
513 g_assert(self->conn != NULL);
516 /* if we already have a device, it should have been given to use_device */
517 if (device && self->device)
518 g_assert(self->device == device);
521 g_object_unref(self->device);
523 g_object_ref(device);
524 self->device = device;
526 self->paused = FALSE;
528 DBG(2, "triggering condition variable");
529 g_cond_broadcast(self->start_part_cond);
530 g_mutex_unlock(self->start_part_mutex);
535 XferSourceRecovery *xdtself,
538 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
540 g_assert(self->paused);
542 /* short-circuit if nothing is changing */
543 if (self->device == device)
547 g_object_unref(self->device);
550 /* if we already have a connection, then make this device use it */
552 if (!device_use_connection(device, self->conn)) {
553 /* queue up an error for later, and set device_bad.
554 * start_part will see this and fail silently */
555 self->device_bad = TRUE;
556 xfer_cancel_with_error(XFER_ELEMENT(self),
557 _("Cannot continue onto new volume: %s"),
558 device_error_or_status(device));
563 self->device = device;
564 g_object_ref(device);
567 static xfer_element_mech_pair_t *
571 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
572 static xfer_element_mech_pair_t basic_mech_pairs[] = {
573 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
574 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
576 static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
577 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
578 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
579 /* devices which support DirectTCP are usually not very efficient
580 * at delivering data via device_read_block, so this counts an extra
581 * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
582 * This is a hack, but it will do for now. */
583 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
584 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
587 return device_directtcp_supported(self->device)?
588 directtcp_mech_pairs : basic_mech_pairs;
595 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
598 g_object_unref(self->conn);
600 g_object_unref(self->device);
602 g_cond_free(self->start_part_cond);
603 g_mutex_free(self->start_part_mutex);
610 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
613 self->start_part_cond = g_cond_new();
614 self->start_part_mutex = g_mutex_new();
619 XferSourceRecoveryClass * xsr_klass)
621 XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
622 GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
624 klass->pull_buffer = pull_buffer_impl;
625 klass->cancel = cancel_impl;
626 klass->start = start_impl;
627 klass->setup = setup_impl;
628 klass->get_mech_pairs = get_mech_pairs_impl;
630 klass->perl_class = "Amanda::Xfer::Source::Recovery";
631 klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
633 xsr_klass->start_part = start_part_impl;
634 xsr_klass->use_device = use_device_impl;
636 gobject_klass->finalize = finalize_impl;
638 parent_class = g_type_class_peek_parent(xsr_klass);
642 xfer_source_recovery_get_type (void)
644 static GType type = 0;
646 if G_UNLIKELY(type == 0) {
647 static const GTypeInfo info = {
648 sizeof (XferSourceRecoveryClass),
649 (GBaseInitFunc) NULL,
650 (GBaseFinalizeFunc) NULL,
651 (GClassInitFunc) class_init,
652 (GClassFinalizeFunc) NULL,
653 NULL /* class_data */,
654 sizeof (XferSourceRecovery),
656 (GInstanceInitFunc) instance_init,
660 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
667 * Public methods and stubs
671 xfer_source_recovery_start_part(
675 XferSourceRecoveryClass *klass;
676 g_assert(IS_XFER_SOURCE_RECOVERY(elt));
678 klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
679 klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
682 /* create an element of this class; prototype is in xfer-device.h */
684 xfer_source_recovery(Device *first_device)
686 XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
687 XferElement *elt = XFER_ELEMENT(self);
689 g_assert(first_device != NULL);
690 g_object_ref(first_device);
691 self->device = first_device;
697 xfer_source_recovery_use_device(
701 XferSourceRecoveryClass *klass;
702 g_assert(IS_XFER_SOURCE_RECOVERY(elt));
704 klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
705 klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
709 xfer_source_recovery_get_bytes_read(
712 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
713 guint64 bytes_read = self->bytes_read;
716 bytes_read += device_get_bytes_read(self->device);