1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amxfer.h"
23 #include "amanda.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
28 /* A transfer destination that writes an entire dumpfile to one or more files on one
29  * or more devices.  This is designed to work in concert with Amanda::Taper::Scribe. */
30
31 /* Future Plans:
32  * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case?
33  * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file
34  * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size?
35  * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior)
36  */
37
38 /*
39  * Slabs
40  *
41  * Slabs are larger than blocks, and are the unit on which the element
42  * operates.  They are designed to be a few times larger than a block, to
43  * achieve a corresponding reduction in the number of locks and unlocks used
44  * per block, and a similar reduction in the amount of memory overhead
45  * required.
46  */
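
/* For illustration only (hypothetical sizes): with a 32 KiB device block
 * size, the constructor at the bottom of this file picks a slab of 16 blocks
 * = 512 KiB (subject to the part-size, 10 MB, and max_memory caps), so the
 * slab train takes locks and refcount updates once per 512 KiB slab rather
 * than once per 32 KiB block. */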
47
48 typedef struct Slab {
49     struct Slab *next;
50
51     /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
52      * from any threads operating on the slab */
53     gint refcount;
54
55     /* number of this slab in the sequence, global to this element's lifetime.
56      * Since this counts slabs, which are about 1M, this can address 16
57      * yottabytes of data before wrapping. */
58     guint64 serial;
59
60     /* slab size; this is only less than the element's slab size if the
61      * transfer is at EOF. */
62     gsize size;
63
64     /* base of the slab_size buffer */
65     gpointer base;
66 } Slab;
67
68 /*
69  * File Slices
70  *
71  * These objects are arranged in a linked list, and describe the files in which
72  * the disk cache is stored.  Note that we assume that slices are added *before*
73  * they are needed: the xfer element will fail if it tries to rewind and does not
74  * find a suitable slice.
75  */
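
/* A sketch of how slices accumulate (hypothetical filenames and sizes):
 * cache_inform("/holding/chunk.1", 0, 1 GiB) followed by
 * cache_inform("/holding/chunk.2", 0, 512 MiB) yields a two-element list;
 * when a part must be re-read, slab_source_get_from_disk() walks the list in
 * order, seeking to each slice's offset and reading 'length' bytes. */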
76
77 typedef struct FileSlice {
78     struct FileSlice *next;
79
80     /* fully-qualified filename to read from, or NULL to read from disk_cache_read_fd */
81     char *filename;
82
83     /* offset in file to start at */
84     off_t offset;
85
86     /* length of data to read */
87     gsize length;
88 } FileSlice;
89
90 /*
91  * Xfer Dest Taper
92  */
93
94 static GObjectClass *parent_class = NULL;
95
96 typedef struct XferDestTaperSplitter {
97     XferDestTaper __parent__;
98
99     /* object parameters
100      *
101      * These values are supplied to the constructor, and can be assumed
102      * constant for the lifetime of the element.
103      */
104
105     /* maximum buffer space to use for streaming; this is unrelated to the
106      * fallback_splitsize */
107     gsize max_memory;
108
109     /* split buffering info; if we're doing memory buffering, use_mem_cache is
110      * true; if we're doing disk buffering, disk_cache_dirname is non-NULL
111      * and contains the (allocated) name of the directory holding the cache file.  Either way,
112      * part_size gives the desired cache size.  If part_size is zero, then
113      * no splitting takes place (so part_size is effectively infinite) */
114     gboolean use_mem_cache;
115     char *disk_cache_dirname;
116     guint64 part_size; /* (bytes) */
117
118     /*
119      * threads
120      */
121
122     /* The thread doing the actual writes to tape; this also handles buffering
123      * for streaming */
124     GThread *device_thread;
125
126     /* The thread writing slabs to the disk cache, if any */
127     GThread *disk_cache_thread;
128
129     /* slab train
130      *
131      * All in-memory data is contained in a linked list called the "slab
132      * train".  Various components are operating simultaneously at different
133      * points in this train.  Data from the upstream XferElement is appended to
134      * the head of the train, and the device thread follows along behind,
135      * writing data to the device.  When caching parts in memory, the slab
136      * train just grows to eventually contain the whole part.  When using an
137      * on-disk cache, the disk cache thread writes the tail of the train to
138      * disk, freeing slabs to be re-used at the head of the train.  Some
139      * careful coordination of these components allows them to operate as
140      * independently as possible within the limits of the user's configuration.
141      *
142      * Slabs are rarely, if ever, freed: the oldest_slab reference generally
143      * ensures that all slabs have refcount > 0, and this pointer is only
144      * advanced when re-using slabs that have been flushed to the disk cache or
145      * when freeing slabs after completion of the transfer. */
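
    /* A rough picture of the train (illustrative; the relative positions of
     * the middle pointers depend on which thread is furthest behind):
     *
     *   oldest_slab -> ... -> disk_cacher_slab -> ... -> device_slab -> ... -> newest_slab
     *
     * The reader appends at newest_slab; the device and disk-cache threads
     * each advance their own pointer toward it independently. */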
146
147     /* pointers into the slab train are all protected by this mutex.  Note that
148      * the slabs themselves can be manipulated without this lock; it's only
149      * when changing the pointers that the mutex must be held.  Furthermore, a
150      * foo_slab variable which is not NULL will not be changed except by its
151      * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
152      * and device_slab is controlled by device_thread).  This means that a
153      * controlling thread can drop the slab_mutex once it has ensured its slab
154      * is non-NULL.
155      *
156      * Slab_cond is notified when a new slab is made available from the reader.
157      * Slab_free_cond is notified when a slab becomes available for
158      * reallocation.
159      *
160      * Any thread waiting on either condition variable should also check
161      * elt->cancelled, and act appropriately if awakened in a cancelled state.
162      */
163     GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
164
165     /* slabs in progress by each thread, or NULL if the thread is waiting on
166      * slab_cond.  These can only be changed by their respective threads, except
167      * when they are NULL (in which case the reader will point them to a new
168      * slab and signal the slab_cond). */
169     Slab *volatile disk_cacher_slab;
170     Slab *volatile mem_cache_slab;
171     Slab *volatile device_slab;
172
173     /* tail and head of the slab train */
174     Slab *volatile oldest_slab;
175     Slab *volatile newest_slab;
176
177     /* thread-specific information
178      *
179      * These values are only used by one thread, and thus are not
180      * subject to any locking or concurrency constraints.
181      */
182
183     /* slab in progress by the reader (not in the slab train) */
184     Slab *reader_slab;
185
186     /* the serial to be assigned to reader_slab */
187     guint64 next_serial;
188
189     /* bytes written to the device in this part */
190     guint64 bytes_written;
191
192     /* bytes written to the device in the current slab */
193     guint64 slab_bytes_written;
194
195     /* element state
196      *
197      * "state" includes all of the variables below (including device
198      * parameters).  Note that the device_thread reads state values when
199      * paused is false without locking the mutex.  No other thread should
200      * change state when the element is not paused.
201      *
202      * If there is ever any reason to lock both mutexes, acquire this one
203      * first.
204      *
205      * Any thread waiting on this condition variable should also check
206      * elt->cancelled, and act appropriately if awakened in a cancelled state.
207      */
208     GMutex *state_mutex;
209     GCond *state_cond;
210     volatile gboolean paused;
211
212     /* The device to write to, and the header to write to it */
213     Device *volatile device;
214     dumpfile_t *volatile part_header;
215
216     /* If true, when unpaused, the device should begin at the beginning of the
217      * cache; if false, it should proceed to the next part. */
218     volatile gboolean retry_part;
219
220     /* If true, the previous part was completed successfully; only used for
221      * assertions */
222     volatile gboolean last_part_successful;
223
224     /* part number in progress */
225     volatile guint64 partnum;
226
227     /* if true, the main thread should *not* call start_part */
228     volatile gboolean no_more_parts;
229
230     /* the first serial in this part, and the serial to stop at */
231     volatile guint64 part_first_serial, part_stop_serial;
232
233     /* file slices for the current part */
234     FileSlice *volatile part_slices;
235
236     /* read and write file descriptors for the disk cache file, in use by the
237      * disk_cache_thread.  If these are -1, wait on state_cond until they are
238      * not; once the value is set, it will not change. */
239     volatile int disk_cache_read_fd;
240     volatile int disk_cache_write_fd;
241
242     /* device parameters
243      *
244      * Note that these values aren't known until we begin writing to the
245      * device; if block_size is zero, threads should block on state_cond until
246      * it is nonzero, at which point all of the dependent fields will have
247      * their correct values.  Note that, since this value never changes after
248      * it has been set, it is safe to read block_size without acquiring the
249      * mutex first. */
250
251     /* this device's need for streaming */
252     StreamingRequirement streaming;
253
254     /* block size expected by the target device */
255     gsize block_size;
256
257     /* Size of a slab - some multiple of the block size */
258     gsize slab_size;
259
260     /* maximum number of slabs allowed, rounded up to the next whole slab.  If
261      * using mem cache, this is the equivalent of part_size bytes; otherwise,
262      * it is equivalent to max_memory bytes. */
263     guint64 max_slabs;
264
265     /* number of slabs in a part */
266     guint64 slabs_per_part;
267 } XferDestTaperSplitter;
268
269 static GType xfer_dest_taper_splitter_get_type(void);
270 #define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
271 #define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
272 #define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
273 #define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
274 #define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
275 #define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
276
277 typedef struct {
278     XferDestTaperClass __parent__;
279
280 } XferDestTaperSplitterClass;
281
282 /*
283  * Debug logging
284  */
285
286 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
287 static void
288 _xdt_dbg(const char *fmt, ...)
289 {
290     va_list argp;
291     char msg[1024];
292
293     arglist_start(argp, fmt);
294     g_vsnprintf(msg, sizeof(msg), fmt, argp);
295     arglist_end(argp);
296     g_debug("XDT thd-%p: %s", g_thread_self(), msg);
297 }
298
299 /*
300  * Slab handling
301  */
302
303 /* called with the slab_mutex held, this gets a new slab to write into, with
304  * refcount 1.  It will block if the maximum number of slabs (max_slabs) is
305  * already in use, although allocation may be forced with the 'force'
306  * parameter.
307  *
308  * If the memory allocation cannot be satisfied due to system constraints,
309  * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
310  * return NULL.  If the transfer is cancelled by some other means while this
311  * function is blocked awaiting a free slab, it will return NULL.
312  *
313  * @param self: the xfer element
314  * @param force: allocate a slab even if it would exceed max_memory
315  * @returns: a new slab, or NULL if the xfer is cancelled
316  */
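/* Illustrative throttle arithmetic (hypothetical numbers): with max_memory =
 * 8 MiB, slab_size = 512 KiB, and no mem cache, the constructor sets
 * max_slabs = 16, so a non-forced allocation blocks once newest->serial -
 * oldest->serial + 1 reaches 16 while the oldest slab is still referenced by
 * another pointer. */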
317 static Slab *
318 alloc_slab(
319     XferDestTaperSplitter *self,
320     gboolean force)
321 {
322     XferElement *elt = XFER_ELEMENT(self);
323     Slab *rv;
324
325     DBG(8, "alloc_slab(force=%d)", force);
326     if (!force) {
327         /* throttle based on maximum number of extant slabs */
328         while (G_UNLIKELY(
329             !elt->cancelled &&
330             self->oldest_slab &&
331             self->newest_slab &&
332             self->oldest_slab->refcount > 1 &&
333             (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
334             DBG(9, "waiting for available slab");
335             g_cond_wait(self->slab_free_cond, self->slab_mutex);
336         }
337         DBG(9, "done waiting");
338
339         if (elt->cancelled)
340             return NULL;
341     }
342
343     /* if the oldest slab doesn't have anything else pointing to it, just use
344      * that */
345     if (self->oldest_slab && self->oldest_slab->refcount == 1) {
346         rv = self->oldest_slab;
347         self->oldest_slab = rv->next;
348     } else {
349         rv = g_new0(Slab, 1);
350         rv->refcount = 1;
351         rv->base = g_try_malloc(self->slab_size);
352         if (!rv->base) {
353             g_free(rv);
354             xfer_cancel_with_error(XFER_ELEMENT(self),
355                 _("Could not allocate %zu bytes of memory"), self->slab_size);
356             return NULL;
357         }
358     }
359
360     rv->next = NULL;
361     rv->size = 0;
362     return rv;
363 }
364
365 /* called with the slab_mutex held, this frees the given slab entirely.  The
366  * reference count is not consulted.
367  *
368  * @param slab: slab to free
369  */
370 static void
371 free_slab(
372     Slab *slab)
373 {
374     if (slab) {
375         if (slab->base)
376             g_free(slab->base);
377         g_free(slab);
378     }
379 }
380
381 /* called with the slab_mutex held, this decrements the refcount of the
382  * given slab
383  *
384  * @param self: xfer element
385  * @param slab: slab to free
386  */
387 static inline void
388 unref_slab(
389     XferDestTaperSplitter *self,
390     Slab *slab)
391 {
392     g_assert(slab->refcount > 1);
393     slab->refcount--;
394     if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
395         g_cond_broadcast(self->slab_free_cond);
396     } else if (G_UNLIKELY(slab->refcount == 0)) {
397         free_slab(slab);
398     }
399 }
400
401 /* called with the slab_mutex held, this sets *slabp to *slabp->next,
402  * adjusting refcounts appropriately, and returns the new value
403  *
404  * @param self: xfer element
405  * @param slabp: slab pointer to advance
406  * @returns: new value of *slabp
407  */
408 static inline Slab *
409 next_slab(
410     XferDestTaperSplitter *self,
411     Slab * volatile *slabp)
412 {
413     Slab *next;
414
415     if (!slabp || !*slabp)
416         return NULL;
417
418     next = (*slabp)->next;
419     if (next)
420         next->refcount++;
421     if (*slabp)
422         unref_slab(self, *slabp);
423     *slabp = next;
424
425     return next;
426 }
427
428 /*
429  * Disk Cache
430  *
431  * The disk cache thread's job is simply to follow along the slab train at
432  * maximum speed, writing slabs to the disk cache file. */
433
434 static gboolean
435 open_disk_cache_fds(
436     XferDestTaperSplitter *self)
437 {
438     char * filename;
439
440     g_assert(self->disk_cache_read_fd == -1);
441     g_assert(self->disk_cache_write_fd == -1);
442
443     g_mutex_lock(self->state_mutex);
444     filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
445                                self->disk_cache_dirname);
446
447     self->disk_cache_write_fd = g_mkstemp(filename);
448     if (self->disk_cache_write_fd < 0) {
449         g_mutex_unlock(self->state_mutex);
450         xfer_cancel_with_error(XFER_ELEMENT(self),
451             _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
452             strerror(errno));
453         g_free(filename);
454         return FALSE;
455     }
456
457     /* open a separate copy of the file for reading */
458     self->disk_cache_read_fd = open(filename, O_RDONLY);
459     if (self->disk_cache_read_fd < 0) {
460         g_mutex_unlock(self->state_mutex);
461         xfer_cancel_with_error(XFER_ELEMENT(self),
462             _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
463             strerror(errno));
464         g_free(filename);
465         return FALSE;
466     }
467
468     /* signal anyone waiting for this value */
469     g_cond_broadcast(self->state_cond);
470     g_mutex_unlock(self->state_mutex);
471
472     /* errors from unlink are not fatal */
473     if (unlink(filename) < 0) {
474         g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
475     }
476
477     g_free(filename);
478     return TRUE;
479 }
480
481 static gpointer
482 disk_cache_thread(
483     gpointer data)
484 {
485     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
486     XferElement *elt = XFER_ELEMENT(self);
487
488     DBG(1, "(this is the disk cache thread)");
489
490     /* open up the disk cache file first */
491     if (!open_disk_cache_fds(self))
492         return NULL;
493
494     while (!elt->cancelled) {
495         gboolean eof, eop;
496         guint64 stop_serial;
497         Slab *slab;
498
499         /* rewind to the beginning of the disk cache file */
500         if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
501             xfer_cancel_with_error(XFER_ELEMENT(self),
502                 _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
503                 strerror(errno));
504             return NULL;
505         }
506
507         /* we need to sit and wait for the next part to begin, first making sure
508          * we have a slab .. */
509         g_mutex_lock(self->slab_mutex);
510         while (!self->disk_cacher_slab && !elt->cancelled) {
511             DBG(9, "waiting for a disk slab");
512             g_cond_wait(self->slab_cond, self->slab_mutex);
513         }
514         DBG(9, "done waiting");
515         g_mutex_unlock(self->slab_mutex);
516
517         if (elt->cancelled)
518             break;
519
520         /* this slab is now fixed until this thread changes it */
521         g_assert(self->disk_cacher_slab != NULL);
522
523         /* and then making sure we're ready to write that slab. */
524         g_mutex_lock(self->state_mutex);
525         while ((self->paused ||
526                     (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
527                 && !elt->cancelled) {
528             DBG(9, "waiting for the disk slab to become current and un-paused");
529             g_cond_wait(self->state_cond, self->state_mutex);
530         }
531         DBG(9, "done waiting");
532
533         stop_serial = self->part_stop_serial;
534         g_mutex_unlock(self->state_mutex);
535
536         if (elt->cancelled)
537             break;
538
539         g_mutex_lock(self->slab_mutex);
540         slab = self->disk_cacher_slab;
541         eop = eof = FALSE;
542         while (!eop && !eof) {
543             /* if we're at the head of the slab train, wait for more data */
544             while (!self->disk_cacher_slab && !elt->cancelled) {
545                 DBG(9, "waiting for the next disk slab");
546                 g_cond_wait(self->slab_cond, self->slab_mutex);
547             }
548             DBG(9, "done waiting");
549
550             if (elt->cancelled)
551                 break;
552
553             /* drop the lock long enough to write the slab; the refcount
554              * protects the slab during this time */
555             slab = self->disk_cacher_slab;
556             g_mutex_unlock(self->slab_mutex);
557
558             if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
559                 xfer_cancel_with_error(XFER_ELEMENT(self),
560                     _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
561                     strerror(errno));
562                 return NULL;
563             }
564
565             eof = slab->size < self->slab_size;
566             eop = (slab->serial + 1 == stop_serial);
567
568             g_mutex_lock(self->slab_mutex);
569             next_slab(self, &self->disk_cacher_slab);
570         }
571         g_mutex_unlock(self->slab_mutex);
572
573         if (eof) {
574             /* this very thread should have just set this value to NULL, and since it's
575              * EOF, there should not be any 'next' slab */
576             g_assert(self->disk_cacher_slab == NULL);
577             break;
578         }
579     }
580
581     return NULL;
582 }
583
584 /*
585  * Device Thread
586  *
587  * The device thread's job is to write slabs to self->device, applying whatever
588  * streaming algorithms are required.  It does this by alternately getting the
589  * next slab from a "slab source" and writing that slab to the device.  Most of
590  * the slab source functions assume that self->slab_mutex is held, but may
591  * release the mutex (either explicitly or via a g_cond_wait), so it is not
592  * valid to assume that any slab pointers remain unchanged after a slab_source
593  * function invocation.
594  */
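
/* In outline, device_thread_write_part() below runs:
 *   device_start_file() -> slab_source_setup() ->
 *     { slab_source_get(); write_slab_to_device(); } once per slab ->
 *   device_finish_file() -> slab_source_free()
 * where the slab_mutex is held around slab_source_get() but dropped for the
 * actual device writes. */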
595
596 /* This struct tracks the current state of the slab source */
597 typedef struct slab_source_state {
598     /* temporary slab used for reading from disk */
599     Slab *tmp_slab;
600
601     /* current source slice */
602     FileSlice *slice;
603
604     /* open fd in current slice, or -1 */
605     int slice_fd;
606
607     /* next serial to read from disk */
608     guint64 next_serial;
609
610     /* bytes remaining in this slice */
611     gsize slice_remaining;
612 } slab_source_state;
613
614 /* Called with the slab_mutex held, this function pre-buffers enough data into the slab
615  * train to meet the device's streaming needs. */
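/* Example of the prebuffer arithmetic (hypothetical numbers): with max_memory
 * = 8 MiB and slab_size = 512 KiB, prebuffer_slabs = 16, so the device thread
 * waits until 16 slabs, counting from device_slab, are available (or EOF /
 * end of part is reached) before it starts writing. */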
616 static gboolean
617 slab_source_prebuffer(
618     XferDestTaperSplitter *self)
619 {
620     XferElement *elt = XFER_ELEMENT(self);
621     guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
622     guint64 i;
623     Slab *slab;
624
625     /* always prebuffer at least one slab, even if max_memory is 0 */
626     if (prebuffer_slabs == 0) prebuffer_slabs = 1;
627
628     /* pre-buffering is not necessary if we're reading from a disk cache */
629     if (self->retry_part && self->part_slices)
630         return TRUE;
631
632     /* pre-buffering means waiting until we have at least prebuffer_slabs in the
633      * slab train ahead of the device_slab, or the newest slab is at EOF. */
634     while (!elt->cancelled) {
635         gboolean eof_or_eop = FALSE;
636
637         /* see if there's enough data yet */
638         for (i = 0, slab = self->device_slab;
639              i < prebuffer_slabs && slab != NULL;
640              i++, slab = slab->next) {
641             eof_or_eop = (slab->size < self->slab_size)
642                 || (slab->serial + 1 == self->part_stop_serial);
643         }
644         if (i == prebuffer_slabs || eof_or_eop)
645             break;
646
647         DBG(9, "prebuffering wait");
648         g_cond_wait(self->slab_cond, self->slab_mutex);
649     }
650     DBG(9, "done waiting");
651
652     if (elt->cancelled) {
653         self->last_part_successful = FALSE;
654         self->no_more_parts = TRUE;
655         return FALSE;
656     }
657
658     return TRUE;
659 }
660
661 /* Called without the slab_mutex held, this function sets up a new slab_source_state
662  * object based on the configuration of the Xfer Element. */
663 static inline gboolean
664 slab_source_setup(
665     XferDestTaperSplitter *self,
666     slab_source_state *state)
667 {
668     state->tmp_slab = NULL;
669     state->slice_fd = -1;
670     state->slice = NULL;
671     state->slice_remaining = 0;
672     state->next_serial = G_MAXUINT64;
673
674     /* if we're to retry the part, rewind to the beginning */
675     if (self->retry_part) {
676         if (self->use_mem_cache) {
677             /* rewind device_slab to point to the mem_cache_slab */
678             g_mutex_lock(self->slab_mutex);
679             if (self->device_slab)
680                 unref_slab(self, self->device_slab);
681             self->device_slab = self->mem_cache_slab;
682             if (self->device_slab != NULL)
683                 self->device_slab->refcount++;
684             g_mutex_unlock(self->slab_mutex);
685         } else {
686             g_assert(self->part_slices);
687
688             g_mutex_lock(self->slab_mutex);
689
690             /* we're going to read from the disk cache until we get to the oldest useful
691              * slab in memory, so it had best exist */
692             g_assert(self->oldest_slab != NULL);
693
694             /* point device_slab at the oldest slab we have */
695             self->oldest_slab->refcount++;
696             if (self->device_slab)
697                 unref_slab(self, self->device_slab);
698             self->device_slab = self->oldest_slab;
699
700             /* and increment it until it is at least the slab we want to start from */
701             while (self->device_slab->serial < self->part_first_serial) {
702                 next_slab(self, &self->device_slab);
703             }
704
705             /* get a new, temporary slab for use while reading */
706             state->tmp_slab = alloc_slab(self, TRUE);
707
708             g_mutex_unlock(self->slab_mutex);
709
710             if (!state->tmp_slab) {
711                 /* if we couldn't allocate a slab, then we're cancelled, so we're done with
712                  * this part. */
713                 self->last_part_successful = FALSE;
714                 self->no_more_parts = TRUE;
715                 return FALSE;
716             }
717
718             state->tmp_slab->size = self->slab_size;
719             state->slice = self->part_slices;
720             state->next_serial = self->part_first_serial;
721         }
722     }
723
724     /* if the streaming mode requires it, pre-buffer */
725     if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
726         self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
727         gboolean prebuffer_ok;
728
729         g_mutex_lock(self->slab_mutex);
730         prebuffer_ok = slab_source_prebuffer(self);
731         g_mutex_unlock(self->slab_mutex);
732         if (!prebuffer_ok)
733             return FALSE;
734     }
735
736     return TRUE;
737 }
738
739 /* Called with the slab_mutex held, this does the work of slab_source_get when
740  * reading from the disk cache.  Note that this explicitly releases the
741  * slab_mutex during execution - do not depend on any protected values across a
742  * call to this function.  The mutex is held on return. */
743 static Slab *
744 slab_source_get_from_disk(
745     XferDestTaperSplitter *self,
746     slab_source_state *state,
747     guint64 serial)
748 {
749     XferElement *elt = XFER_ELEMENT(self);
750     gsize bytes_needed = self->slab_size;
751     gsize slab_offset = 0;
752
753     /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
754     g_mutex_unlock(self->slab_mutex);
755
756     g_assert(state->next_serial == serial);
757
758     while (bytes_needed > 0) {
759         gsize read_size, bytes_read;
760
761         if (state->slice_fd < 0) {
762             g_assert(state->slice);
763             if (state->slice->filename) {
764                 /* regular cache_inform file - just open it */
765                 state->slice_fd = open(state->slice->filename, O_RDONLY, 0);
766                 if (state->slice_fd < 0) {
767                     xfer_cancel_with_error(XFER_ELEMENT(self),
768                         _("Could not open '%s' for reading: %s"),
769                         state->slice->filename, strerror(errno));
770                     goto fatal_error;
771                 }
772             } else {
773                 /* wait for the disk_cache_thread to open the disk_cache_read_fd, and then copy it */
774                 g_mutex_lock(self->state_mutex);
775                 while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
776                     DBG(9, "waiting for disk_cache_thread to start up");
777                     g_cond_wait(self->state_cond, self->state_mutex);
778                 }
779                 DBG(9, "done waiting");
780                 state->slice_fd = self->disk_cache_read_fd;
781                 g_mutex_unlock(self->state_mutex);
782             }
783
784             if (lseek(state->slice_fd, state->slice->offset, SEEK_SET) == -1) {
785                 xfer_cancel_with_error(XFER_ELEMENT(self),
786                     _("Could not seek '%s' for reading: %s"),
787                     state->slice->filename? state->slice->filename : "(cache file)",
788                     strerror(errno));
789                 goto fatal_error;
790             }
791
792             state->slice_remaining = state->slice->length;
793         }
794
795         read_size = MIN(state->slice_remaining, bytes_needed);
796         bytes_read = full_read(state->slice_fd,
797                                state->tmp_slab->base + slab_offset,
798                                read_size);
799         if (bytes_read < read_size) {
800             xfer_cancel_with_error(XFER_ELEMENT(self),
801                 _("Error reading '%s': %s"),
802                 state->slice->filename? state->slice->filename : "(cache file)",
803                 errno? strerror(errno) : _("Unexpected EOF"));
804             goto fatal_error;
805         }
806
807         state->slice_remaining -= bytes_read;
808         if (state->slice_remaining == 0) {
809             if (close(state->slice_fd) < 0) {
810                 xfer_cancel_with_error(XFER_ELEMENT(self),
811                     _("Could not close fd %d: %s"),
812                     state->slice_fd, strerror(errno));
813                 goto fatal_error;
814             }
815             state->slice_fd = -1;
816             state->slice = state->slice->next;
817         }
818
819         bytes_needed -= bytes_read;
820         slab_offset += bytes_read;
821     }
822
823     state->tmp_slab->serial = state->next_serial++;
824
825     g_mutex_lock(self->slab_mutex);
826     return state->tmp_slab;
827
828 fatal_error:
829     g_mutex_lock(self->slab_mutex);
830
831     self->last_part_successful = FALSE;
832     self->no_more_parts = TRUE;
833     return NULL;
834 }
835
836 /* Called with the slab_mutex held, this function gets the slab with the given
837  * serial number, waiting if necessary for that slab to be available.  Note
838  * that the slab_mutex may be released during execution, although it is always
839  * held on return. */
840 static inline Slab *
841 slab_source_get(
842     XferDestTaperSplitter *self,
843     slab_source_state *state,
844     guint64 serial)
845 {
846     XferElement *elt = (XferElement *)self;
847
848     /* device_slab is only NULL if we're following the slab train, so wait for
849      * a new slab */
850     if (!self->device_slab) {
851         /* if the streaming mode requires it, pre-buffer */
852         if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
853             if (!slab_source_prebuffer(self))
854                 return NULL;
855
856             /* fall through to make sure we have a device_slab;
857              * slab_source_prebuffer doesn't guarantee device_slab != NULL */
858         }
859
860         while (self->device_slab == NULL && !elt->cancelled) {
861             DBG(9, "waiting for the next slab");
862             g_cond_wait(self->slab_cond, self->slab_mutex);
863         }
864         DBG(9, "done waiting");
865
866         if (elt->cancelled)
867             goto fatal_error;
868     }
869
870     /* device slab is now set, and only this thread can change it */
871     g_assert(self->device_slab);
872
873     /* if the slab at device_slab is the one we want, then the job is
874      * pretty easy */
875     if (G_LIKELY(serial == self->device_slab->serial))
876         return self->device_slab;
877
878     /* otherwise, we're reading from disk */
879     g_assert(serial < self->device_slab->serial);
880     return slab_source_get_from_disk(self, state, serial);
881
882 fatal_error:
883     self->last_part_successful = FALSE;
884     self->no_more_parts = TRUE;
885     return NULL;
886 }
887
888 /* Called without the slab_mutex held, this frees any resources assigned
889  * to the slab source state */
890 static inline void
891 slab_source_free(
892     XferDestTaperSplitter *self,
893     slab_source_state *state)
894 {
895     if (state->slice_fd != -1)
896         close(state->slice_fd);
897
898     if (state->tmp_slab) {
899         g_mutex_lock(self->slab_mutex);
900         free_slab(state->tmp_slab);
901         g_mutex_unlock(self->slab_mutex);
902     }
903 }
904
905 /* Called without the slab_mutex, this writes the given slab to the device */
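/* For example (hypothetical sizes): a 512 KiB slab written to a device with a
 * 32 KiB block size becomes 16 device_write_block() calls in the loop below. */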
906 static gboolean
907 write_slab_to_device(
908     XferDestTaperSplitter *self,
909     Slab *slab)
910 {
911     XferElement *elt = XFER_ELEMENT(self);
912     gpointer buf = slab->base;
913     gsize remaining = slab->size;
914
915     while (remaining && !elt->cancelled) {
916         gsize write_size = MIN(self->block_size, remaining);
917         gboolean ok;
918         ok = device_write_block(self->device, write_size, buf);
919         if (!ok) {
920             self->bytes_written += slab->size - remaining;
921
922             /* TODO: handle an error without is_eom
923              * differently/fatally? or at least with a warning? */
924             self->last_part_successful = FALSE;
925             self->no_more_parts = FALSE;
926             return FALSE;
927         }
928
929         buf += write_size;
930         self->slab_bytes_written += write_size;
931         remaining -= write_size;
932     }
933
934     if (elt->cancelled) {
935         self->last_part_successful = FALSE;
936         self->no_more_parts = TRUE;
937         return FALSE;
938     }
939
940     self->bytes_written += slab->size;
941     self->slab_bytes_written = 0;
942     return TRUE;
943 }
944
945 static XMsg *
946 device_thread_write_part(
947     XferDestTaperSplitter *self)
948 {
949     GTimer *timer = g_timer_new();
950     XMsg *msg;
951     slab_source_state src_state;
952     guint64 serial, stop_serial;
953     gboolean eof = FALSE;
954     int fileno = 0;
955
956     self->last_part_successful = FALSE;
957     self->bytes_written = 0;
958
959     if (!device_start_file(self->device, self->part_header))
960         goto part_done;
961
962     dumpfile_free(self->part_header);
963     self->part_header = NULL;
964
965     fileno = self->device->file;
966     g_assert(fileno > 0);
967
968     if (!slab_source_setup(self, &src_state))
969         goto part_done;
970
971     g_timer_start(timer);
972
973     stop_serial = self->part_stop_serial;
974     g_mutex_lock(self->slab_mutex);
975     for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
976         Slab *slab = slab_source_get(self, &src_state, serial);
977         DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
978         g_mutex_unlock(self->slab_mutex);
979         if (!slab)
980             goto part_done;
981
982         eof = slab->size < self->slab_size;
983
984         if (!write_slab_to_device(self, slab))
985             goto part_done;
986
987         g_mutex_lock(self->slab_mutex);
988         DBG(8, "wrote slab %p to device", slab);
989
990         /* if we're reading from the slab train, advance self->device_slab. */
991         if (slab == self->device_slab) {
992             next_slab(self, &self->device_slab);
993         }
994     }
995     g_mutex_unlock(self->slab_mutex);
996
997     /* if we write all of the blocks, but the finish_file fails, then likely
998      * there was some buffering going on in the device driver, and the blocks
999      * did not all make it to permanent storage -- so it's a failed part. */
1000     if (!device_finish_file(self->device))
1001         goto part_done;
1002
1003     slab_source_free(self, &src_state);
1004
1005     self->last_part_successful = TRUE;
1006     self->no_more_parts = eof;
1007
1008 part_done:
1009     g_timer_stop(timer);
1010
1011     msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
1012     msg->size = self->bytes_written;
1013     msg->duration = g_timer_elapsed(timer, NULL);
1014     msg->partnum = self->partnum;
1015     msg->fileno = fileno;
1016     msg->successful = self->last_part_successful;
1017     msg->eom = !self->last_part_successful;
1018     msg->eof = self->no_more_parts;
1019
1020     if (self->last_part_successful)
1021         self->partnum++;
1022
1023     g_timer_destroy(timer);
1024
1025     return msg;
1026 }
1027
1028 /* Called with the state_mutex held, this frees any cached data for
1029  * a successful part */
1030 static void
1031 release_part_cache(
1032     XferDestTaperSplitter *self)
1033 {
1034     if (self->use_mem_cache && self->mem_cache_slab) {
1035         /* move up the mem_cache_slab to point to the first slab in
1036          * the next part (probably NULL at this point), so that the
1037          * reader can continue reading data into the new mem cache
1038          * immediately. */
1039         g_mutex_lock(self->slab_mutex);
1040         unref_slab(self, self->mem_cache_slab);
1041         self->mem_cache_slab = self->device_slab;
1042         if (self->mem_cache_slab)
1043             self->mem_cache_slab->refcount++;
1044         g_mutex_unlock(self->slab_mutex);
1045     }
1046
1047     /* the disk_cache_thread takes care of freeing its cache */
1048     else if (self->disk_cache_dirname)
1049         return;
1050
1051     /* if we have part_slices, fast-forward them. Note that we should have a
1052      * full part's worth of slices by now. */
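    /* Worked example (hypothetical sizes): for a 1000 MiB part backed by two
     * 600 MiB slices, the loop below frees the first slice outright and then
     * advances the second by the remaining 400 MiB, leaving it with offset
     * += 400 MiB and length = 200 MiB for the next part. */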
1053     else if (self->part_slices) {
1054         guint64 bytes_remaining = self->slabs_per_part * self->slab_size;
1055         FileSlice *slice = self->part_slices;
1056
1057         /* consume slices until we've eaten the whole part */
1058         while (bytes_remaining > 0) {
1059             if (slice == NULL)
1060                 g_critical("Not all data in part was represented to cache_inform");
1061
1062             if (slice->length <= bytes_remaining) {
1063                 bytes_remaining -= slice->length;
1064
1065                 self->part_slices = slice->next;
1066                 g_free(slice->filename);
1067                 g_free(slice);
1068                 slice = self->part_slices;
1069             } else {
1070                 slice->length -= bytes_remaining;
1071                 slice->offset += bytes_remaining;
1072                 break;
1073             }
1074         }
1075     }
1076 }
1077
1078 static gpointer
1079 device_thread(
1080     gpointer data)
1081 {
1082     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
1083     XferElement *elt = XFER_ELEMENT(self);
1084     XMsg *msg;
1085
1086     DBG(1, "(this is the device thread)");
1087
1088     if (self->disk_cache_dirname) {
1089         GError *error = NULL;
1090         self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
1091         if (!self->disk_cache_thread) {
1092             g_critical(_("Error creating new thread: %s (%s)"),
1093                 error->message, errno? strerror(errno) : _("no error code"));
1094         }
1095     }
1096
1097     /* This is the outer loop, which runs once for each split part written to
1098      * tape. */
1099     g_mutex_lock(self->state_mutex);
1100     while (1) {
1101         /* wait until the main thread un-pauses us, and check that we have
1102          * the relevant device info available (block_size) */
1103         while (self->paused && !elt->cancelled) {
1104             DBG(9, "waiting to be unpaused");
1105             g_cond_wait(self->state_cond, self->state_mutex);
1106         }
1107         DBG(9, "done waiting");
1108
1109         if (elt->cancelled)
1110             break;
1111
1112         g_mutex_unlock(self->state_mutex);
1113         self->slab_bytes_written = 0;
1114         DBG(2, "beginning to write part");
1115         msg = device_thread_write_part(self);
1116         DBG(2, "done writing part");
1117         g_mutex_lock(self->state_mutex);
1118
1119         /* release any cache of a successful part, but don't bother at EOF */
1120         if (msg->successful && !msg->eof)
1121             release_part_cache(self);
1122
1123         xfer_queue_message(elt->xfer, msg);
1124
1125         /* if this is the last part, we're done with the part loop */
1126         if (self->no_more_parts)
1127             break;
1128
1129         /* pause ourselves and await instructions from the main thread */
1130         self->paused = TRUE;
1131     }
1132
1133     g_mutex_unlock(self->state_mutex);
1134
1135     /* make sure the other thread is done before we send XMSG_DONE */
1136     if (self->disk_cache_thread)
1137         g_thread_join(self->disk_cache_thread);
1138
1139     /* tell the main thread we're done */
1140     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1141
1142     return NULL;
1143 }
1144
1145 /*
1146  * Class mechanics
1147  */
1148
1149 /* called with the slab_mutex held, this adds the reader_slab to the head of
1150  * the slab train and signals the condition variable. */
1151 static void
1152 add_reader_slab_to_train(
1153     XferDestTaperSplitter *self)
1154 {
1155     Slab *slab = self->reader_slab;
1156
1157     DBG(3, "adding slab of new data to the slab train");
1158
1159     if (self->newest_slab) {
1160         self->newest_slab->next = slab;
1161         slab->refcount++;
1162
1163         self->newest_slab->refcount--;
1164     }
1165
1166     self->newest_slab = slab; /* steal reader_slab's ref */
1167     self->reader_slab = NULL;
1168
1169     /* steal reader_slab's reference for newest_slab */
1170
1171     /* if any of the other pointers are waiting for this slab, update them */
1172     if (self->disk_cache_dirname && !self->disk_cacher_slab) {
1173         self->disk_cacher_slab = slab;
1174         slab->refcount++;
1175     }
1176     if (self->use_mem_cache && !self->mem_cache_slab) {
1177         self->mem_cache_slab = slab;
1178         slab->refcount++;
1179     }
1180     if (!self->device_slab) {
1181         self->device_slab = slab;
1182         slab->refcount++;
1183     }
1184     if (!self->oldest_slab) {
1185         self->oldest_slab = slab;
1186         slab->refcount++;
1187     }
1188
1189     g_cond_broadcast(self->slab_cond);
1190 }
1191
1192 static void
1193 push_buffer_impl(
1194     XferElement *elt,
1195     gpointer buf,
1196     size_t size)
1197 {
1198     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
1199     gpointer p;
1200
1201     DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
1202
1203     /* do nothing if cancelled */
1204     if (G_UNLIKELY(elt->cancelled)) {
1205         goto free_and_finish;
1206     }
1207
1208     /* handle EOF */
1209     if (G_UNLIKELY(buf == NULL)) {
1210         /* send off the last, probably partial slab */
1211         g_mutex_lock(self->slab_mutex);
1212
1213         /* create a new, empty slab if necessary */
1214         if (!self->reader_slab) {
1215             self->reader_slab = alloc_slab(self, FALSE);
1216             if (!self->reader_slab) {
1217                 /* we've been cancelled while waiting for a slab */
1218                 g_mutex_unlock(self->slab_mutex);
1219
1220                 /* wait for the xfer to cancel, so we don't get another buffer
1221                  * pushed to us (and do so *without* the mutex held) */
1222                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1223
1224                 goto free_and_finish;
1225             }
1226             self->reader_slab->serial = self->next_serial++;
1227         }
1228
1229         add_reader_slab_to_train(self);
1230         g_mutex_unlock(self->slab_mutex);
1231
1232         goto free_and_finish;
1233     }
1234
1235     p = buf;
1236     while (1) {
1237         gsize copy_size;
1238
1239         /* get a fresh slab, if needed */
1240         if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
1241             g_mutex_lock(self->slab_mutex);
1242             if (self->reader_slab)
1243                 add_reader_slab_to_train(self);
1244             self->reader_slab = alloc_slab(self, FALSE);
1245             if (!self->reader_slab) {
1246                 /* we've been cancelled while waiting for a slab */
1247                 g_mutex_unlock(self->slab_mutex);
1248
1249                 /* wait for the xfer to cancel, so we don't get another buffer
1250                  * pushed to us (and do so *without* the mutex held) */
1251                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1252
1253                 goto free_and_finish;
1254             }
1255             self->reader_slab->serial = self->next_serial++;
1256             g_mutex_unlock(self->slab_mutex);
1257         }
1258
1259         if (size == 0)
1260             break;
1261
1262         copy_size = MIN(self->slab_size - self->reader_slab->size, size);
1263         memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);
1264
1265         self->reader_slab->size += copy_size;
1266         p += copy_size;
1267         size -= copy_size;
1268     }
1269
1270 free_and_finish:
1271     if (buf)
1272         g_free(buf);
1273 }
1274
1275 /*
1276  * Element mechanics
1277  */
1278
1279 static gboolean
1280 start_impl(
1281     XferElement *elt)
1282 {
1283     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
1284     GError *error = NULL;
1285
1286     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
1287     if (!self->device_thread) {
1288         g_critical(_("Error creating new thread: %s (%s)"),
1289             error->message, errno? strerror(errno) : _("no error code"));
1290     }
1291
1292     return TRUE;
1293 }
1294
1295 static gboolean
1296 cancel_impl(
1297     XferElement *elt,
1298     gboolean expect_eof)
1299 {
1300     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
1301     gboolean rv;
1302
1303     /* chain up first */
1304     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
1305
1306     /* then signal all of our condition variables, so that threads waiting on them
1307      * wake up and see elt->cancelled. */
1308     g_mutex_lock(self->state_mutex);
1309     g_cond_broadcast(self->state_cond);
1310     g_mutex_unlock(self->state_mutex);
1311
1312     g_mutex_lock(self->slab_mutex);
1313     g_cond_broadcast(self->slab_cond);
1314     g_cond_broadcast(self->slab_free_cond);
1315     g_mutex_unlock(self->slab_mutex);
1316
1317     return rv;
1318 }
1319
1320 static void
1321 start_part_impl(
1322     XferDestTaper *xdtself,
1323     gboolean retry_part,
1324     dumpfile_t *header)
1325 {
1326     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1327
1328     g_assert(self->device != NULL);
1329     g_assert(!self->device->in_file);
1330     g_assert(header != NULL);
1331
1332     DBG(1, "start_part(retry_part=%d)", retry_part);
1333
1334     g_mutex_lock(self->state_mutex);
1335     g_assert(self->paused);
1336     g_assert(!self->no_more_parts);
1337
1338     if (self->part_header)
1339         dumpfile_free(self->part_header);
1340     self->part_header = dumpfile_copy(header);
1341
1342     if (retry_part) {
1343         if (!self->use_mem_cache && !self->part_slices) {
1344             g_mutex_unlock(self->state_mutex);
1345             xfer_cancel_with_error(XFER_ELEMENT(self),
1346                 _("Failed part was not cached; cannot retry"));
1347             return;
1348         }
1349         g_assert(!self->last_part_successful);
1350         self->retry_part = TRUE;
1351     } else {
1352         g_assert(self->last_part_successful);
1353         self->retry_part = FALSE;
1354         self->part_first_serial = self->part_stop_serial;
1355         if (self->part_size != 0) {
1356             self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
1357         } else {
1358             /* set part_stop_serial to an effectively infinite value */
1359             self->part_stop_serial = G_MAXUINT64;
1360         }
1361     }
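
    /* Illustrative example: with slabs_per_part = 2048, successive successful
     * parts cover serial windows [0,2048), [2048,4096), and so on, while a
     * retry keeps the previous window and re-reads it from the cache. */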
1362
1363     DBG(1, "unpausing");
1364     self->paused = FALSE;
1365     g_cond_broadcast(self->state_cond);
1366
1367     g_mutex_unlock(self->state_mutex);
1368 }
1369
1370 static void
1371 use_device_impl(
1372     XferDestTaper *xdtself,
1373     Device *device)
1374 {
1375     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1376     GValue val;
1377
1378     /* short-circuit if nothing is changing */
1379     if (self->device == device)
1380         return;
1381
1382     g_mutex_lock(self->state_mutex);
1383     if (self->device)
1384         g_object_unref(self->device);
1385     self->device = device;
1386     g_object_ref(device);
1387
1388     /* get this new device's streaming requirements */
1389     bzero(&val, sizeof(val));
1390     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1391         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1392         g_warning("Couldn't get streaming type for %s", self->device->device_name);
1393         self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1394     } else {
1395         self->streaming = g_value_get_enum(&val);
1396     }
1397     g_value_unset(&val);
1398
1399     /* check that the blocksize hasn't changed */
1400     if (self->block_size != device->block_size) {
1401         g_mutex_unlock(self->state_mutex);
1402         xfer_cancel_with_error(XFER_ELEMENT(self),
1403             _("All devices used by the taper must have the same block size"));
1404         return;
1405     }
1406     g_mutex_unlock(self->state_mutex);
1407 }
1408
1409 static void
1410 cache_inform_impl(
1411     XferDestTaper *xdtself,
1412     const char *filename,
1413     off_t offset,
1414     off_t length)
1415 {
1416     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1417     FileSlice *slice, *iter;
1418
1419     DBG(1, "cache_inform(\"%s\", %jd, %jd)", filename, (intmax_t)offset, (intmax_t)length);
1420
1421     /* do we even need this info? */
1422     if (self->disk_cache_dirname || self->use_mem_cache || self->part_size == 0)
1423         return;
1424
1425     /* handle the (admittedly unlikely) event that length is larger than gsize.
1426      * Hopefully if sizeof(off_t) == sizeof(gsize), this will get optimized out */
1427     while (sizeof(off_t) > sizeof(gsize) && length > (off_t)SIZE_MAX) {
1428         cache_inform_impl(xdtself, filename, offset, (off_t)SIZE_MAX);
1429         offset += (off_t)SIZE_MAX;
1430         length -= (off_t)SIZE_MAX;
1431     }
1432
1433     slice = g_new0(FileSlice, 1);
1434     slice->filename = g_strdup(filename);
1435     slice->offset = offset;
1436     slice->length = (gsize)length;
1437
1438     g_mutex_lock(self->state_mutex);
1439     if (self->part_slices) {
1440         for (iter = self->part_slices; iter->next; iter = iter->next) {}
1441         iter->next = slice;
1442     } else {
1443         self->part_slices = slice;
1444     }
1445     g_mutex_unlock(self->state_mutex);
1446 }
1447
1448 static guint64
1449 get_part_bytes_written_impl(
1450     XferDestTaper *xdtself)
1451 {
1452     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1453     return self->bytes_written + self->slab_bytes_written;
1454 }
1455
1456 static void
1457 instance_init(
1458     XferElement *elt)
1459 {
1460     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
1461     elt->can_generate_eof = FALSE;
1462
1463     self->state_mutex = g_mutex_new();
1464     self->state_cond = g_cond_new();
1465     self->slab_mutex = g_mutex_new();
1466     self->slab_cond = g_cond_new();
1467     self->slab_free_cond = g_cond_new();
1468
1469     self->last_part_successful = TRUE;
1470     self->paused = TRUE;
1471     self->part_stop_serial = 0;
1472     self->disk_cache_read_fd = -1;
1473     self->disk_cache_write_fd = -1;
1474 }
1475
1476 static void
1477 finalize_impl(
1478     GObject * obj_self)
1479 {
1480     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
1481     Slab *slab, *next_slab;
1482     FileSlice *slice, *next_slice;
1483
1484     if (self->disk_cache_dirname)
1485         g_free(self->disk_cache_dirname);
1486
1487     g_mutex_free(self->state_mutex);
1488     g_cond_free(self->state_cond);
1489
1490     g_mutex_free(self->slab_mutex);
1491     g_cond_free(self->slab_cond);
1492     g_cond_free(self->slab_free_cond);
1493
1494     /* free the slab train, without reference to the refcounts */
1495     for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
1496         next_slab = slab->next;
1497         free_slab(slab);
1498     }
1499     self->disk_cacher_slab = NULL;
1500     self->mem_cache_slab = NULL;
1501     self->device_slab = NULL;
1502     self->oldest_slab = NULL;
1503     self->newest_slab = NULL;
1504
1505     if (self->reader_slab) {
1506         free_slab(self->reader_slab);
1507         self->reader_slab = NULL;
1508     }
1509
1510     for (slice = self->part_slices; slice; slice = next_slice) {
1511         next_slice = slice->next;
1512         g_free(slice->filename);
1513         g_free(slice);
1514     }
1515
1516     if (self->part_header)
1517         dumpfile_free(self->part_header);
1518
1519     if (self->disk_cache_read_fd != -1)
1520         close(self->disk_cache_read_fd); /* ignore error */
1521     if (self->disk_cache_write_fd != -1)
1522         close(self->disk_cache_write_fd); /* ignore error */
1523
1524     if (self->device)
1525         g_object_unref(self->device);
1526
1527     /* chain up */
1528     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1529 }
1530
1531 static void
1532 class_init(
1533     XferDestTaperSplitterClass * selfc)
1534 {
1535     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1536     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
1537     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1538     static xfer_element_mech_pair_t mech_pairs[] = {
1539         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
1540         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1541     };
1542
1543     klass->start = start_impl;
1544     klass->cancel = cancel_impl;
1545     klass->push_buffer = push_buffer_impl;
1546     xdt_klass->start_part = start_part_impl;
1547     xdt_klass->use_device = use_device_impl;
1548     xdt_klass->cache_inform = cache_inform_impl;
1549     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
1550     goc->finalize = finalize_impl;
1551
1552     klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
1553     klass->mech_pairs = mech_pairs;
1554
1555     parent_class = g_type_class_peek_parent(selfc);
1556 }
1557
1558 static GType
1559 xfer_dest_taper_splitter_get_type (void)
1560 {
1561     static GType type = 0;
1562
1563     if G_UNLIKELY(type == 0) {
1564         static const GTypeInfo info = {
1565             sizeof (XferDestTaperSplitterClass),
1566             (GBaseInitFunc) NULL,
1567             (GBaseFinalizeFunc) NULL,
1568             (GClassInitFunc) class_init,
1569             (GClassFinalizeFunc) NULL,
1570             NULL /* class_data */,
1571             sizeof (XferDestTaperSplitter),
1572             0 /* n_preallocs */,
1573             (GInstanceInitFunc) instance_init,
1574             NULL
1575         };
1576
1577         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
1578     }
1579
1580     return type;
1581 }
1582
1583 /*
1584  * Constructor
1585  */
1586
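/* A minimal usage sketch with hypothetical argument values (in practice this
 * element is constructed and driven from Amanda::Taper::Scribe through the
 * perl bindings):
 *
 *     XferElement *dest = xfer_dest_taper_splitter(
 *         device,           current Device
 *         10*1024*1024,     max_memory
 *         0,                part_size: 0 = no splitting, so no caching
 *         FALSE,            use_mem_cache
 *         NULL);            disk_cache_dirname
 *
 * Each part is then begun via the XferDestTaper start_part/use_device
 * methods implemented below. */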
1587 XferElement *
1588 xfer_dest_taper_splitter(
1589     Device *first_device,
1590     size_t max_memory,
1591     guint64 part_size,
1592     gboolean use_mem_cache,
1593     const char *disk_cache_dirname)
1594 {
1595     XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1596
1597     self->max_memory = max_memory;
1598     self->part_size = part_size;
1599     self->partnum = 1;
1600     self->device = first_device;
1601     g_object_ref(self->device);
1602
1603     /* pick only one caching mechanism, caller! */
1604     g_assert(!use_mem_cache || !disk_cache_dirname);
1605
1606     /* and if part size is zero, then we don't do any caching */
1607     if (part_size == 0) {
1608         g_assert(!use_mem_cache && !disk_cache_dirname);
1609     }
1610
1611     self->use_mem_cache = use_mem_cache;
1612     if (disk_cache_dirname) {
1613         self->disk_cache_dirname = g_strdup(disk_cache_dirname);
1614
1615         self->part_slices = g_new0(FileSlice, 1);
1616         self->part_slices->filename = NULL; /* indicates "use disk_cache_read_fd" */
1617         self->part_slices->offset = 0;
1618         self->part_slices->length = 0; /* filled in below, once the real part size is known */
1619     }
1620
1621     /* calculate the device-dependent parameters */
1622     self->block_size = first_device->block_size;
1623
1624     /* The slab size should be large enough to justify the overhead of all
1625      * of the mutexes, but it needs to be small enough to have a few slabs
1626      * available so that the threads are not constantly waiting on one
1627      * another.  The choice is sixteen blocks, not more than a quarter of
1628      * the part size, and not more than 10MB.  If we're not using the mem
1629      * cache, then avoid exceeding max_memory by keeping the slab size less
1630      * than a quarter of max_memory. */
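    /* Worked example (hypothetical values): with block_size = 32 KiB,
     * part_size = 1 GiB, max_memory = 8 MiB, and no mem cache, 16 blocks =
     * 512 KiB, which is below part_size/4, 10 MB, and max_memory/4 = 2 MiB,
     * so slab_size stays 512 KiB; then slabs_per_part = 2048 and
     * max_slabs = 16. */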
1631
1632     self->slab_size = self->block_size * 16;
1633     if (self->part_size)
1634         self->slab_size = MIN(self->slab_size, self->part_size / 4);
1635     self->slab_size = MIN(self->slab_size, 10*1024*1024);
1636     if (!self->use_mem_cache)
1637         self->slab_size = MIN(self->slab_size, self->max_memory / 4);
1638
1639     /* round slab size up to the nearest multiple of the block size */
1640     self->slab_size =
1641         ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
1642
1643     /* round part size up to a multiple of the slab size */
1644     if (self->part_size != 0) {
1645         self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
1646         self->part_size = self->slabs_per_part * self->slab_size;
1647     } else {
1648         self->slabs_per_part = 0;
1649     }
1650
1651     /* fill in the file slice's length, now that we know the real part size */
1652     if (self->disk_cache_dirname)
1653         self->part_slices->length = self->part_size;
1654
1655     if (self->use_mem_cache) {
1656         self->max_slabs = self->slabs_per_part;
1657     } else {
1658         self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
1659     }
1660
1661     /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
1662      * alloc_slab, so we check here that it's at least 2. */
1663     if (self->max_slabs < 2)
1664         self->max_slabs = 2;
1665
1666     DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);
1667
1668     return XFER_ELEMENT(self);
1669 }