Imported Upstream version 3.3.3
[debian/amanda] / device-src / xfer-dest-taper-splitter.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "xfer-device.h"
26 #include "arglist.h"
27 #include "conffile.h"
28
29 /* A transfer destination that writes an entire dumpfile to one or more files
30  * on one or more devices, without any caching.  This destination supports both
31  * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
32  * splitting (in which rewound parts are read from holding disk). */
33
34 /*
35  * File Slices - Cache Information
36  *
37  * The cache_inform implementation adds cache information to a linked list of
38  * these objects, in order.  The objects are arranged in a linked list, and
39  * describe the files in which the part data is stored.  Note that we assume
40  * that slices are added *before* they are needed: the xfer element will fail
41  * if it tries to rewind and does not find a suitable slice.
42  *
43  * The slices should be "fast forwarded" after every part, so that the first
44  * byte in part_slices is the first byte of the part; when a retry of a part is
45  * required, use the iterator methods to properly open the various files and do
46  * the buffering.
47  */
48
/* One entry in the linked list of cached part data (see "File Slices" above).
 * Appended by cache_inform_impl; consumed/trimmed by fast_forward_slices. */
typedef struct FileSlice {
    struct FileSlice *next;

    /* fully-qualified filename to read from (or NULL to read from
     * disk_cache_read_fd in XferDestTaperCacher) */
    char *filename;

    /* offset in file to start at */
    guint64 offset;

    /* length of data to read */
    guint64 length;
} FileSlice;
62
63 /*
64  * Xfer Dest Taper
65  */
66
67 static GObjectClass *parent_class = NULL;
68
typedef struct XferDestTaperSplitter {
    XferDestTaper __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    /* Maximum size of each part (bytes); 0 means no part-size limit */
    guint64 part_size;

    /* the device's need for streaming (it's assumed that all subsequent devices
     * have the same needs) */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* TRUE if this element is expecting slices via cache_inform */
    gboolean expect_cache_inform;

    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* Ring Buffer
     *
     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
     * blocksize), and serves as the interface between the device_thread and
     * the thread calling push_buffer.  Ring_length is the total length of the
     * buffer in bytes, while ring_count is the number of data bytes currently
     * in the buffer.  The ring_add_cond is signalled when data is added to the
     * buffer, while ring_free_cond is signalled when data is removed.  Both
     * are governed by ring_mutex, and both are signalled when the transfer is
     * cancelled.
     */

    GMutex *ring_mutex;
    GCond *ring_add_cond, *ring_free_cond;
    gchar *ring_buffer;
    gsize ring_length, ring_count;
    gsize ring_head, ring_tail;          /* producer writes at head; consumer reads at tail */
    gboolean ring_head_at_eof;           /* TRUE once push_buffer has seen EOF */

    /* Element State
     *
     * "state" includes all of the variables below (including device
     * parameters).  Note that the device_thread holds this mutex for the
     * entire duration of writing a part.
     *
     * state_mutex should always be locked before ring_mutex, if both are to be
     * held simultaneously.
     */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;

    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* bytes to read from cached slices before reading from the ring buffer */
    guint64 bytes_to_read_from_slices;

    /* part number in progress */
    volatile guint64 partnum;

    /* status of the last part */
    gboolean last_part_eof;
    gboolean last_part_eom;
    gboolean last_part_successful;

    /* true if the element is done writing to devices */
    gboolean no_more_parts;

    /* total bytes written in the current part */
    volatile guint64 part_bytes_written;

    /* The list of active slices for the current part.  The cache_inform method
     * appends to this list. It is safe to read this linked list, beginning at
     * the head, *if* you can guarantee that slices will not be fast-forwarded
     * in the interim.  The finalize method for this class will take care of
     * freeing any leftover slices. Take the part_slices mutex while modifying
     * the links in this list. */
    FileSlice *part_slices;
    GMutex *part_slices_mutex;
} XferDestTaperSplitter;
157
/* Standard GObject type-casting/checking macro boilerplate for this type. */
static GType xfer_dest_taper_splitter_get_type(void);
#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)

/* class structure; adds no state beyond the parent class */
typedef struct {
    XferDestTaperClass __parent__;

} XferDestTaperSplitterClass;
170
171 /*
172  * Debug logging
173  */
174
175 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
176 static void
177 _xdt_dbg(const char *fmt, ...)
178 {
179     va_list argp;
180     char msg[1024];
181
182     arglist_start(argp, fmt);
183     g_vsnprintf(msg, sizeof(msg), fmt, argp);
184     arglist_end(argp);
185     g_debug("XDT: %s", msg);
186 }
187
188 /* "Fast forward" the slice list by the given length.  This will free any
189  * slices that are no longer necessary, and adjust the offset and length of the
190  * first remaining slice.  This assumes the state mutex is locked during its
191  * operation.
192  *
193  * @param self: element
194  * @param length: number of bytes to fast forward
195  */
196 static void
197 fast_forward_slices(
198         XferDestTaperSplitter *self,
199         guint64 length)
200 {
201     FileSlice *slice;
202
203     /* consume slices until we've eaten the whole part */
204     g_mutex_lock(self->part_slices_mutex);
205     while (length > 0) {
206         g_assert(self->part_slices);
207         slice = self->part_slices;
208
209         if (slice->length <= length) {
210             length -= slice->length;
211
212             self->part_slices = slice->next;
213             if (slice->filename)
214                 g_free(slice->filename);
215             g_free(slice);
216             slice = self->part_slices;
217         } else {
218             slice->length -= length;
219             slice->offset += length;
220             break;
221         }
222     }
223     g_mutex_unlock(self->part_slices_mutex);
224 }
225
226 /*
227  * Slice Iterator
228  */
229
230 /* A struct for use in iterating over data in the slices */
/* A struct for use in iterating over data in the slices */
typedef struct SliceIterator {
    /* current slice (NULL once the list is exhausted) */
    FileSlice *slice;

    /* file descriptor of the current file, or -1 if it's not open yet */
    int cur_fd;

    /* bytes remaining in this slice */
    guint64 slice_remaining;
} SliceIterator;
241
242 /* Utility functions for SliceIterator */
243
244 /* Begin iterating over slices, starting at the first byte of the first slice.
245  * Initializes a pre-allocated SliceIterator.  The caller must ensure that
246  * fast_forward_slices is not called while an iteration is in
247  * progress.
248  */
249 static void
250 iterate_slices(
251         XferDestTaperSplitter *self,
252         SliceIterator *iter)
253 {
254     iter->cur_fd = -1;
255     iter->slice_remaining = 0;
256     g_mutex_lock(self->part_slices_mutex);
257     iter->slice = self->part_slices;
258     /* it's safe to unlock this because, at worst, a new entry will
259      * be appended while the iterator is in progress */
260     g_mutex_unlock(self->part_slices_mutex);
261 }
262
263
264 /* Get a block of data from the iterator, returning a pointer to a buffer
265  * containing the data; the buffer remains the property of the iterator.
266  * Returns NULL on error, after calling xfer_cancel_with_error with an
267  * appropriate error message.  This function does not block, so it does not
268  * check for cancellation.
269  */
/* Get a block of data from the iterator, returning a pointer to a buffer
 * containing the data; the buffer remains the property of the iterator.
 * Returns NULL on error, after calling xfer_cancel_with_error with an
 * appropriate error message.  This function does not block, so it does not
 * check for cancellation.
 *
 * @param self: element (used for error reporting)
 * @param iter: iterator previously set up with iterate_slices
 * @param buf: caller-supplied buffer, filled completely on success
 * @param bytes_needed: exact number of bytes to copy into BUF
 * @returns: BUF on success, NULL on error (transfer already cancelled)
 */
static gpointer
iterator_get_block(
        XferDestTaperSplitter *self,
        SliceIterator *iter,
        gpointer buf,
        gsize bytes_needed)
{
    gsize buf_offset = 0;
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(iter != NULL);
    g_assert(buf != NULL);

    /* keep reading, possibly across several slices, until BUF is full */
    while (bytes_needed > 0) {
        gsize read_size;
        int bytes_read;

        /* lazily open the current slice's file on first use */
        if (iter->cur_fd < 0) {
            guint64 offset;

            /* running out of slices mid-read is a caller error: slices must
             * be cache_inform'd before they are needed */
            g_assert(iter->slice != NULL);
            g_assert(iter->slice->filename != NULL);

            iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
            if (iter->cur_fd < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not open '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }

            iter->slice_remaining = iter->slice->length;
            offset = iter->slice->offset;

            /* NOTE(review): on failure the fd stays in iter->cur_fd; the
             * caller is expected to run iterator_free, which closes it */
            if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
                xfer_cancel_with_error(elt,
                    _("Could not seek '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }
        }

        /* read no more than remains in this slice */
        read_size = MIN(iter->slice_remaining, bytes_needed);
        bytes_read = full_read(iter->cur_fd,
                               buf + buf_offset,
                               read_size);
        /* a short read means error or unexpected EOF (errno == 0) */
        if (bytes_read < 0 || (gsize)bytes_read < read_size) {
            xfer_cancel_with_error(elt,
                _("Error reading '%s': %s"),
                iter->slice->filename,
                errno? strerror(errno) : _("Unexpected EOF"));
            return NULL;
        }

        iter->slice_remaining -= bytes_read;
        buf_offset += bytes_read;
        bytes_needed -= bytes_read;

        /* slice exhausted: close its file and advance to the next slice */
        if (iter->slice_remaining <= 0) {
            if (close(iter->cur_fd) < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not close fd %d: %s"),
                    iter->cur_fd, strerror(errno));
                return NULL;
            }
            iter->cur_fd = -1;

            iter->slice = iter->slice->next;

            /* convenient point to bail out if the transfer was cancelled */
            if (elt->cancelled)
                return NULL;
        }
    }

    return buf;
}
346
347
348 /* Free the iterator's resources */
349 static void
350 iterator_free(
351         SliceIterator *iter)
352 {
353     if (iter->cur_fd >= 0)
354         close(iter->cur_fd);
355 }
356
357 /*
358  * Device Thread
359  */
360
361 /* Wait for at least one block, or EOF, to be available in the ring buffer.
362  * Called with the ring mutex held. */
/* Wait for at least one block, or EOF, to be available in the ring buffer.
 * Called with the ring mutex held.
 *
 * @param self: element
 * @returns: number of bytes usable for the next write (0 indicates EOF or
 *           cancellation), capped by the remaining part size if part_size
 *           is nonzero
 */
static gsize
device_thread_wait_for_block(
    XferDestTaperSplitter *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    gsize bytes_needed = self->device->block_size;
    gsize usable;

    /* for any kind of streaming, we need to fill the entire buffer before the
     * first byte */
    if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
        bytes_needed = self->ring_length;

    while (1) {
        /* are we ready? */
        if (elt->cancelled)
            break;

        if (self->ring_count >= bytes_needed)
            break;

        if (self->ring_head_at_eof)
            break;

        /* nope - so wait (releases ring_mutex while blocked) */
        g_cond_wait(self->ring_add_cond, self->ring_mutex);

        /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
         * we need to wait for the entire buffer to fill */
        if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
            bytes_needed = self->ring_length;
    }

    /* don't hand back more than is buffered, nor more than the part allows */
    usable = MIN(self->ring_count, bytes_needed);
    if (self->part_size)
       usable = MIN(usable, self->part_size - self->part_bytes_written);

    return usable;
}
402
403 /* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
404  * held. */
405 static void
406 device_thread_consume_block(
407     XferDestTaperSplitter *self,
408     gsize written)
409 {
410     self->ring_count -= written;
411     self->ring_tail += written;
412     if (self->ring_tail >= self->ring_length)
413         self->ring_tail -= self->ring_length;
414     g_cond_broadcast(self->ring_free_cond);
415 }
416
417 /* Write an entire part.  Called with the state_mutex held */
/* Write an entire part.  Called with the state_mutex held.
 *
 * Writes the part header, then (on a retry) replays cached slice data, then
 * streams data from the ring buffer until EOF, LEOM, the part-size limit,
 * or failure.  Updates self->last_part_*, self->partnum, and
 * self->no_more_parts.
 *
 * @param self: element
 * @returns: an XMSG_PART_DONE message describing the outcome (never NULL)
 */
static XMsg *
device_thread_write_part(
    XferDestTaperSplitter *self)
{
    GTimer *timer = g_timer_new();
    XferElement *elt = XFER_ELEMENT(self);

    /* possible outcomes: clean EOF, logical end-of-medium, end-of-part
     * (size limit), or failure */
    enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
    int fileno = 0;
    XMsg *msg;

    self->part_bytes_written = 0;

    g_timer_start(timer);

    /* write the header; if this fails or hits LEOM, we consider this a
     * successful 0-byte part */
    if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
        part_status = PART_LEOM;
        goto part_done;
    }

    fileno = self->device->file;
    g_assert(fileno > 0);

    /* free the header, now that it's written */
    dumpfile_free(self->part_header);
    self->part_header = NULL;

    /* First, read the requisite number of bytes from the part_slices, if the part was
     * unsuccessful. */
    if (self->bytes_to_read_from_slices) {
        SliceIterator iter;
        gsize to_write = self->block_size;
        gpointer buf = g_malloc(to_write);
        gboolean successful = TRUE;
        guint64 bytes_from_slices = self->bytes_to_read_from_slices;

        DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);

        iterate_slices(self, &iter);
        while (bytes_from_slices) {
            gboolean ok;

            /* iterator_get_block cancels the xfer itself on error */
            if (!iterator_get_block(self, &iter, buf, to_write)) {
                part_status = PART_FAILED;
                successful = FALSE;
                break;
            }

            /* note that it's OK to reference these ring_* vars here, as they
             * are static at this point */
            ok = device_write_block(self->device, (guint)to_write, buf);

            if (!ok) {
                part_status = PART_FAILED;
                successful = FALSE;
                break;
            }

            self->part_bytes_written += to_write;
            bytes_from_slices -= to_write;

            /* stop at the part-size limit or logical end-of-medium; in
             * either case this part cannot continue from the ring buffer */
            if (self->part_size && self->part_bytes_written >= self->part_size) {
                part_status = PART_EOP;
                successful = FALSE;
                break;
            } else if (self->device->is_eom) {
                part_status = PART_LEOM;
                successful = FALSE;
                break;
            }
        }

        iterator_free(&iter);
        g_free(buf);

        /* if we didn't finish, get out of here now */
        if (!successful)
            goto part_done;
    }

    /* Main loop: stream data from the ring buffer to the device */
    g_mutex_lock(self->ring_mutex);
    while (1) {
        gsize to_write;
        gboolean ok;

        /* wait for at least one block, and (if necessary) prebuffer */
        to_write = device_thread_wait_for_block(self);
        to_write = MIN(to_write, self->device->block_size);
        if (elt->cancelled)
            break;

        /* zero usable bytes means EOF was reached */
        if (to_write == 0) {
            part_status = PART_EOF;
            break;
        }

        /* drop the ring mutex while doing (slow) device I/O */
        g_mutex_unlock(self->ring_mutex);
        DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);

        /* note that it's OK to reference these ring_* vars here, as they
         * are static at this point */
        ok = device_write_block(self->device, (guint)to_write,
                self->ring_buffer + self->ring_tail);
        g_mutex_lock(self->ring_mutex);

        if (!ok) {
            part_status = PART_FAILED;
            break;
        }

        self->part_bytes_written += to_write;
        device_thread_consume_block(self, to_write);

        if (self->part_size && self->part_bytes_written >= self->part_size) {
            part_status = PART_EOP;
            break;
        } else if (self->device->is_eom) {
            part_status = PART_LEOM;
            break;
        }
    }
    g_mutex_unlock(self->ring_mutex);
part_done:

    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the device driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part.  Note
     * that we try to finish_file even if the part failed, just to be thorough. */
    if (self->device->in_file) {
        if (!device_finish_file(self->device))
            if (!elt->cancelled) {
                part_status = PART_FAILED;
            }
    }

    g_timer_stop(timer);

    /* build the PART_DONE message and record the outcome in element state */
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
    msg->size = self->part_bytes_written;
    msg->duration = g_timer_elapsed(timer, NULL);
    msg->partnum = self->partnum;
    msg->fileno = fileno;
    msg->successful = self->last_part_successful = part_status != PART_FAILED;
    msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
    msg->eof = self->last_part_eof = part_status == PART_EOF;

    /* time runs backward on some test boxes, so make sure this is positive */
    if (msg->duration < 0) msg->duration = 0;

    /* only non-empty successful parts consume a part number */
    if (msg->successful && msg->size > 0)
        self->partnum++;
    self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);

    g_timer_destroy(timer);

    return msg;
}
577
/* Main function of the device thread: loop once per part, waiting to be
 * unpaused by the main thread before each part, until cancellation or the
 * final part.  Queues XMSG_PART_DONE after every part and XMSG_DONE at exit.
 *
 * @param data: the XferDestTaperSplitter (as a gpointer, per GThreadFunc)
 * @returns: always NULL
 */
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    g_mutex_lock(self->state_mutex);
    while (1) {
        /* wait until the main thread un-pauses us, and check that we have
         * the relevant device info available (block_size) */
        while (self->paused && !elt->cancelled) {
            DBG(9, "waiting to be unpaused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            break;

        /* state_mutex is held for the whole part write */
        DBG(2, "beginning to write part");
        msg = device_thread_write_part(self);
        DBG(2, "done writing part");

        if (!msg) /* cancelled */
            break;

        /* release the slices for this part, if there were any slices */
        if (msg->successful && self->expect_cache_inform) {
            fast_forward_slices(self, msg->size);
        }

        xfer_queue_message(elt->xfer, msg);

        /* pause ourselves and await instructions from the main thread */
        self->paused = TRUE;

        /* if this is the last part, we're done with the part loop */
        if (self->no_more_parts)
            break;
    }
    g_mutex_unlock(self->state_mutex);

    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}
631
632 /*
633  * Class mechanics
634  */
635
/* XferElement push_buffer implementation: copy BUF into the ring buffer
 * (possibly in several contiguous pieces), blocking while the ring is full.
 * A NULL BUF signals EOF to the device thread.  Takes ownership of BUF and
 * frees it in all cases.
 *
 * @param elt: this element
 * @param buf: data to buffer, or NULL for EOF (freed before returning)
 * @param size: number of bytes in BUF
 */
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
        /* indicate EOF to the device thread */
        g_mutex_lock(self->ring_mutex);
        self->ring_head_at_eof = TRUE;
        g_cond_broadcast(self->ring_add_cond);
        g_mutex_unlock(self->ring_mutex);
        goto free_and_finish;
    }

    /* push the block into the ring buffer, in pieces if necessary */
    g_mutex_lock(self->ring_mutex);
    while (size > 0) {
        gsize avail;

        /* wait for some space */
        while (self->ring_count == self->ring_length && !elt->cancelled) {
            DBG(9, "waiting for any space to buffer pushed data");
            g_cond_wait(self->ring_free_cond, self->ring_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            goto unlock_and_free_and_finish;

        /* only copy to the end of the buffer, if the available space wraps
         * around to the beginning */
        avail = MIN(size, self->ring_length - self->ring_count);
        avail = MIN(avail, self->ring_length - self->ring_head);

        /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
        memmove(self->ring_buffer + self->ring_head, p, avail);

        /* reset the ring variables to represent this state */
        self->ring_count += avail;
        self->ring_head += avail; /* will, at most, hit ring_length */
        if (self->ring_head == self->ring_length)
            self->ring_head = 0;
        p = (gpointer)((guchar *)p + avail);
        size -= avail;

        /* and give the device thread a notice that data is ready */
        g_cond_broadcast(self->ring_add_cond);
    }

unlock_and_free_and_finish:
    g_mutex_unlock(self->ring_mutex);

free_and_finish:
    if (buf)
        g_free(buf);
}
704
705 /*
706  * Element mechanics
707  */
708
709 static gboolean
710 start_impl(
711     XferElement *elt)
712 {
713     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
714     GError *error = NULL;
715
716     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
717     if (!self->device_thread) {
718         g_critical(_("Error creating new thread: %s (%s)"),
719             error->message, errno? strerror(errno) : _("no error code"));
720     }
721
722     return TRUE;
723 }
724
725 static gboolean
726 cancel_impl(
727     XferElement *elt,
728     gboolean expect_eof)
729 {
730     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
731     gboolean rv;
732
733     /* chain up first */
734     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
735
736     /* then signal all of our condition variables, so that threads waiting on them
737      * wake up and see elt->cancelled. */
738     g_mutex_lock(self->ring_mutex);
739     g_cond_broadcast(self->ring_add_cond);
740     g_cond_broadcast(self->ring_free_cond);
741     g_mutex_unlock(self->ring_mutex);
742
743     g_mutex_lock(self->state_mutex);
744     g_cond_broadcast(self->state_cond);
745     g_mutex_unlock(self->state_mutex);
746
747     return rv;
748 }
749
/* XferDestTaper start_part implementation: record the header for the next
 * part and unpause the device thread to write it.  On a retry, arranges for
 * the previous part's bytes to be replayed from the cached slices.
 *
 * @param xdt: this element
 * @param retry_part: TRUE to rewrite the previous (failed) part
 * @param header: header for the part (copied; caller retains ownership)
 */
static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part()");

    /* we can only retry the part if we're getting slices via cache_inform's */
    if (retry_part) {
        if (self->last_part_successful) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("Previous part did not fail; cannot retry"));
            return;
        }

        if (!self->expect_cache_inform) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("No cache for previous failed part; cannot retry"));
            return;
        }

        /* replay everything the failed part wrote, from the slices */
        self->bytes_to_read_from_slices = self->part_bytes_written;
    } else {
        /* don't read any bytes from the slices, since we're not retrying */
        self->bytes_to_read_from_slices = 0;
    }

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    /* replace any header left over from an earlier (failed) part */
    if (self->part_header)
        dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}
798
/* XferDestTaper use_device implementation: switch to a new output device.
 * The new device must have the same block size as the original; a differing
 * streaming requirement is warned about but otherwise ignored.
 *
 * @param xdtself: this element
 * @param device: the device to write subsequent parts to (ref'd here)
 */
static void
use_device_impl(
    XferDestTaper *xdtself,
    Device *device)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
    StreamingRequirement newstreaming;
    GValue val;

    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");

    /* short-circuit if nothing is changing */
    if (self->device == device)
        return;

    g_mutex_lock(self->state_mutex);
    /* swap our reference: drop the old device, take the new one */
    if (self->device)
        g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
    } else {
        /* a mismatch is only warned about; self->streaming is kept as-is */
        newstreaming = g_value_get_enum(&val);
        if (newstreaming != self->streaming)
            g_warning("New device has different streaming requirements from the original; "
                    "ignoring new requirement");
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    if (self->block_size != device->block_size) {
        /* unlock before cancelling to respect lock ordering */
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
        return;
    }
    g_mutex_unlock(self->state_mutex);
}
842
843 static void
844 cache_inform_impl(
845     XferDestTaper *xdt,
846     const char *filename,
847     off_t offset,
848     off_t length)
849 {
850     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
851     FileSlice *slice = g_new(FileSlice, 1), *iter;
852
853     slice->next = NULL;
854     slice->filename = g_strdup(filename);
855     slice->offset = offset;
856     slice->length = length;
857
858     g_mutex_lock(self->part_slices_mutex);
859     if (self->part_slices) {
860         for (iter = self->part_slices; iter->next; iter = iter->next) {}
861         iter->next = slice;
862     } else {
863         self->part_slices = slice;
864     }
865     g_mutex_unlock(self->part_slices_mutex);
866 }
867
868 static guint64
869 get_part_bytes_written_impl(
870     XferDestTaper *xdtself)
871 {
872     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
873
874     /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
875      * partial write to the 64-bit value on a 32-bit system).  This is ok for
876      * the moment, as it's only informational, but be warned. */
877     if (self->device) {
878         return device_get_bytes_written(self->device);
879     } else {
880         return self->part_bytes_written;
881     }
882 }
883
884 static void
885 instance_init(
886     XferElement *elt)
887 {
888     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
889     elt->can_generate_eof = FALSE;
890
891     self->state_mutex = g_mutex_new();
892     self->state_cond = g_cond_new();
893     self->ring_mutex = g_mutex_new();
894     self->ring_add_cond = g_cond_new();
895     self->ring_free_cond = g_cond_new();
896     self->part_slices_mutex = g_mutex_new();
897
898     self->device = NULL;
899     self->paused = TRUE;
900     self->part_header = NULL;
901     self->partnum = 1;
902     self->part_bytes_written = 0;
903     self->part_slices = NULL;
904 }
905
906 static void
907 finalize_impl(
908     GObject * obj_self)
909 {
910     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
911     FileSlice *slice, *next_slice;
912
913     g_mutex_free(self->state_mutex);
914     g_cond_free(self->state_cond);
915
916     g_mutex_free(self->ring_mutex);
917     g_cond_free(self->ring_add_cond);
918     g_cond_free(self->ring_free_cond);
919
920     g_mutex_free(self->part_slices_mutex);
921
922     for (slice = self->part_slices; slice; slice = next_slice) {
923         next_slice = slice->next;
924         if (slice->filename)
925             g_free(slice->filename);
926         g_free(slice);
927     }
928
929     if (self->ring_buffer)
930         g_free(self->ring_buffer);
931
932     if (self->part_header)
933         dumpfile_free(self->part_header);
934
935     if (self->device)
936         g_object_unref(self->device);
937
938     /* chain up */
939     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
940 }
941
942 static void
943 class_init(
944     XferDestTaperSplitterClass * selfc)
945 {
946     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
947     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
948     GObjectClass *goc = G_OBJECT_CLASS(selfc);
949     static xfer_element_mech_pair_t mech_pairs[] = {
950         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
951         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
952     };
953
954     klass->start = start_impl;
955     klass->cancel = cancel_impl;
956     klass->push_buffer = push_buffer_impl;
957     xdt_klass->start_part = start_part_impl;
958     xdt_klass->use_device = use_device_impl;
959     xdt_klass->cache_inform = cache_inform_impl;
960     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
961     goc->finalize = finalize_impl;
962
963     klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
964     klass->mech_pairs = mech_pairs;
965
966     parent_class = g_type_class_peek_parent(selfc);
967 }
968
969 static GType
970 xfer_dest_taper_splitter_get_type (void)
971 {
972     static GType type = 0;
973
974     if G_UNLIKELY(type == 0) {
975         static const GTypeInfo info = {
976             sizeof (XferDestTaperSplitterClass),
977             (GBaseInitFunc) NULL,
978             (GBaseFinalizeFunc) NULL,
979             (GClassInitFunc) class_init,
980             (GClassFinalizeFunc) NULL,
981             NULL /* class_data */,
982             sizeof (XferDestTaperSplitter),
983             0 /* n_preallocs */,
984             (GInstanceInitFunc) instance_init,
985             NULL
986         };
987
988         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
989     }
990
991     return type;
992 }
993
994 /*
995  * Constructor
996  */
997
998 XferElement *
999 xfer_dest_taper_splitter(
1000     Device *first_device,
1001     size_t max_memory,
1002     guint64 part_size,
1003     gboolean expect_cache_inform)
1004 {
1005     XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1006     GValue val;
1007
1008     /* max_memory and part_size get rounded up to the next multiple of
1009      * block_size */
1010     max_memory = ((max_memory + first_device->block_size - 1)
1011                         / first_device->block_size) * first_device->block_size;
1012     if (part_size)
1013         part_size = ((part_size + first_device->block_size - 1)
1014                             / first_device->block_size) * first_device->block_size;
1015
1016     self->part_size = part_size;
1017     self->partnum = 1;
1018     self->device = first_device;
1019
1020     g_object_ref(self->device);
1021     self->block_size = first_device->block_size;
1022     self->paused = TRUE;
1023     self->no_more_parts = FALSE;
1024
1025     /* set up a ring buffer of size max_memory */
1026     self->ring_length = max_memory;
1027     self->ring_buffer = g_malloc(max_memory);
1028     self->ring_head = self->ring_tail = 0;
1029     self->ring_count = 0;
1030     self->ring_head_at_eof = 0;
1031
1032     /* get this new device's streaming requirements */
1033     bzero(&val, sizeof(val));
1034     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1035         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1036         g_warning("Couldn't get streaming type for %s", self->device->device_name);
1037         self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1038     } else {
1039         self->streaming = g_value_get_enum(&val);
1040     }
1041     g_value_unset(&val);
1042
1043     /* grab data from cache_inform, just in case we hit PEOM */
1044     self->expect_cache_inform = expect_cache_inform;
1045
1046     return XFER_ELEMENT(self);
1047 }