2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
25 #include "xfer-device.h"
29 /* A transfer destination that writes an entire dumpfile to one or more files
30 * on one or more devices via DirectTCP, handling the work of spanning a
31 * directtcp connection over multiple devices. Note that this assumes the
32 * devices support early EOM warning. */
35 * Xfer Dest Taper DirectTCP
38 static GType xfer_dest_taper_directtcp_get_type(void);
39 #define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
40 #define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
41 #define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
42 #define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
43 #define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
44 #define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
46 static GObjectClass *parent_class = NULL;
48 typedef struct XferDestTaperDirectTCP {
49 XferDestTaper __parent__;
51 /* constructor parameters */
52 guint64 part_size; /* (bytes) */
55 GThread *worker_thread;
57 /* state (governs everything below) */
61 Device *volatile device; /* device to write to (refcounted) */
62 dumpfile_t *volatile part_header;
64 /* did the device listen proceed without error? */
67 /* part number in progress */
68 volatile guint64 partnum;
70 /* connection we're writing to (refcounted) */
71 DirectTCPConnection *conn;
73 /* is the element paused, waiting to start a new part? this is set to FALSE
74 * by the main thread to start a part, and the worker thread waits on the
75 * corresponding condition variable. */
76 volatile gboolean paused;
78 GCond *abort_cond; /* condition to trigger to abort an NDMP command */
80 } XferDestTaperDirectTCP;
83 XferDestTaperClass __parent__;
84 } XferDestTaperDirectTCPClass;
/* Emit a debug message through _xdt_dbg when debug_taper is at least LEVEL.
 * The arguments are only evaluated when the message is actually emitted.
 * The body is wrapped in do { } while (0) so that the macro expands to exactly
 * one statement: "if (c) DBG(...); else ..." compiles, and an else clause can
 * never bind to the macro's internal if. */
#define DBG(LEVEL, ...) do { \
    if (debug_taper >= (LEVEL)) { \
        _xdt_dbg(__VA_ARGS__); \
    } \
} while (0)
92 _xdt_dbg(const char *fmt, ...)
97 arglist_start(argp, fmt);
98 g_vsnprintf(msg, sizeof(msg), fmt, argp);
100 g_debug("XDT: %s", msg);
110 XferElement *elt = (XferElement *)data;
111 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
112 GTimer *timer = g_timer_new();
115 /* This thread's job is to accept() an incoming connection, then call
116 * write_from_connection for each part, and then close the connection */
118 /* If the device_listen failed, then we will soon be cancelled, so wait
119 * for that to occur and then send XMSG_DONE */
120 if (!self->listen_ok) {
121 DBG(2, "listen failed; waiting for cancellation without attempting an accept");
122 wait_until_xfer_cancelled(elt->xfer);
126 g_mutex_lock(self->state_mutex);
128 /* first, accept a new connection from the device */
129 DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
130 result = device_accept(self->device, &self->conn, &elt->cancelled,
131 self->state_mutex, self->abort_cond);
132 if (result == 1 && !elt->cancelled) {
133 xfer_cancel_with_error(XFER_ELEMENT(self),
134 "accepting DirectTCP connection: %s",
135 device_error_or_status(self->device));
136 g_mutex_unlock(self->state_mutex);
137 wait_until_xfer_cancelled(elt->xfer);
139 } else if (result == 2 || elt->cancelled) {
140 g_mutex_unlock(self->state_mutex);
141 wait_until_xfer_cancelled(elt->xfer);
145 DBG(2, "connection accepted; sending XMSG_READY");
146 xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
148 /* round the part size up to the next multiple of the block size */
149 if (self->part_size) {
150 self->part_size += self->device->block_size-1;
151 self->part_size -= self->part_size % self->device->block_size;
154 /* now loop until we're out of parts */
161 /* wait to be un-paused */
162 while (!elt->cancelled && self->paused) {
163 DBG(9, "waiting to be un-paused");
164 g_cond_wait(self->paused_cond, self->state_mutex);
166 DBG(9, "done waiting");
171 DBG(2, "writing part to %s", self->device->device_name);
172 if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
173 /* this is not fatal to the transfer, since no data was lost. We
174 * just need a new device. The scribe special-cases 0-byte parts, and will
175 * not record this in the catalog. */
178 dumpfile_free(self->part_header);
179 self->part_header = NULL;
184 dumpfile_free(self->part_header);
185 self->part_header = NULL;
187 fileno = self->device->file;
188 g_assert(fileno > 0);
191 g_timer_start(timer);
192 result = device_write_from_connection(self->device,
193 self->part_size, &size, &elt->cancelled,
194 self->state_mutex, self->abort_cond);
195 if (result == 1 && !elt->cancelled) {
196 /* even if this is just a physical EOM, we may have lost data, so
197 * the whole transfer is dead. */
198 xfer_cancel_with_error(XFER_ELEMENT(self),
199 "Error writing from DirectTCP connection: %s",
200 device_error_or_status(self->device));
202 } else if (result == 2 || elt->cancelled) {
207 eom = self->device->is_eom;
208 eof = self->device->is_eof;
210 /* finish the file, even if we're at EOM, but if this fails then we may
212 if (!device_finish_file(self->device)) {
213 xfer_cancel_with_error(XFER_ELEMENT(self),
214 "Error finishing tape file: %s",
215 device_error_or_status(self->device));
219 /* if we wrote zero bytes and reached EOM, then this is an empty part */
220 if (eom && !eof && size == 0) {
224 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
226 msg->duration = g_timer_elapsed(timer, NULL);
227 msg->partnum = self->partnum;
228 msg->fileno = fileno;
229 msg->successful = TRUE;
233 /* time runs backward on some test boxes, so make sure this is positive */
234 if (msg->duration < 0) msg->duration = 0;
236 xfer_queue_message(elt->xfer, msg);
240 /* we're done at EOF */
244 /* wait to be unpaused again */
249 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
254 msg->successful = TRUE;
257 xfer_queue_message(elt->xfer, msg);
259 /* wait to be unpaused again */
264 /* drop the mutex and wait until all elements have been cancelled
265 * before closing the connection */
266 g_mutex_unlock(self->state_mutex);
267 wait_until_xfer_cancelled(elt->xfer);
268 g_mutex_lock(self->state_mutex);
272 /* close the DirectTCP connection */
273 directtcp_connection_close(self->conn);
274 g_object_unref(self->conn);
277 g_mutex_unlock(self->state_mutex);
278 g_timer_destroy(timer);
281 xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
294 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
296 /* start the device listening, and get the addresses */
297 if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
298 elt->input_listen_addrs = NULL;
299 xfer_cancel_with_error(XFER_ELEMENT(self),
300 "Error starting DirectTCP listen: %s",
301 device_error_or_status(self->device));
302 self->listen_ok = FALSE;
306 self->listen_ok = TRUE;
314 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
315 GError *error = NULL;
319 /* start up the thread */
320 self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
321 if (!self->worker_thread) {
322 g_critical(_("Error creating new thread: %s (%s)"),
323 error->message, errno? strerror(errno) : _("no error code"));
334 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
338 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
340 /* signal all of the condition variables to realize that we're no
342 g_mutex_lock(self->state_mutex);
343 g_cond_broadcast(self->paused_cond);
344 g_cond_broadcast(self->abort_cond);
345 g_mutex_unlock(self->state_mutex);
352 XferDestTaper *xdtself,
356 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
358 /* the only way self->device can become NULL is if use_device fails, in
359 * which case an error is already queued up, so just return silently */
360 if (self->device == NULL)
363 g_assert(!self->device->in_file);
364 g_assert(header != NULL);
366 DBG(1, "start_part(retry_part=%d)", retry_part);
368 g_mutex_lock(self->state_mutex);
369 g_assert(self->paused);
371 if (self->part_header)
372 dumpfile_free(self->part_header);
373 self->part_header = dumpfile_copy(header);
376 self->paused = FALSE;
377 g_cond_broadcast(self->paused_cond);
379 g_mutex_unlock(self->state_mutex);
384 XferDestTaper *xdtself,
387 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
389 /* short-circuit if nothing is changing */
390 if (self->device == device)
393 g_mutex_lock(self->state_mutex);
396 g_object_unref(self->device);
399 /* if we already have a connection, then make this device use it */
401 if (!device_use_connection(device, self->conn)) {
402 /* queue up an error for later, and leave the device NULL.
403 * start_part will see this and fail silently */
404 xfer_cancel_with_error(XFER_ELEMENT(self),
405 _("Failed part was not cached; cannot retry"));
410 self->device = device;
411 g_object_ref(device);
413 g_mutex_unlock(self->state_mutex);
417 get_part_bytes_written_impl(
418 XferDestTaper *xdtself G_GNUC_UNUSED)
420 /* This operation is not supported for this taper dest. Maybe someday. */
428 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
429 elt->can_generate_eof = FALSE;
431 self->worker_thread = NULL;
434 self->state_mutex = g_mutex_new();
435 self->paused_cond = g_cond_new();
436 self->abort_cond = g_cond_new();
443 XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);
446 g_object_unref(self->conn);
450 g_object_unref(self->device);
454 g_object_unref(self->device);
457 g_mutex_free(self->state_mutex);
458 g_cond_free(self->paused_cond);
459 g_cond_free(self->abort_cond);
461 if (self->part_header)
462 dumpfile_free(self->part_header);
463 self->part_header = NULL;
466 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
471 XferDestTaperDirectTCPClass * selfc)
473 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
474 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
475 GObjectClass *goc = G_OBJECT_CLASS(selfc);
476 static xfer_element_mech_pair_t mech_pairs[] = {
477 { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 0, 0},
478 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
481 klass->start = start_impl;
482 klass->setup = setup_impl;
483 klass->cancel = cancel_impl;
484 xdt_klass->start_part = start_part_impl;
485 xdt_klass->use_device = use_device_impl;
486 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
487 goc->finalize = finalize_impl;
489 klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
490 klass->mech_pairs = mech_pairs;
492 parent_class = g_type_class_peek_parent(selfc);
496 xfer_dest_taper_directtcp_get_type (void)
498 static GType type = 0;
500 if G_UNLIKELY(type == 0) {
501 static const GTypeInfo info = {
502 sizeof (XferDestTaperDirectTCPClass),
503 (GBaseInitFunc) NULL,
504 (GBaseFinalizeFunc) NULL,
505 (GClassInitFunc) class_init,
506 (GClassFinalizeFunc) NULL,
507 NULL /* class_data */,
508 sizeof (XferDestTaperDirectTCP),
510 (GInstanceInitFunc) instance_init,
514 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
525 xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
527 XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);
529 g_assert(device_directtcp_supported(first_device));
531 self->part_size = part_size;
532 self->device = first_device;
534 g_object_ref(self->device);
536 return XFER_ELEMENT(self);