Imported Upstream version 3.3.3
[debian/amanda] / device-src / xfer-dest-taper-cacher.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25 #include "xfer-device.h"
26 #include "arglist.h"
27 #include "conffile.h"
28
29 /* A transfer destination that writes an entire dumpfile to one or more files
30  * on one or more devices, caching each part so that it can be rewritten on a
31  * subsequent volume in the event of an unexpected EOM.   This is designed to
32  * work in concert with Amanda::Taper::Scribe. */
33
34 /* Future Plans:
35  * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case?
36  * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file
37  * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size?
38  * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior)
39  */
40
41 /*
42  * Slabs
43  *
44  * Slabs are larger than blocks, and are the unit on which the element
45  * operates.  They are designed to be a few times larger than a block, to
46  * achieve a corresponding reduction in the number of locks and unlocks used
47  * per block, and similar reduction in the the amount of memory overhead
48  * required.
49  */
50
51 typedef struct Slab {
52     struct Slab *next;
53
54     /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
55      * from any processes operating on the slab */
56     gint refcount;
57
58     /* number of this slab in the sequence, global to this element's lifetime.
59      * Since this counts slabs, which are about 1M, this can address 16
60      * yottabytes of data before wrapping. */
61     guint64 serial;
62
63     /* slab size; this is only less than the element's slab size if the
64      * transfer is at EOF. */
65     gsize size;
66
67     /* base of the slab_size buffer */
68     gpointer base;
69 } Slab;
70
71 /*
72  * Xfer Dest Taper
73  */
74
75 static GObjectClass *parent_class = NULL;
76
77 typedef struct XferDestTaperCacher {
78     XferDestTaper __parent__;
79
80     /* object parameters
81      *
82      * These values are supplied to the constructor, and can be assumed
83      * constant for the lifetime of the element.
84      */
85
86     /* maximum buffer space to use for streaming; this is unrelated to the
87      * fallback_splitsize */
88     gsize max_memory;
89
90     /* split buffering info; if we're doing memory buffering, use_mem_cache is
91      * true; if we're doing disk buffering, disk_cache_dirname is non-NULL and
92      * contains the (allocated) filename of the cache file.  In any
93      * case, part_size gives the desired part size.  If part_size is zero, then
94      * no splitting takes place (so part_size is effectively infinite). */
95     gboolean use_mem_cache;
96     char *disk_cache_dirname;
97     guint64 part_size; /* (bytes) */
98
99     /*
100      * threads
101      */
102
103     /* The thread doing the actual writes to tape; this also handles buffering
104      * for streaming */
105     GThread *device_thread;
106
107     /* The thread writing slabs to the disk cache, if any */
108     GThread *disk_cache_thread;
109
110     /* slab train
111      *
112      * All in-memory data is contained in a linked list called the "slab
113      * train".  Various components are operating simultaneously at different
114      * points in this train.  Data from the upstream XferElement is appended to
115      * the head of the train, and the device thread follows along behind,
116      * writing data to the device.  When caching parts in memory, the slab
117      * train just grows to eventually contain the whole part.  When using an
118      * on-disk cache, the disk cache thread writes the tail of the train to
119      * disk, freeing slabs to be re-used at the head of the train.  Some
120      * careful coordination of these components allows them to operate as
121      * independently as possible within the limits of the user's configuration.
122      *
123      * Slabs are rarely, if ever, freed: the oldest_slab reference generally
124      * ensures that all slabs have refcount > 0, and this pointer is only
125      * advanced when re-using slabs that have been flushed to the disk cache or
126      * when freeing slabs after completion of the transfer. */
127
128     /* pointers into the slab train are all protected by this mutex.  Note that
129      * the slabs themselves can be manipulated without this lock; it's only
130      * when changing the pointers that the mutex must be held.  Furthermore, a
131      * foo_slab variable which is not NULL will not be changed except by its
132      * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
133      * and device_slab is controlled by device_thread).  This means that a
134      * controlling thread can drop the slab_mutex once it has ensured its slab
135      * is non-NULL.
136      *
137      * Slab_cond is notified when a new slab is made available from the reader.
138      * Slab_free_cond is notified when a slab becomes available for
139      * reallocation.
140      *
141      * Any thread waiting on either condition variable should also check
142      * elt->cancelled, and act appropriately if awakened in a cancelled state.
143      */
144     GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
145
146     /* slabs in progress by each thread, or NULL if the thread is waiting on
147      * slab_cond.  These can only be changed by their respective threads, except
148      * when they are NULL (in which case the reader will point them to a new
149      * slab and signal the slab_cond). */
150     Slab *volatile disk_cacher_slab;
151     Slab *volatile mem_cache_slab;
152     Slab *volatile device_slab;
153
154     /* tail and head of the slab train */
155     Slab *volatile oldest_slab;
156     Slab *volatile newest_slab;
157
158     /* thread-specific information
159      *
160      * These values are only used by one thread, and thus are not
161      * subject to any locking or concurrency constraints.
162      */
163
164     /* slab in progress by the reader (not in the slab train) */
165     Slab *reader_slab;
166
167     /* the serial to be assigned to reader_slab */
168     guint64 next_serial;
169
170     /* bytes written to the device in this part */
171     guint64 bytes_written;
172
173     /* bytes written to the device in the current slab */
174     guint64 slab_bytes_written;
175
176     /* element state
177      *
178      * "state" includes all of the variables below (including device
179      * parameters).  Note that the device_thread reads state values when
180      * paused is false without locking the mutex.  No other thread should
181      * change state when the element is not paused.
182      *
183      * If there is every any reason to lock both mutexes, acquire this one
184      * first.
185      *
186      * Any thread waiting on this condition variable should also check
187      * elt->cancelled, and act appropriately if awakened in a cancelled state.
188      */
189     GMutex *state_mutex;
190     GCond *state_cond;
191     volatile gboolean paused;
192
193     /* The device to write to, and the header to write to it */
194     Device *volatile device;
195     dumpfile_t *volatile part_header;
196
197     /* If true, when unpaused, the device should begin at the beginning of the
198      * cache; if false, it should proceed to the next part. */
199     volatile gboolean retry_part;
200
201     /* If true, the previous part was completed successfully; only used for
202      * assertions */
203     volatile gboolean last_part_successful;
204
205     /* part number in progress */
206     volatile guint64 partnum;
207
208     /* if true, the main thread should *not* call start_part */
209     volatile gboolean no_more_parts;
210
211     /* the first serial in this part, and the serial to stop at */
212     volatile guint64 part_first_serial, part_stop_serial;
213
214     /* read and write file descriptors for the disk cache file, in use by the
215      * disk_cache_thread.  If these are -1, wait on state_cond until they are
216      * not; once the value is set, it will not change. */
217     volatile int disk_cache_read_fd;
218     volatile int disk_cache_write_fd;
219
220     /* device parameters
221      *
222      * Note that these values aren't known until we begin writing to the
223      * device; if block_size is zero, threads should block on state_cond until
224      * it is nonzero, at which point all of the dependent fields will have
225      * their correct values.  Note that, since this value never changes after
226      * it has been set, it is safe to read block_size without acquiring the
227      * mutext first. */
228
229     /* this device's need for streaming */
230     StreamingRequirement streaming;
231
232     /* block size expected by the target device */
233     gsize block_size;
234
235     /* Size of a slab - some multiple of the block size */
236     gsize slab_size;
237
238     /* maximum number of slabs allowed, rounded up to the next whole slab.  If
239      * using mem cache, this is the equivalent of part_size bytes; otherwise,
240      * it is equivalent to max_memory bytes. */
241     guint64 max_slabs;
242
243     /* number of slabs in a part */
244     guint64 slabs_per_part;
245 } XferDestTaperCacher;
246
247 static GType xfer_dest_taper_cacher_get_type(void);
248 #define XFER_DEST_TAPER_CACHER_TYPE (xfer_dest_taper_cacher_get_type())
249 #define XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher)
250 #define XFER_DEST_TAPER_CACHER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher const)
251 #define XFER_DEST_TAPER_CACHER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
252 #define IS_XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_cacher_get_type ())
253 #define XFER_DEST_TAPER_CACHER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
254
255 typedef struct {
256     XferDestTaperClass __parent__;
257
258 } XferDestTaperCacherClass;
259
260 /*
261  * Debug logging
262  */
263
264 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
265 static void
266 _xdt_dbg(const char *fmt, ...)
267 {
268     va_list argp;
269     char msg[1024];
270
271     arglist_start(argp, fmt);
272     g_vsnprintf(msg, sizeof(msg), fmt, argp);
273     arglist_end(argp);
274     g_debug("XDT: %s", msg);
275 }
276
277 /*
278  * Slab handling
279  */
280
281 /* called with the slab_mutex held, this gets a new slab to write into, with
282  * refcount 1.  It will block if max_memory slabs are already in use, and mem
283  * caching is not in use, although allocation may be forced with the 'force'
284  * parameter.
285  *
286  * If the memory allocation cannot be satisfied due to system constraints,
287  * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
288  * return NULL.  If the transfer is cancelled by some other means while this
289  * function is blocked awaiting a free slab, it will return NULL.
290  *
291  * @param self: the xfer element
292  * @param force: allocate a slab even if it would exceed max_memory
293  * @returns: a new slab, or NULL if the xfer is cancelled
294  */
295 static Slab *
296 alloc_slab(
297     XferDestTaperCacher *self,
298     gboolean force)
299 {
300     XferElement *elt = XFER_ELEMENT(self);
301     Slab *rv;
302
303     DBG(8, "alloc_slab(force=%d)", force);
304     if (!force) {
305         /* throttle based on maximum number of extant slabs */
306         while (G_UNLIKELY(
307             !elt->cancelled &&
308             self->oldest_slab &&
309             self->newest_slab &&
310             self->oldest_slab->refcount > 1 &&
311             (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
312             DBG(9, "waiting for available slab");
313             g_cond_wait(self->slab_free_cond, self->slab_mutex);
314         }
315         DBG(9, "done waiting");
316
317         if (elt->cancelled)
318             return NULL;
319     }
320
321     /* if the oldest slab doesn't have anything else pointing to it, just use
322      * that */
323     if (self->oldest_slab && self->oldest_slab->refcount == 1) {
324         rv = self->oldest_slab;
325         self->oldest_slab = rv->next;
326     } else {
327         rv = g_new0(Slab, 1);
328         rv->refcount = 1;
329         rv->base = g_try_malloc(self->slab_size);
330         if (!rv->base) {
331             xfer_cancel_with_error(XFER_ELEMENT(self),
332                 _("Could not allocate %zu bytes of memory: %s"), self->slab_size, strerror(errno));
333             g_free(rv);
334             return NULL;
335         }
336     }
337
338     rv->next = NULL;
339     rv->size = 0;
340     return rv;
341 }
342
343 /* called with the slab_mutex held, this frees the given slave entirely.  The
344  * reference count is not consulted.
345  *
346  * @param slab: slab to free
347  */
348 static void
349 free_slab(
350     Slab *slab)
351 {
352     if (slab) {
353         if (slab->base)
354             g_free(slab->base);
355         g_free(slab);
356     }
357 }
358
359 /* called with the slab_mutex held, this decrements the refcount of the
360  * given slab
361  *
362  * @param self: xfer element
363  * @param slab: slab to free
364  */
365 static inline void
366 unref_slab(
367     XferDestTaperCacher *self,
368     Slab *slab)
369 {
370     g_assert(slab->refcount > 1);
371     slab->refcount--;
372     if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
373         g_cond_broadcast(self->slab_free_cond);
374     } else if (G_UNLIKELY(slab->refcount == 0)) {
375         free_slab(slab);
376     }
377 }
378
379 /* called with the slab_mutex held, this sets *slabp to *slabp->next,
380  * adjusting refcounts appropriately, and returns the new value
381  *
382  * @param self: xfer element
383  * @param slabp: slab pointer to advance
384  * @returns: new value of *slabp
385  */
386 static inline Slab *
387 next_slab(
388     XferDestTaperCacher *self,
389     Slab * volatile *slabp)
390 {
391     Slab *next;
392
393     if (!slabp || !*slabp)
394         return NULL;
395
396     next = (*slabp)->next;
397     if (next)
398         next->refcount++;
399     if (*slabp)
400         unref_slab(self, *slabp);
401     *slabp = next;
402
403     return next;
404 }
405
406 /*
407  * Disk Cache
408  *
409  * The disk cache thread's job is simply to follow along the slab train at
410  * maximum speed, writing slabs to the disk cache file. */
411
412 static gboolean
413 open_disk_cache_fds(
414     XferDestTaperCacher *self)
415 {
416     char * filename;
417
418     g_assert(self->disk_cache_read_fd == -1);
419     g_assert(self->disk_cache_write_fd == -1);
420
421     g_mutex_lock(self->state_mutex);
422     filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
423                                self->disk_cache_dirname);
424
425     self->disk_cache_write_fd = g_mkstemp(filename);
426     if (self->disk_cache_write_fd < 0) {
427         g_mutex_unlock(self->state_mutex);
428         xfer_cancel_with_error(XFER_ELEMENT(self),
429             _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
430             strerror(errno));
431         g_free(filename);
432         return FALSE;
433     }
434
435     /* open a separate copy of the file for reading */
436     self->disk_cache_read_fd = open(filename, O_RDONLY);
437     if (self->disk_cache_read_fd < 0) {
438         g_mutex_unlock(self->state_mutex);
439         xfer_cancel_with_error(XFER_ELEMENT(self),
440             _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
441             strerror(errno));
442         g_free(filename);
443         return FALSE;
444     }
445
446     /* signal anyone waiting for this value */
447     g_cond_broadcast(self->state_cond);
448     g_mutex_unlock(self->state_mutex);
449
450     /* errors from unlink are not fatal */
451     if (unlink(filename) < 0) {
452         g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
453     }
454
455     g_free(filename);
456     return TRUE;
457 }
458
459 static gpointer
460 disk_cache_thread(
461     gpointer data)
462 {
463     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
464     XferElement *elt = XFER_ELEMENT(self);
465
466     DBG(1, "(this is the disk cache thread)");
467
468     /* open up the disk cache file first */
469     if (!open_disk_cache_fds(self))
470         return NULL;
471
472     while (!elt->cancelled) {
473         gboolean eof, eop;
474         guint64 stop_serial;
475         Slab *slab;
476
477         /* rewind to the begining of the disk cache file */
478         if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
479             xfer_cancel_with_error(XFER_ELEMENT(self),
480                 _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
481                 strerror(errno));
482             return NULL;
483         }
484
485         /* we need to sit and wait for the next part to begin, first making sure
486          * we have a slab .. */
487         g_mutex_lock(self->slab_mutex);
488         while (!self->disk_cacher_slab && !elt->cancelled) {
489             DBG(9, "waiting for a disk slab");
490             g_cond_wait(self->slab_cond, self->slab_mutex);
491         }
492         DBG(9, "done waiting");
493         g_mutex_unlock(self->slab_mutex);
494
495         if (elt->cancelled)
496             break;
497
498         /* this slab is now fixed until this thread changes it */
499         g_assert(self->disk_cacher_slab != NULL);
500
501         /* and then making sure we're ready to write that slab. */
502         g_mutex_lock(self->state_mutex);
503         while ((self->paused ||
504                     (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
505                 && !elt->cancelled) {
506             DBG(9, "waiting for the disk slab to become current and un-paused");
507             g_cond_wait(self->state_cond, self->state_mutex);
508         }
509         DBG(9, "done waiting");
510
511         stop_serial = self->part_stop_serial;
512         g_mutex_unlock(self->state_mutex);
513
514         if (elt->cancelled)
515             break;
516
517         g_mutex_lock(self->slab_mutex);
518         slab = self->disk_cacher_slab;
519         eop = eof = FALSE;
520         while (!eop && !eof) {
521             /* if we're at the head of the slab train, wait for more data */
522             while (!self->disk_cacher_slab && !elt->cancelled) {
523                 DBG(9, "waiting for the next disk slab");
524                 g_cond_wait(self->slab_cond, self->slab_mutex);
525             }
526             DBG(9, "done waiting");
527
528             if (elt->cancelled)
529                 break;
530
531             /* drop the lock long enough to write the slab; the refcount
532              * protects the slab during this time */
533             slab = self->disk_cacher_slab;
534             g_mutex_unlock(self->slab_mutex);
535
536             if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
537                 xfer_cancel_with_error(XFER_ELEMENT(self),
538                     _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
539                     strerror(errno));
540                 return NULL;
541             }
542
543             eof = slab->size < self->slab_size;
544             eop = (slab->serial + 1 == stop_serial);
545
546             g_mutex_lock(self->slab_mutex);
547             next_slab(self, &self->disk_cacher_slab);
548         }
549         g_mutex_unlock(self->slab_mutex);
550
551         if (eof) {
552             /* this very thread should have just set this value to NULL, and since it's
553              * EOF, there should not be any 'next' slab */
554             g_assert(self->disk_cacher_slab == NULL);
555             break;
556         }
557     }
558
559     return NULL;
560 }
561
562 /*
563  * Device Thread
564  *
565  * The device thread's job is to write slabs to self->device, applying whatever
566  * streaming algorithms are required.  It does this by alternately getting the
567  * next slab from a "slab source" and writing that slab to the device.  Most of
568  * the slab source functions assume that self->slab_mutex is held, but may
569  * release the mutex (either explicitly or via a g_cond_wait), so it is not
570  * valid to assume that any slab pointers remain unchanged after a slab_source
571  * function invocation.
572  */
573
574 /* This struct tracks the current state of the slab source */
575 typedef struct slab_source_state {
576     /* temporary slab used for reading from disk */
577     Slab *tmp_slab;
578
579     /* next serial to read from disk */
580     guint64 next_serial;
581 } slab_source_state;
582
583 /* Called with the slab_mutex held, this function pre-buffers enough data into the slab
584  * train to meet the device's streaming needs. */
585 static gboolean
586 slab_source_prebuffer(
587     XferDestTaperCacher *self)
588 {
589     XferElement *elt = XFER_ELEMENT(self);
590     guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
591     guint64 i;
592     Slab *slab;
593
594     /* always prebuffer at least one slab, even if max_memory is 0 */
595     if (prebuffer_slabs == 0) prebuffer_slabs = 1;
596
597     /* pre-buffering is not necessary if we're retrying a part */
598     if (self->retry_part)
599         return TRUE;
600
601     /* pre-buffering means waiting until we have at least prebuffer_slabs in the
602      * slab train ahead of the device_slab, or the newest slab is at EOF. */
603     while (!elt->cancelled) {
604         gboolean eof_or_eop = FALSE;
605
606         /* see if there's enough data yet */
607         for (i = 0, slab = self->device_slab;
608              i < prebuffer_slabs && slab != NULL;
609              i++, slab = slab->next) {
610             eof_or_eop = (slab->size < self->slab_size)
611                 || (slab->serial + 1 == self->part_stop_serial);
612         }
613         if (i == prebuffer_slabs || eof_or_eop)
614             break;
615
616         DBG(9, "prebuffering wait");
617         g_cond_wait(self->slab_cond, self->slab_mutex);
618     }
619     DBG(9, "done waiting");
620
621     if (elt->cancelled) {
622         self->last_part_successful = FALSE;
623         self->no_more_parts = TRUE;
624         return FALSE;
625     }
626
627     return TRUE;
628 }
629
630 /* Called without the slab_mutex held, this function sets up a new slab_source_state
631  * object based on the configuratino of the Xfer Element. */
632 static inline gboolean
633 slab_source_setup(
634     XferDestTaperCacher *self,
635     slab_source_state *state)
636 {
637     XferElement *elt = XFER_ELEMENT(self);
638     state->tmp_slab = NULL;
639     state->next_serial = G_MAXUINT64;
640
641     /* if we're to retry the part, rewind to the beginning */
642     if (self->retry_part) {
643         if (self->use_mem_cache) {
644             /* rewind device_slab to point to the mem_cache_slab */
645             g_mutex_lock(self->slab_mutex);
646             if (self->device_slab)
647                 unref_slab(self, self->device_slab);
648             self->device_slab = self->mem_cache_slab;
649             if(self->device_slab != NULL)
650                 self->device_slab->refcount++;
651             g_mutex_unlock(self->slab_mutex);
652         } else {
653             g_mutex_lock(self->slab_mutex);
654
655             /* we're going to read from the disk cache until we get to the oldest useful
656              * slab in memory, so it had best exist */
657             g_assert(self->oldest_slab != NULL);
658
659             /* point device_slab at the oldest slab we have */
660             self->oldest_slab->refcount++;
661             if (self->device_slab)
662                 unref_slab(self, self->device_slab);
663             self->device_slab = self->oldest_slab;
664
665             /* and increment it until it is at least the slab we want to start from */
666             while (self->device_slab->serial < self->part_first_serial) {
667                 next_slab(self, &self->device_slab);
668             }
669
670             /* get a new, temporary slab for use while reading */
671             state->tmp_slab = alloc_slab(self, TRUE);
672
673             g_mutex_unlock(self->slab_mutex);
674
675             if (!state->tmp_slab) {
676                 /* if we couldn't allocate a slab, then we're cancelled, so we're done with
677                  * this part. */
678                 self->last_part_successful = FALSE;
679                 self->no_more_parts = TRUE;
680                 return FALSE;
681             }
682
683             state->tmp_slab->size = self->slab_size;
684             state->next_serial = self->part_first_serial;
685
686             /* We're reading from the disk cache, so we need a file descriptor
687              * to read from, so wait for disk_cache_thread to open the
688              * disk_cache_read_fd */
689             g_assert(self->disk_cache_dirname);
690             g_mutex_lock(self->state_mutex);
691             while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
692                 DBG(9, "waiting for disk_cache_thread to set disk_cache_read_fd");
693                 g_cond_wait(self->state_cond, self->state_mutex);
694             }
695             DBG(9, "done waiting");
696             g_mutex_unlock(self->state_mutex);
697
698             if (elt->cancelled) {
699                 self->last_part_successful = FALSE;
700                 self->no_more_parts = TRUE;
701                 return FALSE;
702             }
703
704             /* rewind to the beginning */
705             if (lseek(self->disk_cache_read_fd, 0, SEEK_SET) == -1) {
706                 xfer_cancel_with_error(XFER_ELEMENT(self),
707                     _("Could not seek disk cache file for reading: %s"),
708                     strerror(errno));
709                 self->last_part_successful = FALSE;
710                 self->no_more_parts = TRUE;
711                 return FALSE;
712             }
713         }
714     }
715
716     /* if the streaming mode requires it, pre-buffer */
717     if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
718         self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
719         gboolean prebuffer_ok;
720
721         g_mutex_lock(self->slab_mutex);
722         prebuffer_ok = slab_source_prebuffer(self);
723         g_mutex_unlock(self->slab_mutex);
724         if (!prebuffer_ok)
725             return FALSE;
726     }
727
728     return TRUE;
729 }
730
731 /* Called with the slab_mutex held, this does the work of slab_source_get when
732  * reading from the disk cache.  Note that this explicitly releases the
733  * slab_mutex during execution - do not depend on any protected values across a
734  * call to this function.  The mutex is held on return. */
735 static Slab *
736 slab_source_get_from_disk(
737     XferDestTaperCacher *self,
738     slab_source_state *state,
739     guint64 serial)
740 {
741     XferDestTaper *xdt = XFER_DEST_TAPER(self);
742     gsize bytes_read;
743
744     g_assert(state->next_serial == serial);
745
746     /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
747     g_mutex_unlock(self->slab_mutex);
748
749     bytes_read = full_read(self->disk_cache_read_fd,
750                            state->tmp_slab->base,
751                            self->slab_size);
752     if ((gsize)bytes_read < self->slab_size) {
753         xfer_cancel_with_error(XFER_ELEMENT(xdt),
754             _("Error reading disk cache: %s"),
755             errno? strerror(errno) : _("Unexpected EOF"));
756         goto fatal_error;
757     }
758
759     state->tmp_slab->serial = state->next_serial++;
760     g_mutex_lock(self->slab_mutex);
761     return state->tmp_slab;
762
763 fatal_error:
764     g_mutex_lock(self->slab_mutex);
765     self->last_part_successful = FALSE;
766     self->no_more_parts = TRUE;
767     return NULL;
768 }
769
770 /* Called with the slab_mutex held, this function gets the slab with the given
771  * serial number, waiting if necessary for that slab to be available.  Note
772  * that the slab_mutex may be released during execution, although it is always
773  * held on return. */
774 static inline Slab *
775 slab_source_get(
776     XferDestTaperCacher *self,
777     slab_source_state *state,
778     guint64 serial)
779 {
780     XferElement *elt = (XferElement *)self;
781
782     /* device_slab is only NULL if we're following the slab train, so wait for
783      * a new slab */
784     if (!self->device_slab) {
785         /* if the streaming mode requires it, pre-buffer */
786         if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
787             if (!slab_source_prebuffer(self))
788                 return NULL;
789
790             /* fall through to make sure we have a device_slab;
791              * slab_source_prebuffer doesn't guarantee device_slab != NULL */
792         }
793
794         while (self->device_slab == NULL && !elt->cancelled) {
795             DBG(9, "waiting for the next slab");
796             g_cond_wait(self->slab_cond, self->slab_mutex);
797         }
798         DBG(9, "done waiting");
799
800         if (elt->cancelled)
801             goto fatal_error;
802     }
803
804     /* device slab is now set, and only this thread can change it */
805     g_assert(self->device_slab);
806
807     /* if the next item in the device slab is the one we want, then the job is
808      * pretty easy */
809     if (G_LIKELY(serial == self->device_slab->serial))
810         return self->device_slab;
811
812     /* otherwise, we're reading from disk */
813     g_assert(serial < self->device_slab->serial);
814     return slab_source_get_from_disk(self, state, serial);
815
816 fatal_error:
817     self->last_part_successful = FALSE;
818     self->no_more_parts = TRUE;
819     return NULL;
820 }
821
822 /* Called without the slab_mutex held, this frees any resources assigned
823  * to the slab source state */
824 static inline void
825 slab_source_free(
826     XferDestTaperCacher *self,
827     slab_source_state *state)
828 {
829     if (state->tmp_slab) {
830         g_mutex_lock(self->slab_mutex);
831         free_slab(state->tmp_slab);
832         g_mutex_unlock(self->slab_mutex);
833     }
834 }
835
836 /* Called without the slab_mutex, this writes the given slab to the device */
837 static gboolean
838 write_slab_to_device(
839     XferDestTaperCacher *self,
840     Slab *slab)
841 {
842     XferElement *elt = XFER_ELEMENT(self);
843     gpointer buf = slab->base;
844     gsize remaining = slab->size;
845
846     while (remaining && !elt->cancelled) {
847         gsize write_size = MIN(self->block_size, remaining);
848         gboolean ok;
849         ok = device_write_block(self->device, write_size, buf);
850         if (!ok) {
851             self->bytes_written += slab->size - remaining;
852
853             /* TODO: handle an error without is_eom
854              * differently/fatally? or at least with a warning? */
855             self->last_part_successful = FALSE;
856             self->no_more_parts = FALSE;
857             return FALSE;
858         }
859
860         buf += write_size;
861         self->slab_bytes_written += write_size;
862         remaining -= write_size;
863     }
864
865     if (elt->cancelled) {
866         self->last_part_successful = FALSE;
867         self->no_more_parts = TRUE;
868         return FALSE;
869     }
870
871     self->bytes_written += slab->size;
872     self->slab_bytes_written = 0;
873     return TRUE;
874 }
875
876 static XMsg *
877 device_thread_write_part(
878     XferDestTaperCacher *self)
879 {
880     GTimer *timer = g_timer_new();
881     XMsg *msg;
882     slab_source_state src_state = {0, 0};
883     guint64 serial, stop_serial;
884     gboolean eof = FALSE;
885     int fileno = 0;
886     int failed = 0;
887     int slab_source_set = 0;
888
889     self->last_part_successful = FALSE;
890     self->bytes_written = 0;
891
892     if (!device_start_file(self->device, self->part_header)) {
893         failed = 1;
894         goto part_done;
895     }
896
897     dumpfile_free(self->part_header);
898     self->part_header = NULL;
899
900     fileno = self->device->file;
901     g_assert(fileno > 0);
902
903     if (!slab_source_setup(self, &src_state))
904         goto part_done;
905     slab_source_set = 1;
906
907     g_timer_start(timer);
908
909     stop_serial = self->part_stop_serial;
910     g_mutex_lock(self->slab_mutex);
911     for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
912         Slab *slab = slab_source_get(self, &src_state, serial);
913         DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
914         g_mutex_unlock(self->slab_mutex);
915         if (!slab) {
916             failed = 1;
917             goto part_done;
918         }
919
920         eof = slab->size < self->slab_size;
921
922         if (!write_slab_to_device(self, slab)) {
923             failed = 1;
924             goto part_done;
925         }
926
927         g_mutex_lock(self->slab_mutex);
928         DBG(8, "wrote slab %p to device", slab);
929
930         /* if we're reading from the slab train, advance self->device_slab. */
931         if (slab == self->device_slab) {
932             next_slab(self, &self->device_slab);
933         }
934     }
935     g_mutex_unlock(self->slab_mutex);
936
937 part_done:
938     /* if we write all of the blocks, but the finish_file fails, then likely
939      * there was some buffering going on in the device driver, and the blocks
940      * did not all make it to permanent storage -- so it's a failed part. */
941     if (self->device->in_file && !device_finish_file(self->device))
942         failed = 1;
943
944     if (slab_source_set) {
945         slab_source_free(self, &src_state);
946     }
947
948     if (!failed) {
949         self->last_part_successful = TRUE;
950         self->no_more_parts = eof;
951     }
952
953     g_timer_stop(timer);
954
955     msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
956     msg->size = self->bytes_written;
957     msg->duration = g_timer_elapsed(timer, NULL);
958     msg->partnum = self->partnum;
959     msg->fileno = fileno;
960     msg->successful = self->last_part_successful;
961     msg->eom = !self->last_part_successful;
962     msg->eof = self->no_more_parts;
963
964     /* time runs backward on some test boxes, so make sure this is positive */
965     if (msg->duration < 0) msg->duration = 0;
966
967     if (self->last_part_successful)
968         self->partnum++;
969
970     g_timer_destroy(timer);
971
972     return msg;
973 }
974
975 /* Called with the status_mutex held, this frees any cached data for
976  * a successful part */
977 static void
978 release_part_cache(
979     XferDestTaperCacher *self)
980 {
981     if (self->use_mem_cache && self->mem_cache_slab) {
982         /* move up the mem_cache_slab to point to the first slab in
983          * the next part (probably NULL at this point), so that the
984          * reader can continue reading data into the new mem cache
985          * immediately. */
986         g_mutex_lock(self->slab_mutex);
987         unref_slab(self, self->mem_cache_slab);
988         self->mem_cache_slab = self->device_slab;
989         if (self->mem_cache_slab)
990             self->mem_cache_slab->refcount++;
991         g_mutex_unlock(self->slab_mutex);
992     }
993
994     /* the disk cache gets reused automatically (rewinding to offset 0), so
995      * there's nothing else to do */
996 }
997
998 static gpointer
999 device_thread(
1000     gpointer data)
1001 {
1002     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
1003     XferElement *elt = XFER_ELEMENT(self);
1004     XMsg *msg;
1005
1006     DBG(1, "(this is the device thread)");
1007
1008     if (self->disk_cache_dirname) {
1009         GError *error = NULL;
1010         self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
1011         if (!self->disk_cache_thread) {
1012             g_critical(_("Error creating new thread: %s (%s)"),
1013                 error->message, errno? strerror(errno) : _("no error code"));
1014         }
1015     }
1016
1017     /* This is the outer loop, that loops once for each split part written to
1018      * tape. */
1019     g_mutex_lock(self->state_mutex);
1020     while (1) {
1021         /* wait until the main thread un-pauses us, and check that we have
1022          * the relevant device info available (block_size) */
1023         while (self->paused && !elt->cancelled) {
1024             DBG(9, "waiting to be unpaused");
1025             g_cond_wait(self->state_cond, self->state_mutex);
1026         }
1027         DBG(9, "done waiting");
1028
1029         if (elt->cancelled)
1030             break;
1031
1032         g_mutex_unlock(self->state_mutex);
1033         self->slab_bytes_written = 0;
1034         DBG(2, "beginning to write part");
1035         msg = device_thread_write_part(self);
1036         DBG(2, "done writing part");
1037         g_mutex_lock(self->state_mutex);
1038
1039         /* release any cache of a successful part, but don't bother at EOF */
1040         if (msg->successful && !msg->eof)
1041             release_part_cache(self);
1042
1043         xfer_queue_message(elt->xfer, msg);
1044
1045         /* if this is the last part, we're done with the part loop */
1046         if (self->no_more_parts)
1047             break;
1048
1049         /* pause ourselves and await instructions from the main thread */
1050         self->paused = TRUE;
1051     }
1052
1053     g_mutex_unlock(self->state_mutex);
1054
1055     /* make sure the other thread is done before we send XMSG_DONE */
1056     if (self->disk_cache_thread)
1057         g_thread_join(self->disk_cache_thread);
1058
1059     /* tell the main thread we're done */
1060     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1061
1062     return NULL;
1063 }
1064
1065 /*
1066  * Class mechanics
1067  */
1068
1069 /* called with the slab_mutex held, this adds the reader_slab to the head of
1070  * the slab train and signals the condition variable. */
1071 static void
1072 add_reader_slab_to_train(
1073     XferDestTaperCacher *self)
1074 {
1075     Slab *slab = self->reader_slab;
1076
1077     DBG(3, "adding slab of new data to the slab train");
1078
1079     if (self->newest_slab) {
1080         self->newest_slab->next = slab;
1081         slab->refcount++;
1082
1083         self->newest_slab->refcount--;
1084     }
1085
1086     self->newest_slab = slab; /* steal reader_slab's ref */
1087     self->reader_slab = NULL;
1088
1089     /* steal reader_slab's reference for newest_slab */
1090
1091     /* if any of the other pointers are waiting for this slab, update them */
1092     if (self->disk_cache_dirname && !self->disk_cacher_slab) {
1093         self->disk_cacher_slab = slab;
1094         slab->refcount++;
1095     }
1096     if (self->use_mem_cache && !self->mem_cache_slab) {
1097         self->mem_cache_slab = slab;
1098         slab->refcount++;
1099     }
1100     if (!self->device_slab) {
1101         self->device_slab = slab;
1102         slab->refcount++;
1103     }
1104     if (!self->oldest_slab) {
1105         self->oldest_slab = slab;
1106         slab->refcount++;
1107     }
1108
1109     g_cond_broadcast(self->slab_cond);
1110 }
1111
1112 static void
1113 push_buffer_impl(
1114     XferElement *elt,
1115     gpointer buf,
1116     size_t size)
1117 {
1118     XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
1119     gpointer p;
1120
1121     DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
1122
1123     /* do nothing if cancelled */
1124     if (G_UNLIKELY(elt->cancelled)) {
1125         goto free_and_finish;
1126     }
1127
1128     /* handle EOF */
1129     if (G_UNLIKELY(buf == NULL)) {
1130         /* send off the last, probably partial slab */
1131         g_mutex_lock(self->slab_mutex);
1132
1133         /* create a new, empty slab if necessary */
1134         if (!self->reader_slab) {
1135             self->reader_slab = alloc_slab(self, FALSE);
1136             if (!self->reader_slab) {
1137                 /* we've been cancelled while waiting for a slab */
1138                 g_mutex_unlock(self->slab_mutex);
1139
1140                 /* wait for the xfer to cancel, so we don't get another buffer
1141                  * pushed to us (and do so *without* the mutex held) */
1142                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1143
1144                 goto free_and_finish;
1145             }
1146             self->reader_slab->serial = self->next_serial++;
1147         }
1148
1149         add_reader_slab_to_train(self);
1150         g_mutex_unlock(self->slab_mutex);
1151
1152         goto free_and_finish;
1153     }
1154
1155     p = buf;
1156     while (1) {
1157         gsize copy_size;
1158
1159         /* get a fresh slab, if needed */
1160         if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
1161             g_mutex_lock(self->slab_mutex);
1162             if (self->reader_slab)
1163                 add_reader_slab_to_train(self);
1164             self->reader_slab = alloc_slab(self, FALSE);
1165             if (!self->reader_slab) {
1166                 /* we've been cancelled while waiting for a slab */
1167                 g_mutex_unlock(self->slab_mutex);
1168
1169                 /* wait for the xfer to cancel, so we don't get another buffer
1170                  * pushed to us (and do so *without* the mutex held) */
1171                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1172
1173                 goto free_and_finish;
1174             }
1175             self->reader_slab->serial = self->next_serial++;
1176             g_mutex_unlock(self->slab_mutex);
1177         }
1178
1179         if (size == 0)
1180             break;
1181
1182         copy_size = MIN(self->slab_size - self->reader_slab->size, size);
1183         memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);
1184
1185         self->reader_slab->size += copy_size;
1186         p += copy_size;
1187         size -= copy_size;
1188     }
1189
1190 free_and_finish:
1191     if (buf)
1192         g_free(buf);
1193 }
1194
1195 /*
1196  * Element mechanics
1197  */
1198
1199 static gboolean
1200 start_impl(
1201     XferElement *elt)
1202 {
1203     XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
1204     GError *error = NULL;
1205
1206     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
1207     if (!self->device_thread) {
1208         g_critical(_("Error creating new thread: %s (%s)"),
1209             error->message, errno? strerror(errno) : _("no error code"));
1210     }
1211
1212     return TRUE;
1213 }
1214
1215 static gboolean
1216 cancel_impl(
1217     XferElement *elt,
1218     gboolean expect_eof)
1219 {
1220     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
1221     gboolean rv;
1222
1223     /* chain up first */
1224     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
1225
1226     /* then signal all of our condition variables, so that threads waiting on them
1227      * wake up and see elt->cancelled. */
1228     g_mutex_lock(self->slab_mutex);
1229     g_cond_broadcast(self->slab_cond);
1230     g_cond_broadcast(self->slab_free_cond);
1231     g_mutex_unlock(self->slab_mutex);
1232
1233     g_mutex_lock(self->state_mutex);
1234     g_cond_broadcast(self->state_cond);
1235     g_mutex_unlock(self->state_mutex);
1236
1237     return rv;
1238 }
1239
1240 static void
1241 start_part_impl(
1242     XferDestTaper *xdt,
1243     gboolean retry_part,
1244     dumpfile_t *header)
1245 {
1246     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1247
1248     g_assert(self->device != NULL);
1249     g_assert(!self->device->in_file);
1250     g_assert(header != NULL);
1251
1252     DBG(1, "start_part(retry_part=%d)", retry_part);
1253
1254     g_mutex_lock(self->state_mutex);
1255     g_assert(self->paused);
1256     g_assert(!self->no_more_parts);
1257
1258     if (self->part_header)
1259         dumpfile_free(self->part_header);
1260     self->part_header = dumpfile_copy(header);
1261
1262     if (retry_part) {
1263         g_assert(!self->last_part_successful);
1264         self->retry_part = TRUE;
1265     } else {
1266         g_assert(self->last_part_successful);
1267         self->retry_part = FALSE;
1268         self->part_first_serial = self->part_stop_serial;
1269         if (self->part_size != 0) {
1270             self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
1271         } else {
1272             /* set part_stop_serial to an effectively infinite value */
1273             self->part_stop_serial = G_MAXUINT64;
1274         }
1275     }
1276
1277     DBG(1, "unpausing");
1278     self->paused = FALSE;
1279     g_cond_broadcast(self->state_cond);
1280
1281     g_mutex_unlock(self->state_mutex);
1282 }
1283
1284 static void
1285 use_device_impl(
1286     XferDestTaper *xdt,
1287     Device *device)
1288 {
1289     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1290     GValue val;
1291
1292     /* short-circuit if nothing is changing */
1293     if (self->device == device)
1294         return;
1295
1296     g_mutex_lock(self->state_mutex);
1297     if (self->device)
1298         g_object_unref(self->device);
1299     self->device = device;
1300     g_object_ref(device);
1301
1302     /* get this new device's streaming requirements */
1303     bzero(&val, sizeof(val));
1304     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1305         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1306         g_warning("Couldn't get streaming type for %s", self->device->device_name);
1307         self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1308     } else {
1309         self->streaming = g_value_get_enum(&val);
1310     }
1311     g_value_unset(&val);
1312
1313     /* check that the blocksize hasn't changed */
1314     if (self->block_size != device->block_size) {
1315         g_mutex_unlock(self->state_mutex);
1316         xfer_cancel_with_error(XFER_ELEMENT(self),
1317             _("All devices used by the taper must have the same block size"));
1318         return;
1319     }
1320     g_mutex_unlock(self->state_mutex);
1321 }
1322
1323 static guint64
1324 get_part_bytes_written_impl(
1325     XferDestTaper *xdt)
1326 {
1327     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1328
1329     /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
1330      * partial write to the 64-bit value on a 32-bit system).  This is ok for
1331      * the moment, as it's only informational, but be warned. */
1332     if (self->device) {
1333         return device_get_bytes_written(self->device);
1334     } else {
1335         return self->bytes_written + self->slab_bytes_written;
1336     }
1337
1338 }
1339
1340 static void
1341 instance_init(
1342     XferElement *elt)
1343 {
1344     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
1345     elt->can_generate_eof = FALSE;
1346
1347     self->state_mutex = g_mutex_new();
1348     self->state_cond = g_cond_new();
1349     self->slab_mutex = g_mutex_new();
1350     self->slab_cond = g_cond_new();
1351     self->slab_free_cond = g_cond_new();
1352
1353     self->last_part_successful = TRUE;
1354     self->paused = TRUE;
1355     self->part_stop_serial = 0;
1356     self->disk_cache_read_fd = -1;
1357     self->disk_cache_write_fd = -1;
1358 }
1359
1360 static void
1361 finalize_impl(
1362     GObject * obj_self)
1363 {
1364     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(obj_self);
1365     Slab *slab, *next_slab;
1366
1367     if (self->disk_cache_dirname)
1368         g_free(self->disk_cache_dirname);
1369
1370     g_mutex_free(self->state_mutex);
1371     g_cond_free(self->state_cond);
1372
1373     g_mutex_free(self->slab_mutex);
1374     g_cond_free(self->slab_cond);
1375     g_cond_free(self->slab_free_cond);
1376
1377     /* free the slab train, without reference to the refcounts */
1378     for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
1379         next_slab = slab->next;
1380         free_slab(slab);
1381     }
1382     self->disk_cacher_slab = NULL;
1383     self->mem_cache_slab = NULL;
1384     self->device_slab = NULL;
1385     self->oldest_slab = NULL;
1386     self->newest_slab = NULL;
1387
1388     if (self->reader_slab) {
1389         free_slab(self->reader_slab);
1390         self->reader_slab = NULL;
1391     }
1392
1393     if (self->part_header)
1394         dumpfile_free(self->part_header);
1395
1396     if (self->disk_cache_read_fd != -1)
1397         close(self->disk_cache_read_fd); /* ignore error */
1398     if (self->disk_cache_write_fd != -1)
1399         close(self->disk_cache_write_fd); /* ignore error */
1400
1401     if (self->device)
1402         g_object_unref(self->device);
1403
1404     /* chain up */
1405     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1406 }
1407
1408 static void
1409 class_init(
1410     XferDestTaperCacherClass * selfc)
1411 {
1412     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1413     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
1414     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1415     static xfer_element_mech_pair_t mech_pairs[] = {
1416         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
1417         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1418     };
1419
1420     klass->start = start_impl;
1421     klass->cancel = cancel_impl;
1422     klass->push_buffer = push_buffer_impl;
1423     xdt_klass->start_part = start_part_impl;
1424     xdt_klass->use_device = use_device_impl;
1425     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
1426     goc->finalize = finalize_impl;
1427
1428     klass->perl_class = "Amanda::Xfer::Dest::Taper::Cacher";
1429     klass->mech_pairs = mech_pairs;
1430
1431     parent_class = g_type_class_peek_parent(selfc);
1432 }
1433
1434 static GType
1435 xfer_dest_taper_cacher_get_type (void)
1436 {
1437     static GType type = 0;
1438
1439     if G_UNLIKELY(type == 0) {
1440         static const GTypeInfo info = {
1441             sizeof (XferDestTaperCacherClass),
1442             (GBaseInitFunc) NULL,
1443             (GBaseFinalizeFunc) NULL,
1444             (GClassInitFunc) class_init,
1445             (GClassFinalizeFunc) NULL,
1446             NULL /* class_data */,
1447             sizeof (XferDestTaperCacher),
1448             0 /* n_preallocs */,
1449             (GInstanceInitFunc) instance_init,
1450             NULL
1451         };
1452
1453         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperCacher", &info, 0);
1454     }
1455
1456     return type;
1457 }
1458
1459 /*
1460  * Constructor
1461  */
1462
1463 XferElement *
1464 xfer_dest_taper_cacher(
1465     Device *first_device,
1466     size_t max_memory,
1467     guint64 part_size,
1468     gboolean use_mem_cache,
1469     const char *disk_cache_dirname)
1470 {
1471     XferDestTaperCacher *self = (XferDestTaperCacher *)g_object_new(XFER_DEST_TAPER_CACHER_TYPE, NULL);
1472
1473     self->max_memory = max_memory;
1474     self->part_size = part_size;
1475     self->partnum = 1;
1476     self->device = first_device;
1477     g_object_ref(self->device);
1478
1479     /* pick only one caching mechanism, caller! */
1480     if (use_mem_cache)
1481         g_assert(!disk_cache_dirname);
1482     if (disk_cache_dirname)
1483         g_assert(!use_mem_cache);
1484
1485     /* and if part size is zero, then we don't do any caching */
1486     g_assert(part_size != 0 || (!use_mem_cache && !disk_cache_dirname));
1487
1488     self->use_mem_cache = use_mem_cache;
1489     if (disk_cache_dirname)
1490         self->disk_cache_dirname = g_strdup(disk_cache_dirname);
1491
1492     /* calculate the device-dependent parameters */
1493     self->block_size = first_device->block_size;
1494
1495     /* The slab size should be large enough to justify the overhead of all
1496      * of the mutexes, but it needs to be small enough to have a few slabs
1497      * available so that the threads are not constantly waiting on one
1498      * another.  The choice is sixteen blocks, not more than a quarter of
1499      * the part size, and not more than 10MB.  If we're not using the mem
1500      * cache, then avoid exceeding max_memory by keeping the slab size less
1501      * than a quarter of max_memory. */
1502
1503     self->slab_size = self->block_size * 16;
1504     if (self->part_size)
1505         self->slab_size = MIN(self->slab_size, self->part_size / 4);
1506     self->slab_size = MIN(self->slab_size, 10*1024*1024);
1507     if (!use_mem_cache)
1508         self->slab_size = MIN(self->slab_size, self->max_memory / 4);
1509
1510     /* round slab size up to the nearest multiple of the block size */
1511     self->slab_size =
1512         ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
1513
1514     /* round part size up to a multiple of the slab size */
1515     if (self->part_size != 0) {
1516         self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
1517         self->part_size = self->slabs_per_part * self->slab_size;
1518     } else {
1519         self->slabs_per_part = 0;
1520     }
1521
1522     /* set max_slabs */
1523     if (use_mem_cache) {
1524         self->max_slabs = self->slabs_per_part; /* increase max_slabs to serve as mem buf */
1525     } else {
1526         self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
1527     }
1528
1529     /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
1530         * alloc_slab, so we check here that it's at least 2. */
1531     if (self->max_slabs < 2)
1532         self->max_slabs = 2;
1533
1534     DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);
1535
1536     return XFER_ELEMENT(self);
1537 }