add bug closure to changelog
[debian/amanda] / device-src / xfer-dest-taper-directtcp.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more devices via DirectTCP, handling the work of spanning a
 * DirectTCP connection over multiple devices.  Note that this assumes the
 * devices support early EOM warning. */
32
33 /*
34  * Xfer Dest Taper DirectTCP
35  */
36
37 static GType xfer_dest_taper_directtcp_get_type(void);
38 #define XFER_DEST_TAPER_DIRECTTCP_TYPE (xfer_dest_taper_directtcp_get_type())
39 #define XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP)
40 #define XFER_DEST_TAPER_DIRECTTCP_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCP const)
41 #define XFER_DEST_TAPER_DIRECTTCP_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
42 #define IS_XFER_DEST_TAPER_DIRECTTCP(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_directtcp_get_type ())
43 #define XFER_DEST_TAPER_DIRECTTCP_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_directtcp_get_type(), XferDestTaperDirectTCPClass)
44
45 static GObjectClass *parent_class = NULL;
46
47 typedef struct XferDestTaperDirectTCP {
48     XferDestTaper __parent__;
49
50     /* constructor parameters */
51     guint64 part_size; /* (bytes) */
52
53     /* thread */
54     GThread *worker_thread;
55
56     /* state (governs everything below) */
57     GMutex *state_mutex;
58
59     /* part parameters */
60     Device *volatile device; /* device to write to (refcounted) */
61     dumpfile_t *volatile part_header;
62
63     /* did the device listen proceed without error? */
64     gboolean listen_ok;
65
66     /* part number in progress */
67     volatile guint64 partnum;
68
69     /* connection we're writing to (refcounted) */
70     DirectTCPConnection *conn;
71
72     /* is the element paused, waiting to start a new part? this is set to FALSE
73      * by the main thread to start a part, and the worker thread waits on the
74      * corresponding condition variable. */
75     volatile gboolean paused;
76     GCond *paused_cond;
77     GCond *abort_accept_cond; /* condition to trigger to abort an accept */
78
79 } XferDestTaperDirectTCP;
80
81 typedef struct {
82     XferDestTaperClass __parent__;
83 } XferDestTaperDirectTCPClass;
84
85 /*
86  * Debug logging
87  */
88
89 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
90 static void
91 _xdt_dbg(const char *fmt, ...)
92 {
93     va_list argp;
94     char msg[1024];
95
96     arglist_start(argp, fmt);
97     g_vsnprintf(msg, sizeof(msg), fmt, argp);
98     arglist_end(argp);
99     g_debug("XDT: %s", msg);
100 }
101 /*
102  * Worker Thread
103  */
104
105 static gpointer
106 worker_thread(
107     gpointer data)
108 {
109     XferElement *elt = (XferElement *)data;
110     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)data;
111     GTimer *timer = g_timer_new();
112     int result;
113
114     /* This thread's job is to accept() an incoming connection, then call
115      * write_from_connection for each part, and then close the connection */
116
117     /* If the device_listen failed, then we will soon be cancelled, so wait
118      * for that to occur and then send XMSG_DONE */
119     if (!self->listen_ok) {
120         DBG(2, "listen failed; waiting for cancellation without attempting an accept");
121         wait_until_xfer_cancelled(elt->xfer);
122         goto send_xmsg_done;
123     }
124
125     g_mutex_lock(self->state_mutex);
126
127     /* first, accept a new connection from the device */
128     DBG(2, "accepting DirectTCP connection on device %s", self->device->device_name);
129     result = device_accept_with_cond(self->device, &self->conn,
130                                      self->state_mutex,
131                                      self->abort_accept_cond);
132     if (result == 2) {
133         xfer_cancel_with_error(XFER_ELEMENT(self),
134             "accepting DirectTCP connection: %s",
135             device_error_or_status(self->device));
136         g_mutex_unlock(self->state_mutex);
137         return NULL;
138     } else if (result == 1) {
139         g_mutex_unlock(self->state_mutex);
140         return NULL;
141     }
142
143     DBG(2, "connection accepted; sending XMSG_READY");
144     xfer_queue_message(elt->xfer, xmsg_new(elt, XMSG_READY, 0));
145
146     /* round the part size up to the next multiple of the block size */
147     if (self->part_size) {
148         self->part_size += self->device->block_size-1;
149         self->part_size -= self->part_size % self->device->block_size;
150     }
151
152     /* now loop until we're out of parts */
153     while (1) {
154         guint64 size;
155         int fileno;
156         XMsg *msg = NULL;
157         gboolean eom, eof;
158
159         /* wait to be un-paused */
160         while (!elt->cancelled && self->paused) {
161             DBG(9, "waiting to be un-paused");
162             g_cond_wait(self->paused_cond, self->state_mutex);
163         }
164         DBG(9, "done waiting");
165
166         if (elt->cancelled)
167             break;
168
169         DBG(2, "writing part to %s", self->device->device_name);
170         if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
171             /* this is not fatal to the transfer, since no data was lost.  We
172              * just need a new device.  The scribe special-cases 0-byte parts, and will
173              * not record this in the catalog. */
174
175             /* clean up */
176             dumpfile_free(self->part_header);
177             self->part_header = NULL;
178
179             goto empty_part;
180         }
181
182         dumpfile_free(self->part_header);
183         self->part_header = NULL;
184
185         fileno = self->device->file;
186         g_assert(fileno > 0);
187
188         /* write the part */
189         g_timer_start(timer);
190         if (!device_write_from_connection(self->device,
191                 self->part_size, &size)) {
192             /* even if this is just a physical EOM, we may have lost data, so
193              * the whole transfer is dead. */
194             xfer_cancel_with_error(XFER_ELEMENT(self),
195                 "Error writing from DirectTCP connection: %s",
196                 device_error_or_status(self->device));
197             goto cancelled;
198         }
199         g_timer_stop(timer);
200
201         eom = self->device->is_eom;
202         eof = self->device->is_eof;
203
204         /* finish the file, even if we're at EOM, but if this fails then we may
205          * have lost data */
206         if (!device_finish_file(self->device)) {
207             xfer_cancel_with_error(XFER_ELEMENT(self),
208                 "Error finishing tape file: %s",
209                 device_error_or_status(self->device));
210             goto cancelled;
211         }
212
213         /* if we wrote zero bytes and reached EOM, then this is an empty part */
214         if (eom && !eof && size == 0) {
215             goto empty_part;
216         }
217
218         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
219         msg->size = size;
220         msg->duration = g_timer_elapsed(timer, NULL);
221         msg->partnum = self->partnum;
222         msg->fileno = fileno;
223         msg->successful = TRUE;
224         msg->eom = eom;
225         msg->eof = eof;
226
227         /* time runs backward on some test boxes, so make sure this is positive */
228         if (msg->duration < 0) msg->duration = 0;
229
230         xfer_queue_message(elt->xfer, msg);
231
232         self->partnum++;
233
234         /* we're done at EOF */
235         if (eof)
236             break;
237
238         /* wait to be unpaused again */
239         self->paused = TRUE;
240         continue;
241
242 empty_part:
243         msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
244         msg->size = 0;
245         msg->duration = 0;
246         msg->partnum = 0;
247         msg->fileno = 0;
248         msg->successful = TRUE;
249         msg->eom = TRUE;
250         msg->eof = FALSE;
251         xfer_queue_message(elt->xfer, msg);
252
253         /* wait to be unpaused again */
254         self->paused = TRUE;
255         continue;
256
257 cancelled:
258         /* drop the mutex and wait until all elements have been cancelled
259          * before closing the connection */
260         g_mutex_unlock(self->state_mutex);
261         wait_until_xfer_cancelled(elt->xfer);
262         g_mutex_lock(self->state_mutex);
263         break;
264     }
265
266     /* close the DirectTCP connection */
267     directtcp_connection_close(self->conn);
268     g_object_unref(self->conn);
269     self->conn = NULL;
270
271     g_mutex_unlock(self->state_mutex);
272     g_timer_destroy(timer);
273
274 send_xmsg_done:
275     xfer_queue_message(elt->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
276
277     return NULL;
278 }
279
280 /*
281  * Element mechanics
282  */
283
284 static gboolean
285 setup_impl(
286     XferElement *elt)
287 {
288     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
289
290     /* start the device listening, and get the addresses */
291     if (!device_listen(self->device, TRUE, &elt->input_listen_addrs)) {
292         elt->input_listen_addrs = NULL;
293         xfer_cancel_with_error(XFER_ELEMENT(self),
294             "Error starting DirectTCP listen: %s",
295             device_error_or_status(self->device));
296         self->listen_ok = FALSE;
297         return FALSE;
298     }
299
300     self->listen_ok = TRUE;
301     return TRUE;
302 }
303
304 static gboolean
305 start_impl(
306     XferElement *elt)
307 {
308     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)elt;
309     GError *error = NULL;
310
311     self->paused = TRUE;
312
313     /* start up the thread */
314     self->worker_thread = g_thread_create(worker_thread, (gpointer)self, TRUE, &error);
315     if (!self->worker_thread) {
316         g_critical(_("Error creating new thread: %s (%s)"),
317             error->message, errno? strerror(errno) : _("no error code"));
318     }
319
320     return TRUE;
321 }
322
323 static gboolean
324 cancel_impl(
325     XferElement *elt,
326     gboolean expect_eof)
327 {
328     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
329     gboolean rv;
330
331     /* chain up first */
332     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
333
334     /* signal all of the condition variables to realize that we're no
335      * longer paused */
336     g_mutex_lock(self->state_mutex);
337     g_cond_broadcast(self->paused_cond);
338     g_cond_broadcast(self->abort_accept_cond);
339     g_mutex_unlock(self->state_mutex);
340
341     return rv;
342 }
343
344 static void
345 start_part_impl(
346     XferDestTaper *xdtself,
347     gboolean retry_part,
348     dumpfile_t *header)
349 {
350     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
351
352     /* the only way self->device can become NULL is if use_device fails, in
353      * which case an error is already queued up, so just return silently */
354     if (self->device == NULL)
355         return;
356
357     g_assert(!self->device->in_file);
358     g_assert(header != NULL);
359
360     DBG(1, "start_part(retry_part=%d)", retry_part);
361
362     g_mutex_lock(self->state_mutex);
363     g_assert(self->paused);
364
365     if (self->part_header)
366         dumpfile_free(self->part_header);
367     self->part_header = dumpfile_copy(header);
368
369     DBG(1, "unpausing");
370     self->paused = FALSE;
371     g_cond_broadcast(self->paused_cond);
372
373     g_mutex_unlock(self->state_mutex);
374 }
375
376 static void
377 use_device_impl(
378     XferDestTaper *xdtself,
379     Device *device)
380 {
381     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(xdtself);
382
383     /* short-circuit if nothing is changing */
384     if (self->device == device)
385         return;
386
387     g_mutex_lock(self->state_mutex);
388
389     if (self->device)
390         g_object_unref(self->device);
391     self->device = NULL;
392
393     /* if we already have a connection, then make this device use it */
394     if (self->conn) {
395         if (!device_use_connection(device, self->conn)) {
396             /* queue up an error for later, and leave the device NULL.
397              * start_part will see this and fail silently */
398             xfer_cancel_with_error(XFER_ELEMENT(self),
399                 _("Failed part was not cached; cannot retry"));
400             return;
401         }
402     }
403
404     self->device = device;
405     g_object_ref(device);
406
407     g_mutex_unlock(self->state_mutex);
408 }
409
410 static guint64
411 get_part_bytes_written_impl(
412     XferDestTaper *xdtself G_GNUC_UNUSED)
413 {
414     /* This operation is not supported for this taper dest.  Maybe someday. */
415     return 0;
416 }
417
418 static void
419 instance_init(
420     XferElement *elt)
421 {
422     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(elt);
423     elt->can_generate_eof = FALSE;
424
425     self->worker_thread = NULL;
426     self->paused = TRUE;
427     self->conn = NULL;
428     self->state_mutex = g_mutex_new();
429     self->paused_cond = g_cond_new();
430     self->abort_accept_cond = g_cond_new();
431 }
432
433 static void
434 finalize_impl(
435     GObject * obj_self)
436 {
437     XferDestTaperDirectTCP *self = XFER_DEST_TAPER_DIRECTTCP(obj_self);
438
439     if (self->conn)
440         g_object_unref(self->conn);
441     self->conn = NULL;
442
443     if (self->device)
444         g_object_unref(self->device);
445     self->device = NULL;
446
447     if (self->device)
448         g_object_unref(self->device);
449     self->device = NULL;
450
451     g_mutex_free(self->state_mutex);
452     g_cond_free(self->paused_cond);
453     g_cond_free(self->abort_accept_cond);
454
455     if (self->part_header)
456         dumpfile_free(self->part_header);
457     self->part_header = NULL;
458
459     /* chain up */
460     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
461 }
462
463 static void
464 class_init(
465     XferDestTaperDirectTCPClass * selfc)
466 {
467     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
468     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
469     GObjectClass *goc = G_OBJECT_CLASS(selfc);
470     static xfer_element_mech_pair_t mech_pairs[] = {
471         { XFER_MECH_DIRECTTCP_LISTEN, XFER_MECH_NONE, 0, 0},
472         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
473     };
474
475     klass->start = start_impl;
476     klass->setup = setup_impl;
477     klass->cancel = cancel_impl;
478     xdt_klass->start_part = start_part_impl;
479     xdt_klass->use_device = use_device_impl;
480     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
481     goc->finalize = finalize_impl;
482
483     klass->perl_class = "Amanda::Xfer::Dest::Taper::DirectTCP";
484     klass->mech_pairs = mech_pairs;
485
486     parent_class = g_type_class_peek_parent(selfc);
487 }
488
489 static GType
490 xfer_dest_taper_directtcp_get_type (void)
491 {
492     static GType type = 0;
493
494     if G_UNLIKELY(type == 0) {
495         static const GTypeInfo info = {
496             sizeof (XferDestTaperDirectTCPClass),
497             (GBaseInitFunc) NULL,
498             (GBaseFinalizeFunc) NULL,
499             (GClassInitFunc) class_init,
500             (GClassFinalizeFunc) NULL,
501             NULL /* class_data */,
502             sizeof (XferDestTaperDirectTCP),
503             0 /* n_preallocs */,
504             (GInstanceInitFunc) instance_init,
505             NULL
506         };
507
508         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperDirectTCP", &info, 0);
509     }
510
511     return type;
512 }
513
514 /*
515  * Constructor
516  */
517
518 XferElement *
519 xfer_dest_taper_directtcp(Device *first_device, guint64 part_size)
520 {
521     XferDestTaperDirectTCP *self = (XferDestTaperDirectTCP *)g_object_new(XFER_DEST_TAPER_DIRECTTCP_TYPE, NULL);
522
523     g_assert(device_directtcp_supported(first_device));
524
525     self->part_size = part_size;
526     self->device = first_device;
527     self->partnum = 1;
528     g_object_ref(self->device);
529
530     return XFER_ELEMENT(self);
531 }