/* device-src/xfer-dest-taper-splitter.c
 * (snapshot ccd7fdec5deb6dae76362f4bf55700d53e060334, debian/amanda) */
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
28 /* A transfer destination that writes an entire dumpfile to one or more files
29  * on one or more devices, without any caching.  This destination supports both
30  * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
31  * splitting (in which rewound parts are read from holding disk). */
32
33 /*
34  * File Slices - Cache Information
35  *
36  * The cache_inform implementation adds cache information to a linked list of
37  * these objects, in order.  The objects are arranged in a linked list, and
38  * describe the files in which the part data is stored.  Note that we assume
39  * that slices are added *before* they are needed: the xfer element will fail
40  * if it tries to rewind and does not find a suitable slice.
41  *
42  * The slices should be "fast forwarded" after every part, so that the first
43  * byte in part_slices is the first byte of the part; when a retry of a part is
44  * required, use the iterator methods to properly open the various files and do
45  * the buffering.
46  */
47
/* One slice of cached part data: a contiguous byte range within a single
 * file.  Slices form a singly-linked list in the order the bytes appear in
 * the part. */
typedef struct FileSlice {
    struct FileSlice *next;

    /* fully-qualified filename to read from (or NULL to read from
     * disk_cache_read_fd in XferDestTaperCacher) */
    char *filename;

    /* offset in file to start at */
    guint64 offset;

    /* length of data to read */
    guint64 length;
} FileSlice;
61
62 /*
63  * Xfer Dest Taper
64  */
65
66 static GObjectClass *parent_class = NULL;
67
/* Instance structure for the splitter element. */
typedef struct XferDestTaperSplitter {
    XferDestTaper __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    /* Maximum size of each part (bytes); 0 means unlimited */
    guint64 part_size;

    /* the device's need for streaming (it's assumed that all subsequent devices
     * have the same needs) */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* TRUE if this element is expecting slices via cache_inform */
    gboolean expect_cache_inform;

    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* Ring Buffer
     *
     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
     * blocksize), and serves as the interface between the device_thread and
     * the thread calling push_buffer.  Ring_length is the total length of the
     * buffer in bytes, while ring_count is the number of data bytes currently
     * in the buffer.  The ring_add_cond is signalled when data is added to the
     * buffer, while ring_free_cond is signalled when data is removed.  Both
     * are governed by ring_mutex, and both are signalled when the transfer is
     * cancelled.
     */

    GMutex *ring_mutex;
    GCond *ring_add_cond, *ring_free_cond;
    gchar *ring_buffer;
    gsize ring_length, ring_count;
    gsize ring_head, ring_tail;     /* producer writes at head; consumer reads at tail */
    gboolean ring_head_at_eof;      /* producer has seen EOF; no more data will arrive */

    /* Element State
     *
     * "state" includes all of the variables below (including device
     * parameters).  Note that the device_thread holds this mutex for the
     * entire duration of writing a part.
     *
     * state_mutex should always be locked before ring_mutex, if both are to be
     * held simultaneously.
     */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;

    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* bytes to read from cached slices before reading from the ring buffer */
    guint64 bytes_to_read_from_slices;

    /* part number in progress */
    volatile guint64 partnum;

    /* status of the last part */
    gboolean last_part_eof;
    gboolean last_part_eom;
    gboolean last_part_successful;

    /* true if the element is done writing to devices */
    gboolean no_more_parts;

    /* total bytes written in the current part */
    volatile guint64 part_bytes_written;

    /* The list of active slices for the current part.  The cache_inform method
     * appends to this list. It is safe to read this linked list, beginning at
     * the head, *if* you can guarantee that slices will not be fast-forwarded
     * in the interim.  The finalize method for this class will take care of
     * freeing any leftover slices. Take the part_slices mutex while modifying
     * the links in this list. */
    FileSlice *part_slices;
    GMutex *part_slices_mutex;
} XferDestTaperSplitter;
156
/* GObject boilerplate: type accessor plus the usual cast/check macros */
static GType xfer_dest_taper_splitter_get_type(void);
#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)

/* Class structure; adds nothing to its parent */
typedef struct {
    XferDestTaperClass __parent__;

} XferDestTaperSplitterClass;
169
/*
 * Debug logging
 */

/* Log a taper debug message at the given LEVEL.  Wrapped in do/while(0) so
 * the macro expands to a single statement; the original bare `if` form is a
 * dangling-else hazard when used as the body of an unbraced if/else. */
#define DBG(LEVEL, ...) do { if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); } } while (0)

/* Format a message and emit it via g_debug with an "XDT: " prefix.
 *
 * @param fmt: printf-style format string, followed by its arguments
 */
static void
_xdt_dbg(const char *fmt, ...)
{
    va_list argp;
    char msg[1024];    /* messages longer than this are truncated */

    arglist_start(argp, fmt);
    g_vsnprintf(msg, sizeof(msg), fmt, argp);
    arglist_end(argp);
    g_debug("XDT: %s", msg);
}
186
187 /* "Fast forward" the slice list by the given length.  This will free any
188  * slices that are no longer necessary, and adjust the offset and length of the
189  * first remaining slice.  This assumes the state mutex is locked during its
190  * operation.
191  *
192  * @param self: element
193  * @param length: number of bytes to fast forward
194  */
195 static void
196 fast_forward_slices(
197         XferDestTaperSplitter *self,
198         guint64 length)
199 {
200     FileSlice *slice;
201
202     /* consume slices until we've eaten the whole part */
203     g_mutex_lock(self->part_slices_mutex);
204     while (length > 0) {
205         g_assert(self->part_slices);
206         slice = self->part_slices;
207
208         if (slice->length <= length) {
209             length -= slice->length;
210
211             self->part_slices = slice->next;
212             if (slice->filename)
213                 g_free(slice->filename);
214             g_free(slice);
215             slice = self->part_slices;
216         } else {
217             slice->length -= length;
218             slice->offset += length;
219             break;
220         }
221     }
222     g_mutex_unlock(self->part_slices_mutex);
223 }
224
225 /*
226  * Slice Iterator
227  */
228
/* A struct for use in iterating over data in the slices; see iterate_slices
 * and iterator_get_block below. */
typedef struct SliceIterator {
    /* current slice */
    FileSlice *slice;

    /* file descriptor of the current file, or -1 if it's not open yet */
    int cur_fd;

    /* bytes remaining in this slice */
    guint64 slice_remaining;
} SliceIterator;
240
241 /* Utility functions for SliceIterator */
242
243 /* Begin iterating over slices, starting at the first byte of the first slice.
244  * Initializes a pre-allocated SliceIterator.  The caller must ensure that
245  * fast_forward_slices is not called while an iteration is in
246  * progress.
247  */
248 static void
249 iterate_slices(
250         XferDestTaperSplitter *self,
251         SliceIterator *iter)
252 {
253     iter->cur_fd = -1;
254     iter->slice_remaining = 0;
255     g_mutex_lock(self->part_slices_mutex);
256     iter->slice = self->part_slices;
257     /* it's safe to unlock this because, at worst, a new entry will
258      * be appended while the iterator is in progress */
259     g_mutex_unlock(self->part_slices_mutex);
260 }
261
262
/* Get a block of data from the iterator, returning a pointer to a buffer
 * containing the data; the buffer remains the property of the iterator.
 * Returns NULL on error, after calling xfer_cancel_with_error with an
 * appropriate error message.  This function does not block, so it does not
 * check for cancellation.
 *
 * @param self: element (used only for error reporting)
 * @param iter: iterator, previously initialized with iterate_slices
 * @param buf: caller-owned buffer to fill with exactly bytes_needed bytes
 * @param bytes_needed: number of bytes to read; may span multiple slices
 * @returns: buf on success, NULL on error/cancellation
 */
static gpointer
iterator_get_block(
        XferDestTaperSplitter *self,
        SliceIterator *iter,
        gpointer buf,
        gsize bytes_needed)
{
    gsize buf_offset = 0;
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(iter != NULL);
    g_assert(buf != NULL);

    while (bytes_needed > 0) {
        gsize read_size;
        int bytes_read;

        /* lazily open the current slice's file on first use */
        if (iter->cur_fd < 0) {
            guint64 offset;

            /* NOTE(review): a NULL filename (XferDestTaperCacher's
             * disk_cache_read_fd case) is not handled here; this element
             * assumes every slice names a real file */
            g_assert(iter->slice != NULL);
            g_assert(iter->slice->filename != NULL);

            iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
            if (iter->cur_fd < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not open '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }

            iter->slice_remaining = iter->slice->length;
            offset = iter->slice->offset;

            /* position at the slice's starting offset; on failure the fd is
             * left open and will be closed by iterator_free */
            if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
                xfer_cancel_with_error(elt,
                    _("Could not seek '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }
        }

        /* read no more than remains in this slice */
        read_size = MIN(iter->slice_remaining, bytes_needed);
        bytes_read = full_read(iter->cur_fd,
                               buf + buf_offset,
                               read_size);
        /* a short read is an error: slices must contain the bytes they claim */
        if (bytes_read < 0 || (gsize)bytes_read < read_size) {
            xfer_cancel_with_error(elt,
                _("Error reading '%s': %s"),
                iter->slice->filename,
                errno? strerror(errno) : _("Unexpected EOF"));
            return NULL;
        }

        iter->slice_remaining -= bytes_read;
        buf_offset += bytes_read;
        bytes_needed -= bytes_read;

        /* slice exhausted: close it and advance to the next one */
        if (iter->slice_remaining <= 0) {
            if (close(iter->cur_fd) < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not close fd %d: %s"),
                    iter->cur_fd, strerror(errno));
                return NULL;
            }
            iter->cur_fd = -1;

            iter->slice = iter->slice->next;

            /* bail out between slices if the transfer was cancelled */
            if (elt->cancelled)
                return NULL;
        }
    }

    return buf;
}
345
346
347 /* Free the iterator's resources */
348 static void
349 iterator_free(
350         SliceIterator *iter)
351 {
352     if (iter->cur_fd >= 0)
353         close(iter->cur_fd);
354 }
355
356 /*
357  * Device Thread
358  */
359
/* Wait for at least one block, or EOF, to be available in the ring buffer.
 * Called with the ring mutex held.
 *
 * @param self: element
 * @returns: number of bytes safe to write next (0 on EOF with an empty
 *           buffer, or on cancellation); never exceeds the bytes remaining
 *           in the current part when part_size is set
 */
static gsize
device_thread_wait_for_block(
    XferDestTaperSplitter *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    gsize bytes_needed = self->device->block_size;
    gsize usable;

    /* for any kind of streaming, we need to fill the entire buffer before the
     * first byte */
    if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
        bytes_needed = self->ring_length;

    while (1) {
        /* are we ready? */
        if (elt->cancelled)
            break;

        if (self->ring_count >= bytes_needed)
            break;

        /* producer reached EOF: whatever is buffered is all we'll ever get */
        if (self->ring_head_at_eof)
            break;

        /* nope - so wait (releases ring_mutex while blocked) */
        g_cond_wait(self->ring_add_cond, self->ring_mutex);

        /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
         * we need to wait for the entire buffer to fill */
        if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
            bytes_needed = self->ring_length;
    }

    /* never offer more than is buffered, nor more than the part has room for */
    usable = MIN(self->ring_count, bytes_needed);
    if (self->part_size)
       usable = MIN(usable, self->part_size - self->part_bytes_written);

    return usable;
}
401
402 /* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
403  * held. */
404 static void
405 device_thread_consume_block(
406     XferDestTaperSplitter *self,
407     gsize written)
408 {
409     self->ring_count -= written;
410     self->ring_tail += written;
411     if (self->ring_tail >= self->ring_length)
412         self->ring_tail -= self->ring_length;
413     g_cond_broadcast(self->ring_free_cond);
414 }
415
/* Write an entire part.  Called with the state_mutex held.
 *
 * Writes the part header, then (on a retry) replays cached bytes from
 * part_slices, then streams the remainder from the ring buffer, and finally
 * builds and returns an XMSG_PART_DONE message describing the outcome.
 *
 * @param self: element
 * @returns: a newly-allocated XMSG_PART_DONE message (never NULL)
 */
static XMsg *
device_thread_write_part(
    XferDestTaperSplitter *self)
{
    GTimer *timer = g_timer_new();
    XferElement *elt = XFER_ELEMENT(self);

    /* PART_EOF: input exhausted; PART_LEOM: device hit logical end-of-medium;
     * PART_EOP: part_size reached; PART_FAILED: write or read error */
    enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
    int fileno = 0;
    XMsg *msg;

    self->part_bytes_written = 0;

    g_timer_start(timer);

    /* write the header; if this fails or hits LEOM, we consider this a
     * successful 0-byte part */
    if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
        part_status = PART_LEOM;
        goto part_done;
    }

    fileno = self->device->file;
    g_assert(fileno > 0);

    /* free the header, now that it's written */
    dumpfile_free(self->part_header);
    self->part_header = NULL;

    /* First, read the requisite number of bytes from the part_slices, if the part was
     * unsuccessful. */
    if (self->bytes_to_read_from_slices) {
        SliceIterator iter;
        gsize to_write = self->block_size;
        gpointer buf = g_malloc(to_write);
        gboolean successful = TRUE;
        guint64 bytes_from_slices = self->bytes_to_read_from_slices;

        DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);

        /* replay one device block at a time from the cached slices */
        iterate_slices(self, &iter);
        while (bytes_from_slices) {
            gboolean ok;

            if (!iterator_get_block(self, &iter, buf, to_write)) {
                part_status = PART_FAILED;
                successful = FALSE;
                break;
            }

            /* note that it's OK to reference these ring_* vars here, as they
             * are static at this point */
            ok = device_write_block(self->device, (guint)to_write, buf);

            if (!ok) {
                part_status = PART_FAILED;
                successful = FALSE;
                break;
            }

            self->part_bytes_written += to_write;
            bytes_from_slices -= to_write;

            /* stop early if the part filled up or the device signalled LEOM */
            if (self->part_size && self->part_bytes_written >= self->part_size) {
                part_status = PART_EOP;
                successful = FALSE;
                break;
            } else if (self->device->is_eom) {
                part_status = PART_LEOM;
                successful = FALSE;
                break;
            }
        }

        iterator_free(&iter);
        g_free(buf);

        /* if we didn't finish, get out of here now */
        if (!successful)
            goto part_done;
    }

    /* now stream the rest of the part out of the ring buffer */
    g_mutex_lock(self->ring_mutex);
    while (1) {
        gsize to_write;
        gboolean ok;

        /* wait for at least one block, and (if necessary) prebuffer */
        to_write = device_thread_wait_for_block(self);
        to_write = MIN(to_write, self->device->block_size);
        if (elt->cancelled)
            break;

        /* zero bytes available means EOF (see device_thread_wait_for_block) */
        if (to_write == 0) {
            part_status = PART_EOF;
            break;
        }

        /* drop the ring mutex while doing device I/O, so the producer can
         * keep filling the buffer */
        g_mutex_unlock(self->ring_mutex);
        DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);

        /* note that it's OK to reference these ring_* vars here, as they
         * are static at this point */
        ok = device_write_block(self->device, (guint)to_write,
                self->ring_buffer + self->ring_tail);
        g_mutex_lock(self->ring_mutex);

        if (!ok) {
            part_status = PART_FAILED;
            break;
        }

        self->part_bytes_written += to_write;
        device_thread_consume_block(self, to_write);

        if (self->part_size && self->part_bytes_written >= self->part_size) {
            part_status = PART_EOP;
            break;
        } else if (self->device->is_eom) {
            part_status = PART_LEOM;
            break;
        }
    }
    g_mutex_unlock(self->ring_mutex);
part_done:

    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the device driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part.  Note
     * that we try to finish_file even if the part failed, just to be thorough. */
    if (self->device->in_file) {
        if (!device_finish_file(self->device))
            if (!elt->cancelled) {
                part_status = PART_FAILED;
            }
    }

    g_timer_stop(timer);

    /* build the PART_DONE message and mirror its flags into element state */
    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
    msg->size = self->part_bytes_written;
    msg->duration = g_timer_elapsed(timer, NULL);
    msg->partnum = self->partnum;
    msg->fileno = fileno;
    msg->successful = self->last_part_successful = part_status != PART_FAILED;
    msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
    msg->eof = self->last_part_eof = part_status == PART_EOF;

    /* time runs backward on some test boxes, so make sure this is positive */
    if (msg->duration < 0) msg->duration = 0;

    if (msg->successful)
        self->partnum++;
    /* a failed part can only be retried when cache_inform slices exist */
    self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);

    g_timer_destroy(timer);

    return msg;
}
576
/* Body of the device thread: repeatedly wait to be unpaused, write one part,
 * queue the result message, and re-pause, until cancellation or the final
 * part.  Always queues XMSG_DONE before exiting.
 *
 * @param data: the XferDestTaperSplitter element
 * @returns: NULL (thread result is unused)
 */
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    g_mutex_lock(self->state_mutex);
    while (1) {
        /* wait until the main thread un-pauses us, and check that we have
         * the relevant device info available (block_size) */
        while (self->paused && !elt->cancelled) {
            DBG(9, "waiting to be unpaused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            break;

        /* state_mutex stays held for the whole part (see struct comment) */
        DBG(2, "beginning to write part");
        msg = device_thread_write_part(self);
        DBG(2, "done writing part");

        if (!msg) /* cancelled */
            break;

        /* release the slices for this part, if there were any slices */
        if (msg->successful && self->expect_cache_inform) {
            fast_forward_slices(self, msg->size);
        }

        xfer_queue_message(elt->xfer, msg);

        /* pause ourselves and await instructions from the main thread */
        self->paused = TRUE;

        /* if this is the last part, we're done with the part loop */
        if (self->no_more_parts)
            break;
    }
    g_mutex_unlock(self->state_mutex);

    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}
630
631 /*
632  * Class mechanics
633  */
634
/* Producer side of the ring buffer: accept a buffer pushed by the upstream
 * element and copy it into the ring, blocking for free space as needed.
 * Takes ownership of BUF and frees it in all cases.
 *
 * @param elt: this element
 * @param buf: data to buffer, or NULL to signal EOF
 * @param size: number of bytes in buf
 */
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
        /* indicate EOF to the device thread */
        g_mutex_lock(self->ring_mutex);
        self->ring_head_at_eof = TRUE;
        g_cond_broadcast(self->ring_add_cond);
        g_mutex_unlock(self->ring_mutex);
        goto free_and_finish;
    }

    /* push the block into the ring buffer, in pieces if necessary */
    g_mutex_lock(self->ring_mutex);
    while (size > 0) {
        gsize avail;

        /* wait for some space */
        while (self->ring_count == self->ring_length && !elt->cancelled) {
            DBG(9, "waiting for any space to buffer pushed data");
            g_cond_wait(self->ring_free_cond, self->ring_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            goto unlock_and_free_and_finish;

        /* only copy to the end of the buffer, if the available space wraps
         * around to the beginning */
        avail = MIN(size, self->ring_length - self->ring_count);
        avail = MIN(avail, self->ring_length - self->ring_head);

        /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
        memmove(self->ring_buffer + self->ring_head, p, avail);

        /* reset the ring variables to represent this state */
        self->ring_count += avail;
        self->ring_head += avail; /* will, at most, hit ring_length */
        if (self->ring_head == self->ring_length)
            self->ring_head = 0;
        p = (gpointer)((guchar *)p + avail);
        size -= avail;

        /* and give the device thread a notice that data is ready */
        g_cond_broadcast(self->ring_add_cond);
    }

unlock_and_free_and_finish:
    g_mutex_unlock(self->ring_mutex);

free_and_finish:
    if (buf)
        g_free(buf);
}
703
704 /*
705  * Element mechanics
706  */
707
708 static gboolean
709 start_impl(
710     XferElement *elt)
711 {
712     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
713     GError *error = NULL;
714
715     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
716     if (!self->device_thread) {
717         g_critical(_("Error creating new thread: %s (%s)"),
718             error->message, errno? strerror(errno) : _("no error code"));
719     }
720
721     return TRUE;
722 }
723
724 static gboolean
725 cancel_impl(
726     XferElement *elt,
727     gboolean expect_eof)
728 {
729     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
730     gboolean rv;
731
732     /* chain up first */
733     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
734
735     /* then signal all of our condition variables, so that threads waiting on them
736      * wake up and see elt->cancelled. */
737     g_mutex_lock(self->ring_mutex);
738     g_cond_broadcast(self->ring_add_cond);
739     g_cond_broadcast(self->ring_free_cond);
740     g_mutex_unlock(self->ring_mutex);
741
742     g_mutex_lock(self->state_mutex);
743     g_cond_broadcast(self->state_cond);
744     g_mutex_unlock(self->state_mutex);
745
746     return rv;
747 }
748
749 static void
750 start_part_impl(
751     XferDestTaper *xdt,
752     gboolean retry_part,
753     dumpfile_t *header)
754 {
755     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
756
757     g_assert(self->device != NULL);
758     g_assert(!self->device->in_file);
759     g_assert(header != NULL);
760
761     DBG(1, "start_part()");
762
763     /* we can only retry the part if we're getting slices via cache_inform's */
764     if (retry_part) {
765         if (self->last_part_successful) {
766             xfer_cancel_with_error(XFER_ELEMENT(self),
767                 _("Previous part did not fail; cannot retry"));
768             return;
769         }
770
771         if (!self->expect_cache_inform) {
772             xfer_cancel_with_error(XFER_ELEMENT(self),
773                 _("No cache for previous failed part; cannot retry"));
774             return;
775         }
776
777         self->bytes_to_read_from_slices = self->part_bytes_written;
778     } else {
779         /* don't read any bytes from the slices, since we're not retrying */
780         self->bytes_to_read_from_slices = 0;
781     }
782
783     g_mutex_lock(self->state_mutex);
784     g_assert(self->paused);
785     g_assert(!self->no_more_parts);
786
787     if (self->part_header)
788         dumpfile_free(self->part_header);
789     self->part_header = dumpfile_copy(header);
790
791     DBG(1, "unpausing");
792     self->paused = FALSE;
793     g_cond_broadcast(self->state_cond);
794
795     g_mutex_unlock(self->state_mutex);
796 }
797
798 static void
799 use_device_impl(
800     XferDestTaper *xdtself,
801     Device *device)
802 {
803     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
804     StreamingRequirement newstreaming;
805     GValue val;
806
807     DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
808
809     /* short-circuit if nothing is changing */
810     if (self->device == device)
811         return;
812
813     g_mutex_lock(self->state_mutex);
814     if (self->device)
815         g_object_unref(self->device);
816     self->device = device;
817     g_object_ref(device);
818
819     /* get this new device's streaming requirements */
820     bzero(&val, sizeof(val));
821     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
822         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
823         g_warning("Couldn't get streaming type for %s", self->device->device_name);
824     } else {
825         newstreaming = g_value_get_enum(&val);
826         if (newstreaming != self->streaming)
827             g_warning("New device has different streaming requirements from the original; "
828                     "ignoring new requirement");
829     }
830     g_value_unset(&val);
831
832     /* check that the blocksize hasn't changed */
833     if (self->block_size != device->block_size) {
834         g_mutex_unlock(self->state_mutex);
835         xfer_cancel_with_error(XFER_ELEMENT(self),
836             _("All devices used by the taper must have the same block size"));
837         return;
838     }
839     g_mutex_unlock(self->state_mutex);
840 }
841
842 static void
843 cache_inform_impl(
844     XferDestTaper *xdt,
845     const char *filename,
846     off_t offset,
847     off_t length)
848 {
849     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
850     FileSlice *slice = g_new(FileSlice, 1), *iter;
851
852     slice->next = NULL;
853     slice->filename = g_strdup(filename);
854     slice->offset = offset;
855     slice->length = length;
856
857     g_mutex_lock(self->part_slices_mutex);
858     if (self->part_slices) {
859         for (iter = self->part_slices; iter->next; iter = iter->next) {}
860         iter->next = slice;
861     } else {
862         self->part_slices = slice;
863     }
864     g_mutex_unlock(self->part_slices_mutex);
865 }
866
867 static guint64
868 get_part_bytes_written_impl(
869     XferDestTaper *xdtself)
870 {
871     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
872
873     /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
874      * partial write to the 64-bit value on a 32-bit system).  This is ok for
875      * the moment, as it's only informational, but be warned. */
876     return self->part_bytes_written;
877 }
878
879 static void
880 instance_init(
881     XferElement *elt)
882 {
883     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
884     elt->can_generate_eof = FALSE;
885
886     self->state_mutex = g_mutex_new();
887     self->state_cond = g_cond_new();
888     self->ring_mutex = g_mutex_new();
889     self->ring_add_cond = g_cond_new();
890     self->ring_free_cond = g_cond_new();
891     self->part_slices_mutex = g_mutex_new();
892
893     self->device = NULL;
894     self->paused = TRUE;
895     self->part_header = NULL;
896     self->partnum = 1;
897     self->part_bytes_written = 0;
898     self->part_slices = NULL;
899 }
900
901 static void
902 finalize_impl(
903     GObject * obj_self)
904 {
905     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
906     FileSlice *slice, *next_slice;
907
908     g_mutex_free(self->state_mutex);
909     g_cond_free(self->state_cond);
910
911     g_mutex_free(self->ring_mutex);
912     g_cond_free(self->ring_add_cond);
913     g_cond_free(self->ring_free_cond);
914
915     g_mutex_free(self->part_slices_mutex);
916
917     for (slice = self->part_slices; slice; slice = next_slice) {
918         next_slice = slice->next;
919         if (slice->filename)
920             g_free(slice->filename);
921         g_free(slice);
922     }
923
924     if (self->ring_buffer)
925         g_free(self->ring_buffer);
926
927     if (self->part_header)
928         dumpfile_free(self->part_header);
929
930     if (self->device)
931         g_object_unref(self->device);
932
933     /* chain up */
934     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
935 }
936
937 static void
938 class_init(
939     XferDestTaperSplitterClass * selfc)
940 {
941     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
942     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
943     GObjectClass *goc = G_OBJECT_CLASS(selfc);
944     static xfer_element_mech_pair_t mech_pairs[] = {
945         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
946         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
947     };
948
949     klass->start = start_impl;
950     klass->cancel = cancel_impl;
951     klass->push_buffer = push_buffer_impl;
952     xdt_klass->start_part = start_part_impl;
953     xdt_klass->use_device = use_device_impl;
954     xdt_klass->cache_inform = cache_inform_impl;
955     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
956     goc->finalize = finalize_impl;
957
958     klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
959     klass->mech_pairs = mech_pairs;
960
961     parent_class = g_type_class_peek_parent(selfc);
962 }
963
964 static GType
965 xfer_dest_taper_splitter_get_type (void)
966 {
967     static GType type = 0;
968
969     if G_UNLIKELY(type == 0) {
970         static const GTypeInfo info = {
971             sizeof (XferDestTaperSplitterClass),
972             (GBaseInitFunc) NULL,
973             (GBaseFinalizeFunc) NULL,
974             (GClassInitFunc) class_init,
975             (GClassFinalizeFunc) NULL,
976             NULL /* class_data */,
977             sizeof (XferDestTaperSplitter),
978             0 /* n_preallocs */,
979             (GInstanceInitFunc) instance_init,
980             NULL
981         };
982
983         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
984     }
985
986     return type;
987 }
988
989 /*
990  * Constructor
991  */
992
993 XferElement *
994 xfer_dest_taper_splitter(
995     Device *first_device,
996     size_t max_memory,
997     guint64 part_size,
998     gboolean expect_cache_inform)
999 {
1000     XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1001     GValue val;
1002
1003     /* max_memory and part_size get rounded up to the next multiple of
1004      * block_size */
1005     max_memory = ((max_memory + first_device->block_size - 1)
1006                         / first_device->block_size) * first_device->block_size;
1007     if (part_size)
1008         part_size = ((part_size + first_device->block_size - 1)
1009                             / first_device->block_size) * first_device->block_size;
1010
1011     self->part_size = part_size;
1012     self->partnum = 1;
1013     self->device = first_device;
1014
1015     g_object_ref(self->device);
1016     self->block_size = first_device->block_size;
1017     self->paused = TRUE;
1018     self->no_more_parts = FALSE;
1019
1020     /* set up a ring buffer of size max_memory */
1021     self->ring_length = max_memory;
1022     self->ring_buffer = g_malloc(max_memory);
1023     self->ring_head = self->ring_tail = 0;
1024     self->ring_count = 0;
1025     self->ring_head_at_eof = 0;
1026
1027     /* get this new device's streaming requirements */
1028     bzero(&val, sizeof(val));
1029     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1030         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1031         g_warning("Couldn't get streaming type for %s", self->device->device_name);
1032         self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1033     } else {
1034         self->streaming = g_value_get_enum(&val);
1035     }
1036     g_value_unset(&val);
1037
1038     /* grab data from cache_inform, just in case we hit PEOM */
1039     self->expect_cache_inform = expect_cache_inform;
1040
1041     return XFER_ELEMENT(self);
1042 }