Imported Upstream version 3.3.3
[debian/amanda] / device-src / xfer-dest-taper-directtcp.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "xfer-device.h"
26 #include "arglist.h"
27 #include "conffile.h"
28
29 /* A transfer destination that writes and entire dumpfile to one or more files
30  * on one or more devices via DirectTCP, handling the work of spanning a
31  * directtcp connection over multiple devices.  Note that this assumes the
32  * devices support early EOM warning. */
33
34 /*
35  * Xfer Dest Taper DirectTCP
36  */
37
38 static GType xfer_dest_taper_directtcp_get_type(void);
39 #define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
40 #define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
41 #define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
42 #define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
43 #define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
44 #define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
45
46 static GObjectClass *parent_class = NULL;
47
48 typedef struct XferDestTaperDirectTCP {
49     XferDestTaper __parent__;
50
51     /* constructor parameters */
52     guint64 part_size; /* (bytes) */
53
54     /* thread */
55     GThread *worker_thread;
56
57     /* state (governs everything below) */
58     GMutex *state_mutex;
59
60     /* part parameters */
61     Device *volatile device; /* device to write to (refcounted) */
62     dumpfile_t *volatile part_header;
63
64     /* did the device listen proceed without error? */
65     gboolean listen_ok;
66
67     /* part number in progress */
68     volatile guint64 partnum;
69
70     /* connection we're writing to (refcounted) */
71     DirectTCPConnection *conn;
72
73     /* is the element paused, waiting to start a new part? this is set to FALSE
74      * by the main thread to start a part, and the worker thread waits on the
75      * corresponding condition variable. */
76     volatile gboolean paused;
77     GCond *paused_cond;
78     GCond *abort_cond; /* condition to trigger to abort an NDMP command */
79
80 } XferDestTaperDirectTCP;
81
82 typedef struct {
83     XferDestTaperClass __parent__;
84 } XferDestTaperDirectTCPClass;
85
86 /*
87  * Debug logging
88  */
89
90 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
91 static void
92 _xdt_dbg(const char *fmt, ...)
93 {
94     va_list argp;
95     char msg[1024];
96
97     arglist_start(argp, fmt);
98     g_vsnprintf(msg, sizeof(msg), fmt, argp);
99     arglist_end(argp);
100     g_debug("XDT: %s", msg);
101 }
102 /*
103  * Worker Thread
104  */
105
106 static gpointer
107 worker_thread(
108     gpointer data)
109 {
110     XferElement *elt = (XferElement *)data;
111     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
112     GTimer *timer = g_timer_new();
113     int result;
114
115     /* This thread's job is to accept() an incoming connection, then call
116      * write_from_connection for each part, and then close the connection */
117
118     /* If the device_listen failed, then we will soon be cancelled, so wait
119      * for that to occur and then send XMSG_DONE */
120     if (!self->listen_ok) {
121         DBG(2, "listen failed; waiting for cancellation without attempting an accept");
122         wait_until_xfer_cancelled(elt->xfer);
123         goto send_xmsg_done;
124     }
125
126     g_mutex_lock(self->state_mutex);
127
128     /* first, accept a new connection from the device */
129     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
130     result = device_accept(self->device, &self->conn, &elt->cancelled,
131                            self->state_mutex, self->abort_cond);
132     if (result == 1 && !elt->cancelled) {
133         xfer_cancel_with_error(XFER_ELEMENT(self),
134             "accepting DirectTCP connection: %s",
135             device_error_or_status(self->device));
136         g_mutex_unlock(self->state_mutex);
137         wait_until_xfer_cancelled(elt->xfer);
138         goto send_xmsg_done;
139     } else if (result == 2 || elt->cancelled) {
140         g_mutex_unlock(self->state_mutex);
141         wait_until_xfer_cancelled(elt->xfer);
142         goto send_xmsg_done;
143     }
144
145     DBG(2, "connection accepted; sending XMSG_READY");
146     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
147
148     /* round the part size up to the next multiple of the block size */
149     if (self->part_size) {
150         self->part_size += self->device->block_size-1;
151         self->part_size -= self->part_size % self->device->block_size;
152     }
153
154     /* now loop until we're out of parts */
155     while (1) {
156         guint64 size;
157         int fileno;
158         XMsg *msg = NULL;
159         gboolean eom, eof;
160
161         /* wait to be un-paused */
162         while (!elt->cancelled && self->paused) {
163             DBG(9, "waiting to be un-paused");
164             g_cond_wait(self->paused_cond, self->state_mutex);
165         }
166         DBG(9, "done waiting");
167
168         if (elt->cancelled)
169             break;
170
171         DBG(2, "writing part to %s", self->device->device_name);
172         if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
173             /* this is not fatal to the transfer, since no data was lost.  We
174              * just need a new device.  The scribe special-cases 0-byte parts, and will
175              * not record this in the catalog. */
176
177             /* clean up */
178             dumpfile_free(self->part_header);
179             self->part_header = NULL;
180
181             goto empty_part;
182         }
183
184         dumpfile_free(self->part_header);
185         self->part_header = NULL;
186
187         fileno = self->device->file;
188         g_assert(fileno > 0);
189
190         /* write the part */
191         g_timer_start(timer);
192         result = device_write_from_connection(self->device,
193                 self->part_size, &size, &elt->cancelled,
194                 self->state_mutex, self->abort_cond);
195         if (result == 1 && !elt->cancelled) {
196             /* even if this is just a physical EOM, we may have lost data, so
197              * the whole transfer is dead. */
198             xfer_cancel_with_error(XFER_ELEMENT(self),
199                 "Error writing from DirectTCP connection: %s",
200                 device_error_or_status(self->device));
201             goto cancelled;
202         } else if (result == 2 || elt->cancelled) {
203             goto cancelled;
204         }
205         g_timer_stop(timer);
206
207         eom = self->device->is_eom;
208         eof = self->device->is_eof;
209
210         /* finish the file, even if we're at EOM, but if this fails then we may
211          * have lost data */
212         if (!device_finish_file(self->device)) {
213             xfer_cancel_with_error(XFER_ELEMENT(self),
214                 "Error finishing tape file: %s",
215                 device_error_or_status(self->device));
216             goto cancelled;
217         }
218
219         /* if we wrote zero bytes and reached EOM, then this is an empty part */
220         if (eom && !eof && size == 0) {
221             goto empty_part;
222         }
223
224         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
225         msg->size = size;
226         msg->duration = g_timer_elapsed(timer, NULL);
227         msg->partnum = self->partnum;
228         msg->fileno = fileno;
229         msg->successful = TRUE;
230         msg->eom = eom;
231         msg->eof = eof;
232
233         /* time runs backward on some test boxes, so make sure this is positive */
234         if (msg->duration < 0) msg->duration = 0;
235
236         xfer_queue_message(elt->xfer, msg);
237
238         self->partnum++;
239
240         /* we're done at EOF */
241         if (eof)
242             break;
243
244         /* wait to be unpaused again */
245         self->paused = TRUE;
246         continue;
247
248 empty_part:
249         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
250         msg->size = 0;
251         msg->duration = 0;
252         msg->partnum = 0;
253         msg->fileno = 0;
254         msg->successful = TRUE;
255         msg->eom = TRUE;
256         msg->eof = FALSE;
257         xfer_queue_message(elt->xfer, msg);
258
259         /* wait to be unpaused again */
260         self->paused = TRUE;
261         continue;
262
263 cancelled:
264         /* drop the mutex and wait until all elements have been cancelled
265          * before closing the connection */
266         g_mutex_unlock(self->state_mutex);
267         wait_until_xfer_cancelled(elt->xfer);
268         g_mutex_lock(self->state_mutex);
269         break;
270     }
271
272     /* close the DirectTCP connection */
273     directtcp_connection_close(self->conn);
274     g_object_unref(self->conn);
275     self->conn = NULL;
276
277     g_mutex_unlock(self->state_mutex);
278     g_timer_destroy(timer);
279
280 send_xmsg_done:
281     xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
282
283     return NULL;
284 }
285
286 /*
287  * Element mechanics
288  */
289
290 static gboolean
291 setup_impl(
292     XferElement *elt)
293 {
294     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
295
296     /* start the device listening, and get the addresses */
297     if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
298         elt->input_listen_addrs = NULL;
299         xfer_cancel_with_error(XFER_ELEMENT(self),
300             "Error starting DirectTCP listen: %s",
301             device_error_or_status(self->device));
302         self->listen_ok = FALSE;
303         return FALSE;
304     }
305
306     self->listen_ok = TRUE;
307     return TRUE;
308 }
309
310 static gboolean
311 start_impl(
312     XferElement *elt)
313 {
314     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
315     GError *error = NULL;
316
317     self->paused = TRUE;
318
319     /* start up the thread */
320     self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
321     if (!self->worker_thread) {
322         g_critical(_("Error creating new thread: %s (%s)"),
323             error->message, errno? strerror(errno) : _("no error code"));
324     }
325
326     return TRUE;
327 }
328
329 static gboolean
330 cancel_impl(
331     XferElement *elt,
332     gboolean expect_eof)
333 {
334     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
335     gboolean rv;
336
337     /* chain up first */
338     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
339
340     /* signal all of the condition variables to realize that we're no
341      * longer paused */
342     g_mutex_lock(self->state_mutex);
343     g_cond_broadcast(self->paused_cond);
344     g_cond_broadcast(self->abort_cond);
345     g_mutex_unlock(self->state_mutex);
346
347     return rv;
348 }
349
350 static void
351 start_part_impl(
352     XferDestTaper *xdtself,
353     gboolean retry_part,
354     dumpfile_t *header)
355 {
356     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
357
358     /* the only way self->device can become NULL is if use_device fails, in
359      * which case an error is already queued up, so just return silently */
360     if (self->device == NULL)
361         return;
362
363     g_assert(!self->device->in_file);
364     g_assert(header != NULL);
365
366     DBG(1, "start_part(retry_part=%d)", retry_part);
367
368     g_mutex_lock(self->state_mutex);
369     g_assert(self->paused);
370
371     if (self->part_header)
372         dumpfile_free(self->part_header);
373     self->part_header = dumpfile_copy(header);
374
375     DBG(1, "unpausing");
376     self->paused = FALSE;
377     g_cond_broadcast(self->paused_cond);
378
379     g_mutex_unlock(self->state_mutex);
380 }
381
382 static void
383 use_device_impl(
384     XferDestTaper *xdtself,
385     Device *device)
386 {
387     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
388
389     /* short-circuit if nothing is changing */
390     if (self->device == device)
391         return;
392
393     g_mutex_lock(self->state_mutex);
394
395     if (self->device)
396         g_object_unref(self->device);
397     self->device = NULL;
398
399     /* if we already have a connection, then make this device use it */
400     if (self->conn) {
401         if (!device_use_connection(device, self->conn)) {
402             /* queue up an error for later, and leave the device NULL.
403              * start_part will see this and fail silently */
404             xfer_cancel_with_error(XFER_ELEMENT(self),
405                 _("Failed part was not cached; cannot retry"));
406             return;
407         }
408     }
409
410     self->device = device;
411     g_object_ref(device);
412
413     g_mutex_unlock(self->state_mutex);
414 }
415
416 static guint64
417 get_part_bytes_written_impl(
418     XferDestTaper *xdtself G_GNUC_UNUSED)
419 {
420     /* This operation is not supported for this taper dest.  Maybe someday. */
421     return 0;
422 }
423
424 static void
425 instance_init(
426     XferElement *elt)
427 {
428     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
429     elt->can_generate_eof = FALSE;
430
431     self->worker_thread = NULL;
432     self->paused = TRUE;
433     self->conn = NULL;
434     self->state_mutex = g_mutex_new();
435     self->paused_cond = g_cond_new();
436     self->abort_cond = g_cond_new();
437 }
438
439 static void
440 finalize_impl(
441     GObject * obj_self)
442 {
443     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);
444
445     if (self->conn)
446         g_object_unref(self->conn);
447     self->conn = NULL;
448
449     if (self->device)
450         g_object_unref(self->device);
451     self->device = NULL;
452
453     if (self->device)
454         g_object_unref(self->device);
455     self->device = NULL;
456
457     g_mutex_free(self->state_mutex);
458     g_cond_free(self->paused_cond);
459     g_cond_free(self->abort_cond);
460
461     if (self->part_header)
462         dumpfile_free(self->part_header);
463     self->part_header = NULL;
464
465     /* chain up */
466     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
467 }
468
469 static void
470 class_init(
471     XferDestTaperDirectTCPClass * selfc)
472 {
473     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
474     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
475     GObjectClass *goc = G_OBJECT_CLASS(selfc);
476     static xfer_element_mech_pair_t mech_pairs[] = {
477         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 0, 0},
478         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
479     };
480
481     klass->start = start_impl;
482     klass->setup = setup_impl;
483     klass->cancel = cancel_impl;
484     xdt_klass->start_part = start_part_impl;
485     xdt_klass->use_device = use_device_impl;
486     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
487     goc->finalize = finalize_impl;
488
489     klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
490     klass->mech_pairs = mech_pairs;
491
492     parent_class = g_type_class_peek_parent(selfc);
493 }
494
495 static GType
496 xfer_dest_taper_directtcp_get_type (void)
497 {
498     static GType type = 0;
499
500     if G_UNLIKELY(type == 0) {
501         static const GTypeInfo info = {
502             sizeof (XferDestTaperDirectTCPClass),
503             (GBaseInitFunc) NULL,
504             (GBaseFinalizeFunc) NULL,
505             (GClassInitFunc) class_init,
506             (GClassFinalizeFunc) NULL,
507             NULL /* class_data */,
508             sizeof (XferDestTaperDirectTCP),
509             0 /* n_preallocs */,
510             (GInstanceInitFunc) instance_init,
511             NULL
512         };
513
514         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
515     }
516
517     return type;
518 }
519
520 /*
521  * Constructor
522  */
523
524 XferElement *
525 xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
526 {
527     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);
528
529     g_assert(device_directtcp_supported(first_device));
530
531     self->part_size = part_size;
532     self->device = first_device;
533     self->partnum = 1;
534     g_object_ref(self->device);
535
536     return XFER_ELEMENT(self);
537 }