2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "xfer-device.h"
28 /* A transfer destination that writes an entire dumpfile to one or more files
29 * on one or more devices via DirectTCP, handling the work of spanning a
30 * directtcp connection over multiple devices. Note that this assumes the
31 * devices support early EOM warning. */
34 * Xfer Dest Taper DirectTCP
37 static GType xfer_dest_taper_directtcp_get_type(void);
38 #define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
39 #define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
40 #define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
41 #define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
42 #define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
43 #define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
45 static GObjectClass *parent_class = NULL;
47 typedef struct XferDestTaperDirectTCP {
48 XferDestTaper __parent__;
50 /* constructor parameters */
51 guint64 part_size; /* (bytes) */
54 GThread *worker_thread;
56 /* state (governs everything below) */
60 Device *volatile device; /* device to write to (refcounted) */
61 dumpfile_t *volatile part_header;
63 /* did the device listen proceed without error? */
66 /* part number in progress */
67 volatile guint64 partnum;
69 /* connection we're writing to (refcounted) */
70 DirectTCPConnection *conn;
72 /* is the element paused, waiting to start a new part? this is set to FALSE
73 * by the main thread to start a part, and the worker thread waits on the
74 * corresponding condition variable. */
75 volatile gboolean paused;
78 } XferDestTaperDirectTCP;
81 XferDestTaperClass __parent__;
82 } XferDestTaperDirectTCPClass;
/* Emit a debug message through _xdt_dbg() when the global debug_taper level
 * is at least LEVEL.  The arguments after LEVEL are a printf-style format
 * string followed by its values; they are not evaluated when the message is
 * suppressed.
 *
 * The body is wrapped in do { } while (0) so that "DBG(...);" is exactly one
 * statement.  With a bare "if (...) { ... }" expansion, the trailing
 * semicolon becomes a second, empty statement and breaks any use as the
 * unbraced body of an if/else. */
#define DBG(LEVEL, ...) \
    do { \
        if (debug_taper >= (LEVEL)) { \
            _xdt_dbg(__VA_ARGS__); \
        } \
    } while (0)
90 _xdt_dbg(const char *fmt, ...)
95 arglist_start(argp, fmt);
96 g_vsnprintf(msg, sizeof(msg), fmt, argp);
98 g_debug("XDT thd-%p: %s", g_thread_self(), msg);
108 XferElement *elt = (XferElement *)data;
109 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
110 GTimer *timer = g_timer_new();
112 /* This thread's job is to accept() an incoming connection, then call
113 * write_from_connection for each part, and then close the connection */
115 /* If the device_listen failed, then we will soon be cancelled, so wait
116 * for that to occur and then send XMSG_DONE */
117 if (!self->listen_ok) {
118 DBG(2, "listen failed; waiting for cancellation without attempting an accept");
119 wait_until_xfer_cancelled(elt->xfer);
123 g_mutex_lock(self->state_mutex);
125 /* first, accept a new connection from the device */
126 DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
127 if (!device_accept(self->device, &self->conn, NULL, NULL)) {
128 xfer_cancel_with_error(XFER_ELEMENT(self),
129 "accepting DirectTCP connection: %s",
130 device_error_or_status(self->device));
131 g_mutex_unlock(self->state_mutex);
135 DBG(2, "connection accepted; sending XMSG_READY");
136 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
138 /* now loop until we're out of parts */
145 /* wait to be un-paused */
146 while (!elt->cancelled && self->paused) {
147 DBG(9, "waiting to be un-paused");
148 g_cond_wait(self->paused_cond, self->state_mutex);
150 DBG(9, "done waiting");
155 DBG(2, "writing part to %s", self->device->device_name);
156 if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
157 /* this is not fatal to the transfer, since no data was lost. We
158 * just need a new device. The scribe special-cases 0-byte parts, and will
159 * not record this in the catalog. */
162 dumpfile_free(self->part_header);
163 self->part_header = NULL;
168 dumpfile_free(self->part_header);
169 self->part_header = NULL;
171 fileno = self->device->file;
172 g_assert(fileno > 0);
175 g_timer_start(timer);
176 if (!device_write_from_connection(self->device,
177 self->part_size, &size)) {
178 /* even if this is just a physical EOM, we may have lost data, so
179 * the whole transfer is dead. */
180 xfer_cancel_with_error(XFER_ELEMENT(self),
181 "Error writing from DirectTCP connection: %s",
182 device_error_or_status(self->device));
187 eom = self->device->is_eom;
188 eof = self->device->is_eof;
190 /* finish the file, even if we're at EOM, but if this fails then we may
192 if (!device_finish_file(self->device)) {
193 xfer_cancel_with_error(XFER_ELEMENT(self),
194 "Error finishing tape file: %s",
195 device_error_or_status(self->device));
199 /* if we wrote zero bytes and reached EOM, then this is an empty part */
200 if (eom && !eof && size == 0) {
204 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
206 msg->duration = g_timer_elapsed(timer, NULL);
207 msg->partnum = self->partnum;
208 msg->fileno = fileno;
209 msg->successful = TRUE;
212 xfer_queue_message(elt->xfer, msg);
216 /* we're done at EOF */
220 /* wait to be unpaused again */
225 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
230 msg->successful = TRUE;
233 xfer_queue_message(elt->xfer, msg);
235 /* wait to be unpaused again */
240 /* drop the mutex and wait until all elements have been cancelled
241 * before closing the connection */
242 g_mutex_unlock(self->state_mutex);
243 wait_until_xfer_cancelled(elt->xfer);
244 g_mutex_lock(self->state_mutex);
248 /* close the DirectTCP connection */
249 directtcp_connection_close(self->conn);
250 g_object_unref(self->conn);
253 g_mutex_unlock(self->state_mutex);
254 g_timer_destroy(timer);
257 xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
270 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
272 /* start the device listening, and get the addresses */
273 if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
274 elt->input_listen_addrs = NULL;
275 xfer_cancel_with_error(XFER_ELEMENT(self),
276 "Error starting DirectTCP listen: %s",
277 device_error_or_status(self->device));
278 self->listen_ok = FALSE;
282 self->listen_ok = TRUE;
290 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
291 GError *error = NULL;
295 /* start up the thread */
296 self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
297 if (!self->worker_thread) {
298 g_critical(_("Error creating new thread: %s (%s)"),
299 error->message, errno? strerror(errno) : _("no error code"));
310 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
314 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
316 /* signal all of the condition variables to realize that we're no
318 g_mutex_lock(self->state_mutex);
319 g_cond_broadcast(self->paused_cond);
320 g_mutex_unlock(self->state_mutex);
327 XferDestTaper *xdtself,
331 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
333 /* the only way self->device can become NULL is if use_device fails, in
334 * which case an error is already queued up, so just return silently */
335 if (self->device == NULL)
338 g_assert(!self->device->in_file);
339 g_assert(header != NULL);
341 DBG(1, "start_part(retry_part=%d)", retry_part);
343 g_mutex_lock(self->state_mutex);
344 g_assert(self->paused);
346 if (self->part_header)
347 dumpfile_free(self->part_header);
348 self->part_header = dumpfile_copy(header);
351 self->paused = FALSE;
352 g_cond_broadcast(self->paused_cond);
354 g_mutex_unlock(self->state_mutex);
359 XferDestTaper *xdtself,
362 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
364 /* short-circuit if nothing is changing */
365 if (self->device == device)
368 g_mutex_lock(self->state_mutex);
371 g_object_unref(self->device);
374 /* if we already have a connection, then make this device use it */
376 if (!device_use_connection(device, self->conn)) {
377 /* queue up an error for later, and leave the device NULL.
378 * start_part will see this and fail silently */
379 xfer_cancel_with_error(XFER_ELEMENT(self),
380 _("Failed part was not cached; cannot retry"));
385 self->device = device;
386 g_object_ref(device);
388 g_mutex_unlock(self->state_mutex);
393 XferDestTaper *xdtself G_GNUC_UNUSED,
394 const char *filename G_GNUC_UNUSED,
395 off_t offset G_GNUC_UNUSED,
396 off_t length G_GNUC_UNUSED)
402 get_part_bytes_written_impl(
403 XferDestTaper *xdtself G_GNUC_UNUSED)
405 /* This operation is not supported for this taper dest. Maybe someday. */
413 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
414 elt->can_generate_eof = FALSE;
416 self->worker_thread = NULL;
419 self->state_mutex = g_mutex_new();
420 self->paused_cond = g_cond_new();
427 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);
430 g_object_unref(self->conn);
434 g_object_unref(self->device);
438 g_object_unref(self->device);
441 g_mutex_free(self->state_mutex);
442 g_cond_free(self->paused_cond);
444 if (self->part_header)
445 dumpfile_free(self->part_header);
446 self->part_header = NULL;
449 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
454 XferDestTaperDirectTCPClass * selfc)
456 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
457 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
458 GObjectClass *goc = G_OBJECT_CLASS(selfc);
459 static xfer_element_mech_pair_t mech_pairs[] = {
460 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 0, 0},
461 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
464 klass->start = start_impl;
465 klass->setup = setup_impl;
466 klass->cancel = cancel_impl;
467 xdt_klass->start_part = start_part_impl;
468 xdt_klass->use_device = use_device_impl;
469 xdt_klass->cache_inform = cache_inform_impl;
470 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
471 goc->finalize = finalize_impl;
473 klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
474 klass->mech_pairs = mech_pairs;
476 parent_class = g_type_class_peek_parent(selfc);
480 xfer_dest_taper_directtcp_get_type (void)
482 static GType type = 0;
484 if G_UNLIKELY(type == 0) {
485 static const GTypeInfo info = {
486 sizeof (XferDestTaperDirectTCPClass),
487 (GBaseInitFunc) NULL,
488 (GBaseFinalizeFunc) NULL,
489 (GClassInitFunc) class_init,
490 (GClassFinalizeFunc) NULL,
491 NULL /* class_data */,
492 sizeof (XferDestTaperDirectTCP),
494 (GInstanceInitFunc) instance_init,
498 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
509 xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
511 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);
513 g_assert(device_directtcp_supported(first_device));
515 self->part_size = part_size;
516 self->device = first_device;
518 g_object_ref(self->device);
520 return XFER_ELEMENT(self);