2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
27 #include "xfer-device.h"
34 * This declaration is entirely private; nothing but xfer_source_recovery() references
38 GType xfer_source_recovery_get_type(void);
39 #define XFER_SOURCE_RECOVERY_TYPE (xfer_source_recovery_get_type())
40 #define XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery)
41 #define XFER_SOURCE_RECOVERY_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_recovery_get_type(), XferSourceRecovery const)
42 #define XFER_SOURCE_RECOVERY_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
43 #define IS_XFER_SOURCE_RECOVERY(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_recovery_get_type ())
44 #define XFER_SOURCE_RECOVERY_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_recovery_get_type(), XferSourceRecoveryClass)
46 static GObjectClass *parent_class = NULL;
49 * Main object structure
52 typedef struct XferSourceRecovery {
53 XferElement __parent__;
55 /* thread for monitoring directtcp transfers */
58 /* this mutex in this condition variable governs all variables below */
59 GCond *start_part_cond;
60 GMutex *start_part_mutex;
62 /* is this device currently paused and awaiting a new part? */
65 /* device to read from (refcounted) */
68 /* TRUE if use_device found the device unsuitable; this makes start_part
69 * a no-op, allowing the cancellation to be handled normally */
72 /* directtcp connection (only valid after XMSG_READY) */
73 DirectTCPConnection *conn;
76 /* and the block size for that device (reset to zero at the start of each
80 /* bytes read for this image */
83 /* part size (potentially including any zero-padding from the
87 /* timer for the duration; NULL while paused or cancelled */
92 GCond *abort_cond; /* condition to trigger to abort ndmp command */
100 XferElementClass __parent__;
102 /* start reading the part at which DEVICE is positioned, sending an
103 * XMSG_PART_DONE when the part has been read */
104 void (*start_part)(XferSourceRecovery *self, Device *device);
106 /* use the given device, much like the same method for xfer-dest-taper */
107 void (*use_device)(XferSourceRecovery *self, Device *device);
108 } XferSourceRecoveryClass;
114 #define DBG(LEVEL, ...) if (debug_recovery >= LEVEL) { _xsr_dbg(__VA_ARGS__); }
116 _xsr_dbg(const char *fmt, ...)
121 arglist_start(argp, fmt);
122 g_vsnprintf(msg, sizeof(msg), fmt, argp);
124 g_debug("XSR: %s", msg);
131 /* common code for both directtcp_listen_thread and directtcp_connect_thread;
132 * this is called after self->conn is filled in and carries out the data
133 * transfer over that connection. NOTE: start_part_mutex is HELD when this
136 directtcp_common_thread(
137 XferSourceRecovery *self)
139 XferElement *elt = XFER_ELEMENT(self);
143 /* send XMSG_READY to indicate it's OK to call start_part now */
144 DBG(2, "sending XMSG_READY");
145 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
147 /* now we sit around waiting for signals to write a part */
152 while (self->paused && !elt->cancelled) {
153 DBG(9, "waiting to be un-paused");
154 g_cond_wait(self->start_part_cond, self->start_part_mutex);
156 DBG(9, "done waiting");
158 if (elt->cancelled) {
159 g_mutex_unlock(self->start_part_mutex);
160 goto close_conn_and_send_done;
163 /* if the device is NULL, we're done */
168 self->part_timer = g_timer_new();
171 DBG(2, "reading part from %s", self->device->device_name);
172 result = device_read_to_connection(self->device, G_MAXUINT64,
173 &actual_size, &elt->cancelled,
174 self->start_part_mutex, self->abort_cond);
175 if (result == 1 && !elt->cancelled) {
176 xfer_cancel_with_error(elt, _("error reading from device: %s"),
177 device_error_or_status(self->device));
178 g_mutex_unlock(self->start_part_mutex);
179 goto close_conn_and_send_done;
180 } else if (result == 2 || elt->cancelled) {
181 g_mutex_unlock(self->start_part_mutex);
182 goto close_conn_and_send_done;
185 /* break on EOF; otherwise do another read_to_connection */
186 if (self->device->is_eof) {
190 DBG(2, "done reading part; sending XMSG_PART_DONE");
192 /* the device has signalled EOF (really end-of-part), so clean up instance
193 * variables and report the EOP to the caller in the form of an xmsg */
194 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
195 msg->size = actual_size;
196 msg->duration = g_timer_elapsed(self->part_timer, NULL);
198 msg->fileno = self->device->file;
199 msg->successful = TRUE;
203 g_object_unref(self->device);
206 self->block_size = 0;
207 g_timer_destroy(self->part_timer);
208 self->part_timer = NULL;
210 xfer_queue_message(elt->xfer, msg);
212 g_mutex_unlock(self->start_part_mutex);
214 close_conn_and_send_done:
216 errmsg = directtcp_connection_close(self->conn);
217 g_object_unref(self->conn);
220 xfer_cancel_with_error(elt, _("error closing DirectTCP connection: %s"), errmsg);
221 wait_until_xfer_cancelled(elt->xfer);
225 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
231 directtcp_connect_thread(
234 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
235 XferElement *elt = XFER_ELEMENT(self);
238 DBG(1, "(this is directtcp_connect_thread)")
240 /* first, we need to accept the incoming connection; we do this while
241 * holding the start_part_mutex, so that a part doesn't get started until
242 * we're finished with the device */
243 g_mutex_lock(self->start_part_mutex);
245 if (elt->cancelled) {
246 g_mutex_unlock(self->start_part_mutex);
250 g_assert(self->device != NULL); /* have a device */
251 g_assert(elt->output_listen_addrs != NULL); /* listening on it */
252 g_assert(self->listen_ok);
254 DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
255 result = device_accept(self->device, &self->conn, &elt->cancelled,
256 self->start_part_mutex, self->abort_cond);
257 if (result == 1 && !elt->cancelled) {
258 xfer_cancel_with_error(elt,
259 _("error accepting DirectTCP connection: %s"),
260 device_error_or_status(self->device));
261 g_mutex_unlock(self->start_part_mutex);
262 wait_until_xfer_cancelled(elt->xfer);
264 } else if (result == 2 || elt->cancelled) {
265 g_mutex_unlock(self->start_part_mutex);
268 DBG(2, "DirectTCP connection accepted");
270 return directtcp_common_thread(self);
273 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
278 directtcp_listen_thread(
281 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(data);
282 XferElement *elt = XFER_ELEMENT(self);
285 DBG(1, "(this is directtcp_listen_thread)");
287 /* we need to make an outgoing connection to downstream; we do this while
288 * holding the start_part_mutex, so that a part doesn't get started until
289 * we're finished with the device */
290 g_mutex_lock(self->start_part_mutex);
292 if (elt->cancelled) {
293 g_mutex_unlock(self->start_part_mutex);
297 g_assert(self->device != NULL); /* have a device */
298 g_assert(elt->downstream->input_listen_addrs != NULL); /* downstream listening */
300 DBG(2, "making DirectTCP connection on device %s", self->device->device_name);
301 result = device_connect(self->device, FALSE,
302 elt->downstream->input_listen_addrs,
303 &self->conn, &elt->cancelled,
304 self->start_part_mutex, self->abort_cond);
305 if (result == 1 && !elt->cancelled) {
306 xfer_cancel_with_error(elt,
307 _("error making DirectTCP connection: %s"),
308 device_error_or_status(self->device));
309 g_mutex_unlock(self->start_part_mutex);
310 wait_until_xfer_cancelled(elt->xfer);
312 } else if (result == 2 || elt->cancelled) {
313 g_mutex_unlock(self->start_part_mutex);
314 wait_until_xfer_cancelled(elt->xfer);
317 DBG(2, "DirectTCP connect succeeded");
319 return directtcp_common_thread(self);
322 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_DONE, 0));
330 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
332 if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
333 g_assert(self->device != NULL);
334 DBG(2, "listening for DirectTCP connection on device %s", self->device->device_name);
335 if (!device_listen(self->device, FALSE, &elt->output_listen_addrs)) {
336 xfer_cancel_with_error(elt,
337 _("error listening for DirectTCP connection: %s"),
338 device_error_or_status(self->device));
341 self->listen_ok = TRUE;
343 /* no output_listen_addrs for either XFER_MECH_DIRECTTCP_LISTEN or
344 * XFER_MECH_PULL_BUFFER */
345 elt->output_listen_addrs = NULL;
355 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
357 if (elt->output_mech == XFER_MECH_DIRECTTCP_CONNECT) {
358 g_assert(elt->output_listen_addrs != NULL);
359 self->thread = g_thread_create(directtcp_connect_thread, (gpointer)self, FALSE, NULL);
360 return TRUE; /* we'll send XMSG_DONE */
361 } else if (elt->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
362 g_assert(elt->output_listen_addrs == NULL);
363 self->thread = g_thread_create(directtcp_listen_thread, (gpointer)self, FALSE, NULL);
364 return TRUE; /* we'll send XMSG_DONE */
366 /* nothing to prepare for - we're ready already! */
367 DBG(2, "not using DirectTCP: sending XMSG_READY immediately");
368 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
370 return FALSE; /* we won't send XMSG_DONE */
379 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
385 g_assert(elt->output_mech == XFER_MECH_PULL_BUFFER);
386 g_mutex_lock(self->start_part_mutex);
389 /* make sure we have a device */
390 while (self->paused && !elt->cancelled)
391 g_cond_wait(self->start_part_cond, self->start_part_mutex);
393 /* indicate EOF on an cancel or when there are no more parts */
394 if (elt->cancelled || !self->device) {
398 /* start the timer if this is the first pull_buffer of this part */
399 if (!self->part_timer) {
400 DBG(2, "first pull_buffer of new part");
401 self->part_timer = g_timer_new();
404 /* loop until we read a full block, in case the blocks are larger than
406 if (self->block_size == 0)
407 self->block_size = (size_t)self->device->block_size;
410 buf = g_malloc(self->block_size);
411 devsize = (int)self->block_size;
412 result = device_read_block(self->device, buf, &devsize);
416 g_assert(*size > self->block_size);
417 self->block_size = devsize;
420 } while (result == 0);
422 /* if this block was successful, return it */
424 self->part_size += *size;
431 /* if we're not at EOF, it's an error */
432 if (!self->device->is_eof) {
433 xfer_cancel_with_error(elt,
434 _("error reading from %s: %s"),
435 self->device->device_name,
436 device_error_or_status(self->device));
437 g_mutex_unlock(self->start_part_mutex);
438 wait_until_xfer_cancelled(elt->xfer);
442 /* the device has signalled EOF (really end-of-part), so clean up instance
443 * variables and report the EOP to the caller in the form of an xmsg */
444 DBG(2, "pull_buffer hit EOF; sending XMSG_PART_DONE");
445 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
446 msg->size = self->part_size;
447 msg->duration = g_timer_elapsed(self->part_timer, NULL);
449 msg->fileno = self->device->file;
450 msg->successful = TRUE;
454 g_object_unref(self->device);
456 self->bytes_read += self->part_size;
458 self->block_size = 0;
459 if (self->part_timer) {
460 g_timer_destroy(self->part_timer);
461 self->part_timer = NULL;
464 /* don't queue the XMSG_PART_DONE until we've adjusted all of our
465 * instance variables appropriately */
466 xfer_queue_message(elt->xfer, msg);
470 g_mutex_unlock(self->start_part_mutex);
473 /* initialize on first pass */
475 self->size = elt->size;
477 if (self->size == -1) {
483 if (*size > (guint64)self->size) {
484 /* return only self->size bytes */
494 g_mutex_unlock(self->start_part_mutex);
503 gboolean expect_eof G_GNUC_UNUSED)
505 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
506 elt->cancelled = TRUE;
508 /* trigger the condition variable, in case the thread is waiting on it */
509 g_mutex_lock(self->start_part_mutex);
510 g_cond_broadcast(self->start_part_cond);
511 g_cond_broadcast(self->abort_cond);
512 g_mutex_unlock(self->start_part_mutex);
519 XferSourceRecovery *self,
522 g_assert(!device || device->in_file);
524 DBG(2, "start_part called");
526 if (self->device_bad) {
527 /* use_device didn't like the device it got, but the xfer cancellation
528 * has not completed yet, so do nothing */
532 g_mutex_lock(self->start_part_mutex);
534 /* make sure we're ready to go */
535 g_assert(self->paused);
536 if (XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_CONNECT
537 || XFER_ELEMENT(self)->output_mech == XFER_MECH_DIRECTTCP_LISTEN) {
538 g_assert(self->conn != NULL);
541 /* if we already have a device, it should have been given to use_device */
542 if (device && self->device)
543 g_assert(self->device == device);
546 g_object_unref(self->device);
548 g_object_ref(device);
549 self->device = device;
551 self->paused = FALSE;
553 DBG(2, "triggering condition variable");
554 g_cond_broadcast(self->start_part_cond);
555 g_mutex_unlock(self->start_part_mutex);
560 XferSourceRecovery *xdtself,
563 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(xdtself);
565 g_assert(self->paused);
567 /* short-circuit if nothing is changing */
568 if (self->device == device)
572 g_object_unref(self->device);
575 /* if we already have a connection, then make this device use it */
577 if (!device_use_connection(device, self->conn)) {
578 /* queue up an error for later, and set device_bad.
579 * start_part will see this and fail silently */
580 self->device_bad = TRUE;
581 xfer_cancel_with_error(XFER_ELEMENT(self),
582 _("Cannot continue onto new volume: %s"),
583 device_error_or_status(device));
588 self->device = device;
589 g_object_ref(device);
592 static xfer_element_mech_pair_t *
596 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
597 static xfer_element_mech_pair_t basic_mech_pairs[] = {
598 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
599 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
601 static xfer_element_mech_pair_t directtcp_mech_pairs[] = {
602 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_CONNECT, 0, 1},
603 { XFER_MECH_NONE, XFER_MECH_DIRECTTCP_LISTEN, 0, 1},
604 /* devices which support DirectTCP are usually not very efficient
605 * at delivering data via device_read_block, so this counts an extra
606 * byte operation in the cost metrics (2 here vs. 1 in basic_mech_pairs).
607 * This is a hack, but it will do for now. */
608 { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 2, 0},
609 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
612 return device_directtcp_supported(self->device)?
613 directtcp_mech_pairs : basic_mech_pairs;
620 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(obj_self);
623 g_object_unref(self->conn);
625 g_object_unref(self->device);
627 g_cond_free(self->start_part_cond);
628 g_cond_free(self->abort_cond);
629 g_mutex_free(self->start_part_mutex);
636 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
639 self->start_part_cond = g_cond_new();
640 self->abort_cond = g_cond_new();
641 self->start_part_mutex = g_mutex_new();
646 XferSourceRecoveryClass * xsr_klass)
648 XferElementClass *klass = XFER_ELEMENT_CLASS(xsr_klass);
649 GObjectClass *gobject_klass = G_OBJECT_CLASS(xsr_klass);
651 klass->pull_buffer = pull_buffer_impl;
652 klass->cancel = cancel_impl;
653 klass->start = start_impl;
654 klass->setup = setup_impl;
655 klass->get_mech_pairs = get_mech_pairs_impl;
657 klass->perl_class = "Amanda::Xfer::Source::Recovery";
658 klass->mech_pairs = NULL; /* see get_mech_pairs_impl, above */
660 xsr_klass->start_part = start_part_impl;
661 xsr_klass->use_device = use_device_impl;
663 gobject_klass->finalize = finalize_impl;
665 parent_class = g_type_class_peek_parent(xsr_klass);
669 xfer_source_recovery_get_type (void)
671 static GType type = 0;
673 if G_UNLIKELY(type == 0) {
674 static const GTypeInfo info = {
675 sizeof (XferSourceRecoveryClass),
676 (GBaseInitFunc) NULL,
677 (GBaseFinalizeFunc) NULL,
678 (GClassInitFunc) class_init,
679 (GClassFinalizeFunc) NULL,
680 NULL /* class_data */,
681 sizeof (XferSourceRecovery),
683 (GInstanceInitFunc) instance_init,
687 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceRecovery", &info, 0);
694 * Public methods and stubs
698 xfer_source_recovery_start_part(
702 XferSourceRecoveryClass *klass;
703 g_assert(IS_XFER_SOURCE_RECOVERY(elt));
705 klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
706 klass->start_part(XFER_SOURCE_RECOVERY(elt), device);
709 /* create an element of this class; prototype is in xfer-device.h */
711 xfer_source_recovery(Device *first_device)
713 XferSourceRecovery *self = (XferSourceRecovery *)g_object_new(XFER_SOURCE_RECOVERY_TYPE, NULL);
714 XferElement *elt = XFER_ELEMENT(self);
716 g_assert(first_device != NULL);
717 g_object_ref(first_device);
718 self->device = first_device;
724 xfer_source_recovery_use_device(
728 XferSourceRecoveryClass *klass;
729 g_assert(IS_XFER_SOURCE_RECOVERY(elt));
731 klass = XFER_SOURCE_RECOVERY_GET_CLASS(elt);
732 klass->use_device(XFER_SOURCE_RECOVERY(elt), device);
736 xfer_source_recovery_get_bytes_read(
739 XferSourceRecovery *self = XFER_SOURCE_RECOVERY(elt);
740 guint64 bytes_read = self->bytes_read;
743 bytes_read += device_get_bytes_read(self->device);