Imported Upstream version 3.1.0
[debian/amanda] / device-src / xfer-dest-taper-directtcp.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amxfer.h"
23 #include "amanda.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more devices via DirectTCP, handling the work of spanning a
 * DirectTCP connection over multiple devices.  Note that this assumes the
 * devices support early EOM warning. */
32
33 /*
34  * Xfer Dest Taper DirectTCP
35  */
36
37 static GType xfer_dest_taper_directtcp_get_type(void);
38 #define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
39 #define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
40 #define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
41 #define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
42 #define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
43 #define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 typedef struct XferDestTaperDirectTCP {
48     XferDestTaper __parent__;
49
50     /* constructor parameters */
51     guint64 part_size; /* (bytes) */
52
53     /* thread */
54     GThread *worker_thread;
55
56     /* state (governs everything below) */
57     GMutex *state_mutex;
58
59     /* part parameters */
60     Device *volatile device; /* device to write to (refcounted) */
61     dumpfile_t *volatile part_header;
62
63     /* did the device listen proceed without error? */
64     gboolean listen_ok;
65
66     /* part number in progress */
67     volatile guint64 partnum;
68
69     /* connection we're writing to (refcounted) */
70     DirectTCPConnection *conn;
71
72     /* is the element paused, waiting to start a new part? this is set to FALSE
73      * by the main thread to start a part, and the worker thread waits on the
74      * corresponding condition variable. */
75     volatile gboolean paused;
76     GCond *paused_cond;
77
78 } XferDestTaperDirectTCP;
79
80 typedef struct {
81     XferDestTaperClass __parent__;
82 } XferDestTaperDirectTCPClass;
83
84 /*
85  * Debug logging
86  */
87
88 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
89 static void
90 _xdt_dbg(const char *fmt, ...)
91 {
92     va_list argp;
93     char msg[1024];
94
95     arglist_start(argp, fmt);
96     g_vsnprintf(msg, sizeof(msg), fmt, argp);
97     arglist_end(argp);
98     g_debug("XDT thd-%p: %s", g_thread_self(), msg);
99 }
100 /*
101  * Worker Thread
102  */
103
104 static gpointer
105 worker_thread(
106     gpointer data)
107 {
108     XferElement *elt = (XferElement *)data;
109     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
110     GTimer *timer = g_timer_new();
111
112     /* This thread's job is to accept() an incoming connection, then call
113      * write_from_connection for each part, and then close the connection */
114
115     /* If the device_listen failed, then we will soon be cancelled, so wait
116      * for that to occur and then send XMSG_DONE */
117     if (!self->listen_ok) {
118         DBG(2, "listen failed; waiting for cancellation without attempting an accept");
119         wait_until_xfer_cancelled(elt->xfer);
120         goto send_xmsg_done;
121     }
122
123     g_mutex_lock(self->state_mutex);
124
125     /* first, accept a new connection from the device */
126     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
127     if (!device_accept(self->device, &self->conn, NULL, NULL)) {
128         xfer_cancel_with_error(XFER_ELEMENT(self),
129             "accepting DirectTCP connection: %s",
130             device_error_or_status(self->device));
131         g_mutex_unlock(self->state_mutex);
132         return NULL;
133     }
134
135     DBG(2, "connection accepted; sending XMSG_READY");
136     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
137
138     /* now loop until we're out of parts */
139     while (1) {
140         guint64 size;
141         int fileno;
142         XMsg *msg = NULL;
143         gboolean eom, eof;
144
145         /* wait to be un-paused */
146         while (!elt->cancelled && self->paused) {
147             DBG(9, "waiting to be un-paused");
148             g_cond_wait(self->paused_cond, self->state_mutex);
149         }
150         DBG(9, "done waiting");
151
152         if (elt->cancelled)
153             break;
154
155         DBG(2, "writing part to %s", self->device->device_name);
156         if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
157             /* this is not fatal to the transfer, since no data was lost.  We
158              * just need a new device.  The scribe special-cases 0-byte parts, and will
159              * not record this in the catalog. */
160
161             /* clean up */
162             dumpfile_free(self->part_header);
163             self->part_header = NULL;
164
165             goto empty_part;
166         }
167
168         dumpfile_free(self->part_header);
169         self->part_header = NULL;
170
171         fileno = self->device->file;
172         g_assert(fileno > 0);
173
174         /* write the part */
175         g_timer_start(timer);
176         if (!device_write_from_connection(self->device,
177                 self->part_size, &size)) {
178             /* even if this is just a physical EOM, we may have lost data, so
179              * the whole transfer is dead. */
180             xfer_cancel_with_error(XFER_ELEMENT(self),
181                 "Error writing from DirectTCP connection: %s",
182                 device_error_or_status(self->device));
183             goto cancelled;
184         }
185         g_timer_stop(timer);
186
187         eom = self->device->is_eom;
188         eof = self->device->is_eof;
189
190         /* finish the file, even if we're at EOM, but if this fails then we may
191          * have lost data */
192         if (!device_finish_file(self->device)) {
193             xfer_cancel_with_error(XFER_ELEMENT(self),
194                 "Error finishing tape file: %s",
195                 device_error_or_status(self->device));
196             goto cancelled;
197         }
198
199         /* if we wrote zero bytes and reached EOM, then this is an empty part */
200         if (eom && !eof && size == 0) {
201             goto empty_part;
202         }
203
204         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
205         msg->size = size;
206         msg->duration = g_timer_elapsed(timer, NULL);
207         msg->partnum = self->partnum;
208         msg->fileno = fileno;
209         msg->successful = TRUE;
210         msg->eom = eom;
211         msg->eof = eof;
212         xfer_queue_message(elt->xfer, msg);
213
214         self->partnum++;
215
216         /* we're done at EOF */
217         if (eof)
218             break;
219
220         /* wait to be unpaused again */
221         self->paused = TRUE;
222         continue;
223
224 empty_part:
225         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
226         msg->size = 0;
227         msg->duration = 0;
228         msg->partnum = 0;
229         msg->fileno = 0;
230         msg->successful = TRUE;
231         msg->eom = TRUE;
232         msg->eof = FALSE;
233         xfer_queue_message(elt->xfer, msg);
234
235         /* wait to be unpaused again */
236         self->paused = TRUE;
237         continue;
238
239 cancelled:
240         /* drop the mutex and wait until all elements have been cancelled
241          * before closing the connection */
242         g_mutex_unlock(self->state_mutex);
243         wait_until_xfer_cancelled(elt->xfer);
244         g_mutex_lock(self->state_mutex);
245         break;
246     }
247
248     /* close the DirectTCP connection */
249     directtcp_connection_close(self->conn);
250     g_object_unref(self->conn);
251     self->conn = NULL;
252
253     g_mutex_unlock(self->state_mutex);
254     g_timer_destroy(timer);
255
256 send_xmsg_done:
257     xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
258
259     return NULL;
260 }
261
262 /*
263  * Element mechanics
264  */
265
266 static gboolean
267 setup_impl(
268     XferElement *elt)
269 {
270     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
271
272     /* start the device listening, and get the addresses */
273     if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
274         elt->input_listen_addrs = NULL;
275         xfer_cancel_with_error(XFER_ELEMENT(self),
276             "Error starting DirectTCP listen: %s",
277             device_error_or_status(self->device));
278         self->listen_ok = FALSE;
279         return FALSE;
280     }
281
282     self->listen_ok = TRUE;
283     return TRUE;
284 }
285
286 static gboolean
287 start_impl(
288     XferElement *elt)
289 {
290     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
291     GError *error = NULL;
292
293     self->paused = TRUE;
294
295     /* start up the thread */
296     self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
297     if (!self->worker_thread) {
298         g_critical(_("Error creating new thread: %s (%s)"),
299             error->message, errno? strerror(errno) : _("no error code"));
300     }
301
302     return TRUE;
303 }
304
305 static gboolean
306 cancel_impl(
307     XferElement *elt,
308     gboolean expect_eof)
309 {
310     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
311     gboolean rv;
312
313     /* chain up first */
314     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
315
316     /* signal all of the condition variables to realize that we're no
317      * longer paused */
318     g_mutex_lock(self->state_mutex);
319     g_cond_broadcast(self->paused_cond);
320     g_mutex_unlock(self->state_mutex);
321
322     return rv;
323 }
324
325 static void
326 start_part_impl(
327     XferDestTaper *xdtself,
328     gboolean retry_part,
329     dumpfile_t *header)
330 {
331     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
332
333     /* the only way self->device can become NULL is if use_device fails, in
334      * which case an error is already queued up, so just return silently */
335     if (self->device == NULL)
336         return;
337
338     g_assert(!self->device->in_file);
339     g_assert(header != NULL);
340
341     DBG(1, "start_part(retry_part=%d)", retry_part);
342
343     g_mutex_lock(self->state_mutex);
344     g_assert(self->paused);
345
346     if (self->part_header)
347         dumpfile_free(self->part_header);
348     self->part_header = dumpfile_copy(header);
349
350     DBG(1, "unpausing");
351     self->paused = FALSE;
352     g_cond_broadcast(self->paused_cond);
353
354     g_mutex_unlock(self->state_mutex);
355 }
356
357 static void
358 use_device_impl(
359     XferDestTaper *xdtself,
360     Device *device)
361 {
362     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
363
364     /* short-circuit if nothing is changing */
365     if (self->device == device)
366         return;
367
368     g_mutex_lock(self->state_mutex);
369
370     if (self->device)
371         g_object_unref(self->device);
372     self->device = NULL;
373
374     /* if we already have a connection, then make this device use it */
375     if (self->conn) {
376         if (!device_use_connection(device, self->conn)) {
377             /* queue up an error for later, and leave the device NULL.
378              * start_part will see this and fail silently */
379             xfer_cancel_with_error(XFER_ELEMENT(self),
380                 _("Failed part was not cached; cannot retry"));
381             return;
382         }
383     }
384
385     self->device = device;
386     g_object_ref(device);
387
388     g_mutex_unlock(self->state_mutex);
389 }
390
391 static void
392 cache_inform_impl(
393     XferDestTaper *xdtself G_GNUC_UNUSED,
394     const char *filename G_GNUC_UNUSED,
395     off_t offset G_GNUC_UNUSED,
396     off_t length G_GNUC_UNUSED)
397 {
398     /* do nothing */
399 }
400
401 static guint64
402 get_part_bytes_written_impl(
403     XferDestTaper *xdtself G_GNUC_UNUSED)
404 {
405     /* This operation is not supported for this taper dest.  Maybe someday. */
406     return 0;
407 }
408
409 static void
410 instance_init(
411     XferElement *elt)
412 {
413     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
414     elt->can_generate_eof = FALSE;
415
416     self->worker_thread = NULL;
417     self->paused = TRUE;
418     self->conn = NULL;
419     self->state_mutex = g_mutex_new();
420     self->paused_cond = g_cond_new();
421 }
422
423 static void
424 finalize_impl(
425     GObject * obj_self)
426 {
427     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);
428
429     if (self->conn)
430         g_object_unref(self->conn);
431     self->conn = NULL;
432
433     if (self->device)
434         g_object_unref(self->device);
435     self->device = NULL;
436
437     if (self->device)
438         g_object_unref(self->device);
439     self->device = NULL;
440
441     g_mutex_free(self->state_mutex);
442     g_cond_free(self->paused_cond);
443
444     if (self->part_header)
445         dumpfile_free(self->part_header);
446     self->part_header = NULL;
447
448     /* chain up */
449     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
450 }
451
452 static void
453 class_init(
454     XferDestTaperDirectTCPClass * selfc)
455 {
456     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
457     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
458     GObjectClass *goc = G_OBJECT_CLASS(selfc);
459     static xfer_element_mech_pair_t mech_pairs[] = {
460         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 0, 0},
461         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
462     };
463
464     klass->start = start_impl;
465     klass->setup = setup_impl;
466     klass->cancel = cancel_impl;
467     xdt_klass->start_part = start_part_impl;
468     xdt_klass->use_device = use_device_impl;
469     xdt_klass->cache_inform = cache_inform_impl;
470     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
471     goc->finalize = finalize_impl;
472
473     klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
474     klass->mech_pairs = mech_pairs;
475
476     parent_class = g_type_class_peek_parent(selfc);
477 }
478
479 static GType
480 xfer_dest_taper_directtcp_get_type (void)
481 {
482     static GType type = 0;
483
484     if G_UNLIKELY(type == 0) {
485         static const GTypeInfo info = {
486             sizeof (XferDestTaperDirectTCPClass),
487             (GBaseInitFunc) NULL,
488             (GBaseFinalizeFunc) NULL,
489             (GClassInitFunc) class_init,
490             (GClassFinalizeFunc) NULL,
491             NULL /* class_data */,
492             sizeof (XferDestTaperDirectTCP),
493             0 /* n_preallocs */,
494             (GInstanceInitFunc) instance_init,
495             NULL
496         };
497
498         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
499     }
500
501     return type;
502 }
503
504 /*
505  * Constructor
506  */
507
508 XferElement *
509 xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
510 {
511     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);
512
513     g_assert(device_directtcp_supported(first_device));
514
515     self->part_size = part_size;
516     self->device = first_device;
517     self->partnum = 1;
518     g_object_ref(self->device);
519
520     return XFER_ELEMENT(self);
521 }