Imported Upstream version 3.2.0
[debian/amanda] / device-src / xfer-dest-taper-splitter.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
28 /* A transfer destination that writes an entire dumpfile to one or more files
29  * on one or more devices, without any caching.  This destination supports both
30  * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
31  * splitting (in which rewound parts are read from holding disk). */
32
33 /*
34  * File Slices - Cache Information
35  *
36  * The cache_inform implementation adds cache information to a linked list of
37  * these objects, in order.  The objects are arranged in a linked list, and
38  * describe the files in which the part data is stored.  Note that we assume
39  * that slices are added *before* they are needed: the xfer element will fail
40  * if it tries to rewind and does not find a suitable slice.
41  *
42  * The slices should be "fast forwarded" after every part, so that the first
43  * byte in part_slices is the first byte of the part; when a retry of a part is
44  * required, use the iterator methods to properly open the various files and do
45  * the buffering.
46  */
47
/* One entry in the ordered, singly-linked list of cached part data.
 * Appended by cache_inform_impl and consumed/trimmed by
 * fast_forward_slices; read (without trimming) by the SliceIterator. */
typedef struct FileSlice {
    struct FileSlice *next;

    /* fully-qualified filename to read from (or NULL to read from
     * disk_cache_read_fd in XferDestTaperCacher) */
    char *filename;

    /* offset in file to start at */
    guint64 offset;

    /* length of data to read */
    guint64 length;
} FileSlice;
61
/*
 * Xfer Dest Taper
 */

/* parent class pointer; set in class_init and used to chain up in
 * cancel_impl and finalize_impl */
static GObjectClass *parent_class = NULL;
67
/* Instance structure.  Fields are grouped by the synchronization regime
 * that protects them; see the comments on each group. */
typedef struct XferDestTaperSplitter {
    XferDestTaper __parent__;

    /* object parameters
     *
     * These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element.
     */

    /* Maximum size of each part (bytes); 0 means unlimited */
    guint64 part_size;

    /* the device's need for streaming (it's assumed that all subsequent devices
     * have the same needs) */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* TRUE if this element is expecting slices via cache_inform */
    gboolean expect_cache_inform;

    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* Ring Buffer
     *
     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
     * blocksize), and serves as the interface between the device_thread and
     * the thread calling push_buffer.  Ring_length is the total length of the
     * buffer in bytes, while ring_count is the number of data bytes currently
     * in the buffer.  The ring_add_cond is signalled when data is added to the
     * buffer, while ring_free_cond is signalled when data is removed.  Both
     * are governed by ring_mutex, and both are signalled when the transfer is
     * cancelled.
     */

    GMutex *ring_mutex;
    GCond *ring_add_cond, *ring_free_cond;
    gchar *ring_buffer;
    gsize ring_length, ring_count;
    /* ring_head is the producer's write position; ring_tail is the
     * consumer's read position */
    gsize ring_head, ring_tail;
    /* set by push_buffer_impl when the upstream element delivers EOF */
    gboolean ring_head_at_eof;

    /* Element State
     *
     * "state" includes all of the variables below (including device
     * parameters).  Note that the device_thread holdes this mutex for the
     * entire duration of writing a part.
     *
     * state_mutex should always be locked before ring_mutex, if both are to be
     * held simultaneously.
     */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;

    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* bytes to read from cached slices before reading from the ring buffer;
     * nonzero only when retrying a failed part */
    guint64 bytes_to_read_from_slices;

    /* part number in progress (1-based) */
    volatile guint64 partnum;

    /* status of the last part */
    gboolean last_part_eof;
    gboolean last_part_eom;
    gboolean last_part_successful;

    /* true if the element is done writing to devices */
    gboolean no_more_parts;

    /* total bytes written in the current part */
    volatile guint64 part_bytes_written;

    /* The list of active slices for the current part.  The cache_inform method
     * appends to this list. It is safe to read this linked list, beginning at
     * the head, *if* you can guarantee that slices will not be fast-forwarded
     * in the interim.  The finalize method for this class will take care of
     * freeing any leftover slices. Take the part_slices mutex while modifying
     * the links in this list. */
    FileSlice *part_slices;
    GMutex *part_slices_mutex;
} XferDestTaperSplitter;
156
/* Standard GObject type boilerplate: cast/check macros for this class */
static GType xfer_dest_taper_splitter_get_type(void);
#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)

/* Class structure; no methods are added beyond the parent's */
typedef struct {
    XferDestTaperClass __parent__;

} XferDestTaperSplitterClass;
169
170 /*
171  * Debug logging
172  */
173
174 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
175 static void
176 _xdt_dbg(const char *fmt, ...)
177 {
178     va_list argp;
179     char msg[1024];
180
181     arglist_start(argp, fmt);
182     g_vsnprintf(msg, sizeof(msg), fmt, argp);
183     arglist_end(argp);
184     g_debug("XDT thd-%p: %s", g_thread_self(), msg);
185 }
186
187 /* "Fast forward" the slice list by the given length.  This will free any
188  * slices that are no longer necessary, and adjust the offset and length of the
189  * first remaining slice.  This assumes the state mutex is locked during its
190  * operation.
191  *
192  * @param self: element
193  * @param length: number of bytes to fast forward
194  */
195 static void
196 fast_forward_slices(
197         XferDestTaperSplitter *self,
198         guint64 length)
199 {
200     FileSlice *slice;
201
202     /* consume slices until we've eaten the whole part */
203     g_mutex_lock(self->part_slices_mutex);
204     while (length > 0) {
205         g_assert(self->part_slices);
206         slice = self->part_slices;
207
208         if (slice->length <= length) {
209             length -= slice->length;
210
211             self->part_slices = slice->next;
212             if (slice->filename)
213                 g_free(slice->filename);
214             g_free(slice);
215             slice = self->part_slices;
216         } else {
217             slice->length -= length;
218             slice->offset += length;
219             break;
220         }
221     }
222     g_mutex_unlock(self->part_slices_mutex);
223 }
224
225 /*
226  * Slice Iterator
227  */
228
/* A struct for use in iterating over data in the slices.  Initialized by
 * iterate_slices and advanced by iterator_get_block. */
typedef struct SliceIterator {
    /* current slice */
    FileSlice *slice;

    /* file descriptor of the current file, or -1 if it's not open yet */
    int cur_fd;

    /* bytes remaining in this slice */
    guint64 slice_remaining;
} SliceIterator;
240
241 /* Utility functions for SliceIterator */
242
243 /* Begin iterating over slices, starting at the first byte of the first slice.
244  * Initializes a pre-allocated SliceIterator.  The caller must ensure that
245  * fast_forward_slices is not called while an iteration is in
246  * progress.
247  */
248 static void
249 iterate_slices(
250         XferDestTaperSplitter *self,
251         SliceIterator *iter)
252 {
253     iter->cur_fd = -1;
254     iter->slice_remaining = 0;
255     g_mutex_lock(self->part_slices_mutex);
256     iter->slice = self->part_slices;
257     /* it's safe to unlock this because, at worst, a new entry will
258      * be appended while the iterator is in progress */
259     g_mutex_unlock(self->part_slices_mutex);
260 }
261
262
263 /* Get a block of data from the iterator, returning a pointer to a buffer
264  * containing the data; the buffer remains the property of the iterator.
265  * Returns NULL on error, after calling xfer_cancel_with_error with an
266  * appropriate error message.  This function does not block, so it does not
267  * check for cancellation.
268  */
/* Fill BUF with BYTES_NEEDED bytes read sequentially from the slice list,
 * opening and closing slice files as needed.  Returns BUF on success, or
 * NULL after calling xfer_cancel_with_error on any open/seek/read/close
 * failure.  Assumes the slice list holds at least BYTES_NEEDED more bytes
 * (asserts otherwise). */
static gpointer
iterator_get_block(
        XferDestTaperSplitter *self,
        SliceIterator *iter,
        gpointer buf,
        gsize bytes_needed)
{
    gsize buf_offset = 0;
    XferElement *elt = XFER_ELEMENT(self);

    g_assert(iter != NULL);
    g_assert(buf != NULL);

    while (bytes_needed > 0) {
        gsize read_size;
        int bytes_read;

        /* lazily open the current slice's file on first use */
        if (iter->cur_fd < 0) {
            guint64 offset;

            g_assert(iter->slice != NULL);
            g_assert(iter->slice->filename != NULL);

            iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
            if (iter->cur_fd < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not open '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }

            iter->slice_remaining = iter->slice->length;
            offset = iter->slice->offset;

            /* position at the slice's starting offset within the file */
            if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
                xfer_cancel_with_error(elt,
                    _("Could not seek '%s' for reading: %s"),
                    iter->slice->filename, strerror(errno));
                return NULL;
            }
        }

        /* read as much as this slice can supply, up to what we still need */
        read_size = MIN(iter->slice_remaining, bytes_needed);
        bytes_read = full_read(iter->cur_fd,
                               buf + buf_offset,
                               read_size);
        /* a short read is an error: either a real I/O error (errno set)
         * or the file was truncated (unexpected EOF) */
        if (bytes_read < 0 || (gsize)bytes_read < read_size) {
            xfer_cancel_with_error(elt,
                _("Error reading '%s': %s"),
                iter->slice->filename,
                errno? strerror(errno) : _("Unexpected EOF"));
            return NULL;
        }

        iter->slice_remaining -= bytes_read;
        buf_offset += bytes_read;
        bytes_needed -= bytes_read;

        /* slice exhausted: close its file and advance to the next slice */
        if (iter->slice_remaining <= 0) {
            if (close(iter->cur_fd) < 0) {
                xfer_cancel_with_error(elt,
                    _("Could not close fd %d: %s"),
                    iter->cur_fd, strerror(errno));
                return NULL;
            }
            iter->cur_fd = -1;

            iter->slice = iter->slice->next;

            /* cheap cancellation check at a slice boundary */
            if (elt->cancelled)
                return NULL;
        }
    }

    return buf;
}
345
346
347 /* Free the iterator's resources */
348 static void
349 iterator_free(
350         SliceIterator *iter)
351 {
352     if (iter->cur_fd >= 0)
353         close(iter->cur_fd);
354 }
355
356 /*
357  * Device Thread
358  */
359
360 /* Wait for at least one block, or EOF, to be available in the ring buffer.
361  * Called with the ring mutex held. */
362 static gsize
363 device_thread_wait_for_block(
364     XferDestTaperSplitter *self)
365 {
366     XferElement *elt = XFER_ELEMENT(self);
367     gsize bytes_needed = self->device->block_size;
368     gsize usable;
369
370     /* for any kind of streaming, we need to fill the entire buffer before the
371      * first byte */
372     if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
373         bytes_needed = self->ring_length;
374
375     while (1) {
376         /* are we ready? */
377         if (elt->cancelled)
378             break;
379
380         if (self->ring_count >= bytes_needed)
381             break;
382
383         if (self->ring_head_at_eof)
384             break;
385
386         /* nope - so wait */
387         g_cond_wait(self->ring_add_cond, self->ring_mutex);
388
389         /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
390          * we need to wait for the entire buffer to fill */
391         if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
392             bytes_needed = self->ring_length;
393     }
394
395     usable = MIN(self->ring_count, bytes_needed);
396     if (self->part_size)
397        usable = MIN(usable, self->part_size - self->part_bytes_written);
398
399     return usable;
400 }
401
402 /* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
403  * held. */
404 static void
405 device_thread_consume_block(
406     XferDestTaperSplitter *self,
407     gsize written)
408 {
409     self->ring_count -= written;
410     self->ring_tail += written;
411     if (self->ring_tail >= self->ring_length)
412         self->ring_tail -= self->ring_length;
413     g_cond_broadcast(self->ring_free_cond);
414 }
415
416 /* Write an entire part.  Called with the state_mutex held */
417 static XMsg *
418 device_thread_write_part(
419     XferDestTaperSplitter *self)
420 {
421     GTimer *timer = g_timer_new();
422     XferElement *elt = XFER_ELEMENT(self);
423
424     enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
425     int fileno = 0;
426     XMsg *msg;
427
428     self->part_bytes_written = 0;
429
430     g_timer_start(timer);
431
432     /* write the header; if this fails or hits LEOM, we consider this a
433      * successful 0-byte part */
434     if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
435         part_status = PART_LEOM;
436         goto part_done;
437     }
438
439     fileno = self->device->file;
440     g_assert(fileno > 0);
441
442     /* free the header, now that it's written */
443     dumpfile_free(self->part_header);
444     self->part_header = NULL;
445
446     /* First, read the requisite number of bytes from the part_slices, if the part was
447      * unsuccessful. */
448     if (self->bytes_to_read_from_slices) {
449         SliceIterator iter;
450         gsize to_write = self->block_size;
451         gpointer buf = g_malloc(to_write);
452         gboolean successful = TRUE;
453         guint64 bytes_from_slices = self->bytes_to_read_from_slices;
454
455         DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
456
457         iterate_slices(self, &iter);
458         while (bytes_from_slices) {
459             gboolean ok;
460
461             if (!iterator_get_block(self, &iter, buf, to_write)) {
462                 part_status = PART_FAILED;
463                 successful = FALSE;
464                 break;
465             }
466
467             /* note that it's OK to reference these ring_* vars here, as they
468              * are static at this point */
469             ok = device_write_block(self->device, (guint)to_write, buf);
470
471             if (!ok) {
472                 part_status = PART_FAILED;
473                 successful = FALSE;
474                 break;
475             }
476
477             self->part_bytes_written += to_write;
478             bytes_from_slices -= to_write;
479
480             if (self->part_size && self->part_bytes_written >= self->part_size) {
481                 part_status = PART_EOP;
482                 successful = FALSE;
483                 break;
484             } else if (self->device->is_eom) {
485                 part_status = PART_LEOM;
486                 successful = FALSE;
487                 break;
488             }
489         }
490
491         iterator_free(&iter);
492         g_free(buf);
493
494         /* if we didn't finish, get out of here now */
495         if (!successful)
496             goto part_done;
497     }
498
499     g_mutex_lock(self->ring_mutex);
500     while (1) {
501         gsize to_write;
502         gboolean ok;
503
504         /* wait for at least one block, and (if necessary) prebuffer */
505         to_write = device_thread_wait_for_block(self);
506         to_write = MIN(to_write, self->device->block_size);
507         if (elt->cancelled)
508             break;
509
510         if (to_write == 0) {
511             part_status = PART_EOF;
512             break;
513         }
514
515         g_mutex_unlock(self->ring_mutex);
516         DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
517
518         /* note that it's OK to reference these ring_* vars here, as they
519          * are static at this point */
520         ok = device_write_block(self->device, (guint)to_write,
521                 self->ring_buffer + self->ring_tail);
522         g_mutex_lock(self->ring_mutex);
523
524         if (!ok) {
525             part_status = PART_FAILED;
526             break;
527         }
528
529         self->part_bytes_written += to_write;
530         device_thread_consume_block(self, to_write);
531
532         if (self->part_size && self->part_bytes_written >= self->part_size) {
533             part_status = PART_EOP;
534             break;
535         } else if (self->device->is_eom) {
536             part_status = PART_LEOM;
537             break;
538         }
539     }
540     g_mutex_unlock(self->ring_mutex);
541 part_done:
542
543     if (elt->cancelled) {
544         g_timer_destroy(timer);
545         return NULL;
546     }
547
548     /* if we write all of the blocks, but the finish_file fails, then likely
549      * there was some buffering going on in the device driver, and the blocks
550      * did not all make it to permanent storage -- so it's a failed part.  Note
551      * that we try to finish_file even if the part failed, just to be thorough. */
552     if (self->device->in_file) {
553         if (!device_finish_file(self->device))
554             part_status = PART_FAILED;
555     }
556
557     g_timer_stop(timer);
558
559     msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
560     msg->size = self->part_bytes_written;
561     msg->duration = g_timer_elapsed(timer, NULL);
562     msg->partnum = self->partnum;
563     msg->fileno = fileno;
564     msg->successful = self->last_part_successful = part_status != PART_FAILED;
565     msg->eom = self->last_part_eom = (part_status == PART_LEOM || !msg->successful);
566     msg->eof = self->last_part_eof = part_status == PART_EOF;
567
568     /* time runs backward on some test boxes, so make sure this is positive */
569     if (msg->duration < 0) msg->duration = 0;
570
571     if (msg->successful)
572         self->partnum++;
573     self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
574
575     g_timer_destroy(timer);
576
577     return msg;
578 }
579
/* Thread function for the device thread: writes one part per iteration,
 * pausing between parts until the main thread calls start_part.  Queues
 * an XMSG_DONE message on exit.  Holds state_mutex for the entire loop
 * (g_cond_wait releases it while waiting). */
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    g_mutex_lock(self->state_mutex);
    while (1) {
        /* wait until the main thread un-pauses us, and check that we have
         * the relevant device info available (block_size) */
        while (self->paused && !elt->cancelled) {
            DBG(9, "waiting to be unpaused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            break;

        DBG(2, "beginning to write part");
        msg = device_thread_write_part(self);
        DBG(2, "done writing part");

        if (!msg) /* cancelled */
            break;

        /* release the slices for this part, if there were any slices */
        if (msg->successful && self->expect_cache_inform) {
            fast_forward_slices(self, msg->size);
        }

        /* ownership of msg passes to the message queue here */
        xfer_queue_message(elt->xfer, msg);

        /* pause ourselves and await instructions from the main thread */
        self->paused = TRUE;

        /* if this is the last part, we're done with the part loop */
        if (self->no_more_parts)
            break;
    }
    g_mutex_unlock(self->state_mutex);

    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}
633
634 /*
635  * Class mechanics
636  */
637
/* XferElement push_buffer method: copy BUF into the ring buffer (blocking
 * while it is full), or mark EOF when BUF is NULL.  Takes ownership of BUF
 * and frees it before returning. */
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
        /* indicate EOF to the device thread */
        g_mutex_lock(self->ring_mutex);
        self->ring_head_at_eof = TRUE;
        g_cond_broadcast(self->ring_add_cond);
        g_mutex_unlock(self->ring_mutex);
        goto free_and_finish;
    }

    /* push the block into the ring buffer, in pieces if necessary */
    g_mutex_lock(self->ring_mutex);
    while (size > 0) {
        gsize avail;

        /* wait for some space */
        while (self->ring_count == self->ring_length && !elt->cancelled) {
            DBG(9, "waiting for any space to buffer pushed data");
            g_cond_wait(self->ring_free_cond, self->ring_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            goto unlock_and_free_and_finish;

        /* only copy to the end of the buffer, if the available space wraps
         * around to the beginning */
        avail = MIN(size, self->ring_length - self->ring_count);
        avail = MIN(avail, self->ring_length - self->ring_head);

        /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
        memmove(self->ring_buffer + self->ring_head, p, avail);

        /* reset the ring variables to represent this state */
        self->ring_count += avail;
        self->ring_head += avail; /* will, at most, hit ring_length */
        if (self->ring_head == self->ring_length)
            self->ring_head = 0;
        p = (gpointer)((guchar *)p + avail);
        size -= avail;

        /* and give the device thread a notice that data is ready */
        g_cond_broadcast(self->ring_add_cond);
    }

unlock_and_free_and_finish:
    g_mutex_unlock(self->ring_mutex);

free_and_finish:
    if (buf)
        g_free(buf);
}
706
707 /*
708  * Element mechanics
709  */
710
711 static gboolean
712 start_impl(
713     XferElement *elt)
714 {
715     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
716     GError *error = NULL;
717
718     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
719     if (!self->device_thread) {
720         g_critical(_("Error creating new thread: %s (%s)"),
721             error->message, errno? strerror(errno) : _("no error code"));
722     }
723
724     return TRUE;
725 }
726
/* XferElement cancel method: chain up to mark the element cancelled, then
 * wake every thread blocked on our condition variables so each can observe
 * elt->cancelled and exit.  Returns the parent's result. */
static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* then signal all of our condition variables, so that threads waiting on them
     * wake up and see elt->cancelled. */
    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    g_mutex_lock(self->ring_mutex);
    g_cond_broadcast(self->ring_add_cond);
    g_cond_broadcast(self->ring_free_cond);
    g_mutex_unlock(self->ring_mutex);

    return rv;
}
751
/* XferDestTaper start_part method: arm the device thread to write the next
 * part with HEADER.  When RETRY_PART is set, the previous (failed) part's
 * bytes are replayed from the cache_inform slices; on an invalid retry the
 * transfer is cancelled and the thread is left paused. */
static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part()");

    /* we can only retry the part if we're getting slices via cache_inform's */
    if (retry_part) {
        if (self->last_part_successful) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("Previous part did not fail; cannot retry"));
            return;
        }

        if (!self->expect_cache_inform) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("No cache for previous failed part; cannot retry"));
            return;
        }

        /* replay exactly as many bytes as the failed part wrote */
        self->bytes_to_read_from_slices = self->part_bytes_written;
    } else {
        /* don't read any bytes from the slices, since we're not retrying */
        self->bytes_to_read_from_slices = 0;
    }

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    /* take a private copy of the header; ours to free */
    if (self->part_header)
        dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}
800
/* XferDestTaper use_device method: switch to a new output device (e.g.,
 * after a tape change).  Takes a reference on the new device and releases
 * the old one.  A streaming-requirement mismatch is only warned about; a
 * block-size mismatch cancels the transfer. */
static void
use_device_impl(
    XferDestTaper *xdtself,
    Device *device)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
    StreamingRequirement newstreaming;
    GValue val;

    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");

    /* short-circuit if nothing is changing */
    if (self->device == device)
        return;

    g_mutex_lock(self->state_mutex);
    if (self->device)
        g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
    } else {
        /* self->streaming is deliberately left unchanged; only warn */
        newstreaming = g_value_get_enum(&val);
        if (newstreaming != self->streaming)
            g_warning("New device has different streaming requirements from the original; "
                    "ignoring new requirement");
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    if (self->block_size != device->block_size) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
        return;
    }
    g_mutex_unlock(self->state_mutex);
}
844
845 static void
846 cache_inform_impl(
847     XferDestTaper *xdt,
848     const char *filename,
849     off_t offset,
850     off_t length)
851 {
852     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
853     FileSlice *slice = g_new(FileSlice, 1), *iter;
854
855     slice->next = NULL;
856     slice->filename = g_strdup(filename);
857     slice->offset = offset;
858     slice->length = length;
859
860     g_mutex_lock(self->part_slices_mutex);
861     if (self->part_slices) {
862         for (iter = self->part_slices; iter->next; iter = iter->next) {}
863         iter->next = slice;
864     } else {
865         self->part_slices = slice;
866     }
867     g_mutex_unlock(self->part_slices_mutex);
868 }
869
/* XferDestTaper get_part_bytes_written method: report progress of the
 * part currently being written. */
static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdtself)
{
    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);

    /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
     * partial write to the 64-bit value on a 32-bit system).  This is ok for
     * the moment, as it's only informational, but be warned. */
    return self->part_bytes_written;
}
881
882 static void
883 instance_init(
884     XferElement *elt)
885 {
886     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
887     elt->can_generate_eof = FALSE;
888
889     self->state_mutex = g_mutex_new();
890     self->state_cond = g_cond_new();
891     self->ring_mutex = g_mutex_new();
892     self->ring_add_cond = g_cond_new();
893     self->ring_free_cond = g_cond_new();
894     self->part_slices_mutex = g_mutex_new();
895
896     self->device = NULL;
897     self->paused = TRUE;
898     self->part_header = NULL;
899     self->partnum = 1;
900     self->part_bytes_written = 0;
901     self->part_slices = NULL;
902 }
903
904 static void
905 finalize_impl(
906     GObject * obj_self)
907 {
908     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
909     FileSlice *slice, *next_slice;
910
911     g_mutex_free(self->state_mutex);
912     g_cond_free(self->state_cond);
913
914     g_mutex_free(self->ring_mutex);
915     g_cond_free(self->ring_add_cond);
916     g_cond_free(self->ring_free_cond);
917
918     g_mutex_free(self->part_slices_mutex);
919
920     for (slice = self->part_slices; slice; slice = next_slice) {
921         next_slice = slice->next;
922         if (slice->filename)
923             g_free(slice->filename);
924         g_free(slice);
925     }
926
927     if (self->ring_buffer)
928         g_free(self->ring_buffer);
929
930     if (self->part_header)
931         dumpfile_free(self->part_header);
932
933     if (self->device)
934         g_object_unref(self->device);
935
936     /* chain up */
937     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
938 }
939
/* GObject class initializer: wire up the virtual methods and declare the
 * single supported transfer mechanism (push-buffer input, no output). */
static void
class_init(
    XferDestTaperSplitterClass * selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);
    static xfer_element_mech_pair_t mech_pairs[] = {
        { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
        { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
    };

    klass->start = start_impl;
    klass->cancel = cancel_impl;
    klass->push_buffer = push_buffer_impl;
    xdt_klass->start_part = start_part_impl;
    xdt_klass->use_device = use_device_impl;
    xdt_klass->cache_inform = cache_inform_impl;
    xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
    goc->finalize = finalize_impl;

    klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
    klass->mech_pairs = mech_pairs;

    /* remember the parent class for chaining up */
    parent_class = g_type_class_peek_parent(selfc);
}
966
/* Standard GObject lazy type registration for XferDestTaperSplitter,
 * derived from XFER_DEST_TAPER_TYPE. */
static GType
xfer_dest_taper_splitter_get_type (void)
{
    static GType type = 0;

    if G_UNLIKELY(type == 0) {
        static const GTypeInfo info = {
            sizeof (XferDestTaperSplitterClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestTaperSplitter),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
    }

    return type;
}
991
992 /*
993  * Constructor
994  */
995
/* Constructor.
 *
 * @param first_device: the device the first part will be written to; a
 *     reference is taken, and its block size becomes the fixed block size
 *     for the whole transfer
 * @param max_memory: ring buffer size in bytes (rounded up to a multiple
 *     of the device block size; assumes max_memory > 0 -- TODO confirm
 *     callers never pass 0, which would yield an empty ring buffer)
 * @param part_size: maximum part size in bytes, or 0 for unlimited
 *     (rounded up to a multiple of the device block size)
 * @param expect_cache_inform: TRUE if slices will arrive via cache_inform,
 *     enabling part retries
 * @return: the new element
 */
XferElement *
xfer_dest_taper_splitter(
    Device *first_device,
    size_t max_memory,
    guint64 part_size,
    gboolean expect_cache_inform)
{
    XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
    GValue val;

    /* max_memory and part_size get rounded up to the next multiple of
     * block_size */
    max_memory = ((max_memory + first_device->block_size - 1)
                        / first_device->block_size) * first_device->block_size;
    if (part_size)
        part_size = ((part_size + first_device->block_size - 1)
                            / first_device->block_size) * first_device->block_size;

    self->part_size = part_size;
    self->partnum = 1;
    self->device = first_device;

    g_object_ref(self->device);
    self->block_size = first_device->block_size;
    self->paused = TRUE;
    self->no_more_parts = FALSE;

    /* set up a ring buffer of size max_memory */
    self->ring_length = max_memory;
    self->ring_buffer = g_malloc(max_memory);
    self->ring_head = self->ring_tail = 0;
    self->ring_count = 0;
    self->ring_head_at_eof = 0;

    /* get this new device's streaming requirements; default to the
     * strictest requirement if the property cannot be read */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
    } else {
        self->streaming = g_value_get_enum(&val);
    }
    g_value_unset(&val);

    /* grab data from cache_inform, just in case we hit PEOM */
    self->expect_cache_inform = expect_cache_inform;

    return XFER_ELEMENT(self);
}