Imported Upstream version 3.2.0
[debian/amanda] / device-src / xfer-dest-taper-cacher.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24 #include "xfer-device.h"
25 #include "arglist.h"
26 #include "conffile.h"
27
28 /* A transfer destination that writes an entire dumpfile to one or more files
29  * on one or more devices, caching each part so that it can be rewritten on a
30  * subsequent volume in the event of an unexpected EOM.   This is designed to
31  * work in concert with Amanda::Taper::Scribe. */
32
33 /* Future Plans:
34  * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case?
35  * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file
36  * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size?
37  * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior)
38  */
39
40 /*
41  * Slabs
42  *
43  * Slabs are larger than blocks, and are the unit on which the element
44  * operates.  They are designed to be a few times larger than a block, to
45  * achieve a corresponding reduction in the number of locks and unlocks used
46  * per block, and similar reduction in the the amount of memory overhead
47  * required.
48  */
49
50 typedef struct Slab {
51     struct Slab *next;
52
53     /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
54      * from any processes operating on the slab */
55     gint refcount;
56
57     /* number of this slab in the sequence, global to this element's lifetime.
58      * Since this counts slabs, which are about 1M, this can address 16
59      * yottabytes of data before wrapping. */
60     guint64 serial;
61
62     /* slab size; this is only less than the element's slab size if the
63      * transfer is at EOF. */
64     gsize size;
65
66     /* base of the slab_size buffer */
67     gpointer base;
68 } Slab;
69
70 /*
71  * Xfer Dest Taper
72  */
73
74 static GObjectClass *parent_class = NULL;
75
76 typedef struct XferDestTaperCacher {
77     XferDestTaper __parent__;
78
79     /* object parameters
80      *
81      * These values are supplied to the constructor, and can be assumed
82      * constant for the lifetime of the element.
83      */
84
85     /* maximum buffer space to use for streaming; this is unrelated to the
86      * fallback_splitsize */
87     gsize max_memory;
88
89     /* split buffering info; if we're doing memory buffering, use_mem_cache is
90      * true; if we're doing disk buffering, disk_cache_dirname is non-NULL and
91      * contains the (allocated) filename of the cache file.  In any
92      * case, part_size gives the desired part size.  If part_size is zero, then
93      * no splitting takes place (so part_size is effectively infinite). */
94     gboolean use_mem_cache;
95     char *disk_cache_dirname;
96     guint64 part_size; /* (bytes) */
97
98     /*
99      * threads
100      */
101
102     /* The thread doing the actual writes to tape; this also handles buffering
103      * for streaming */
104     GThread *device_thread;
105
106     /* The thread writing slabs to the disk cache, if any */
107     GThread *disk_cache_thread;
108
109     /* slab train
110      *
111      * All in-memory data is contained in a linked list called the "slab
112      * train".  Various components are operating simultaneously at different
113      * points in this train.  Data from the upstream XferElement is appended to
114      * the head of the train, and the device thread follows along behind,
115      * writing data to the device.  When caching parts in memory, the slab
116      * train just grows to eventually contain the whole part.  When using an
117      * on-disk cache, the disk cache thread writes the tail of the train to
118      * disk, freeing slabs to be re-used at the head of the train.  Some
119      * careful coordination of these components allows them to operate as
120      * independently as possible within the limits of the user's configuration.
121      *
122      * Slabs are rarely, if ever, freed: the oldest_slab reference generally
123      * ensures that all slabs have refcount > 0, and this pointer is only
124      * advanced when re-using slabs that have been flushed to the disk cache or
125      * when freeing slabs after completion of the transfer. */
126
127     /* pointers into the slab train are all protected by this mutex.  Note that
128      * the slabs themselves can be manipulated without this lock; it's only
129      * when changing the pointers that the mutex must be held.  Furthermore, a
130      * foo_slab variable which is not NULL will not be changed except by its
131      * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
132      * and device_slab is controlled by device_thread).  This means that a
133      * controlling thread can drop the slab_mutex once it has ensured its slab
134      * is non-NULL.
135      *
136      * Slab_cond is notified when a new slab is made available from the reader.
137      * Slab_free_cond is notified when a slab becomes available for
138      * reallocation.
139      *
140      * Any thread waiting on either condition variable should also check
141      * elt->cancelled, and act appropriately if awakened in a cancelled state.
142      */
143     GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
144
145     /* slabs in progress by each thread, or NULL if the thread is waiting on
146      * slab_cond.  These can only be changed by their respective threads, except
147      * when they are NULL (in which case the reader will point them to a new
148      * slab and signal the slab_cond). */
149     Slab *volatile disk_cacher_slab;
150     Slab *volatile mem_cache_slab;
151     Slab *volatile device_slab;
152
153     /* tail and head of the slab train */
154     Slab *volatile oldest_slab;
155     Slab *volatile newest_slab;
156
157     /* thread-specific information
158      *
159      * These values are only used by one thread, and thus are not
160      * subject to any locking or concurrency constraints.
161      */
162
163     /* slab in progress by the reader (not in the slab train) */
164     Slab *reader_slab;
165
166     /* the serial to be assigned to reader_slab */
167     guint64 next_serial;
168
169     /* bytes written to the device in this part */
170     guint64 bytes_written;
171
172     /* bytes written to the device in the current slab */
173     guint64 slab_bytes_written;
174
175     /* element state
176      *
177      * "state" includes all of the variables below (including device
178      * parameters).  Note that the device_thread reads state values when
179      * paused is false without locking the mutex.  No other thread should
180      * change state when the element is not paused.
181      *
182      * If there is every any reason to lock both mutexes, acquire this one
183      * first.
184      *
185      * Any thread waiting on this condition variable should also check
186      * elt->cancelled, and act appropriately if awakened in a cancelled state.
187      */
188     GMutex *state_mutex;
189     GCond *state_cond;
190     volatile gboolean paused;
191
192     /* The device to write to, and the header to write to it */
193     Device *volatile device;
194     dumpfile_t *volatile part_header;
195
196     /* If true, when unpaused, the device should begin at the beginning of the
197      * cache; if false, it should proceed to the next part. */
198     volatile gboolean retry_part;
199
200     /* If true, the previous part was completed successfully; only used for
201      * assertions */
202     volatile gboolean last_part_successful;
203
204     /* part number in progress */
205     volatile guint64 partnum;
206
207     /* if true, the main thread should *not* call start_part */
208     volatile gboolean no_more_parts;
209
210     /* the first serial in this part, and the serial to stop at */
211     volatile guint64 part_first_serial, part_stop_serial;
212
213     /* read and write file descriptors for the disk cache file, in use by the
214      * disk_cache_thread.  If these are -1, wait on state_cond until they are
215      * not; once the value is set, it will not change. */
216     volatile int disk_cache_read_fd;
217     volatile int disk_cache_write_fd;
218
219     /* device parameters
220      *
221      * Note that these values aren't known until we begin writing to the
222      * device; if block_size is zero, threads should block on state_cond until
223      * it is nonzero, at which point all of the dependent fields will have
224      * their correct values.  Note that, since this value never changes after
225      * it has been set, it is safe to read block_size without acquiring the
226      * mutext first. */
227
228     /* this device's need for streaming */
229     StreamingRequirement streaming;
230
231     /* block size expected by the target device */
232     gsize block_size;
233
234     /* Size of a slab - some multiple of the block size */
235     gsize slab_size;
236
237     /* maximum number of slabs allowed, rounded up to the next whole slab.  If
238      * using mem cache, this is the equivalent of part_size bytes; otherwise,
239      * it is equivalent to max_memory bytes. */
240     guint64 max_slabs;
241
242     /* number of slabs in a part */
243     guint64 slabs_per_part;
244 } XferDestTaperCacher;
245
246 static GType xfer_dest_taper_cacher_get_type(void);
247 #define XFER_DEST_TAPER_CACHER_TYPE (xfer_dest_taper_cacher_get_type())
248 #define XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher)
249 #define XFER_DEST_TAPER_CACHER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher const)
250 #define XFER_DEST_TAPER_CACHER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
251 #define IS_XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_cacher_get_type ())
252 #define XFER_DEST_TAPER_CACHER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
253
254 typedef struct {
255     XferDestTaperClass __parent__;
256
257 } XferDestTaperCacherClass;
258
259 /*
260  * Debug logging
261  */
262
263 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
264 static void
265 _xdt_dbg(const char *fmt, ...)
266 {
267     va_list argp;
268     char msg[1024];
269
270     arglist_start(argp, fmt);
271     g_vsnprintf(msg, sizeof(msg), fmt, argp);
272     arglist_end(argp);
273     g_debug("XDT thd-%p: %s", g_thread_self(), msg);
274 }
275
276 /*
277  * Slab handling
278  */
279
280 /* called with the slab_mutex held, this gets a new slab to write into, with
281  * refcount 1.  It will block if max_memory slabs are already in use, and mem
282  * caching is not in use, although allocation may be forced with the 'force'
283  * parameter.
284  *
285  * If the memory allocation cannot be satisfied due to system constraints,
286  * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
287  * return NULL.  If the transfer is cancelled by some other means while this
288  * function is blocked awaiting a free slab, it will return NULL.
289  *
290  * @param self: the xfer element
291  * @param force: allocate a slab even if it would exceed max_memory
292  * @returns: a new slab, or NULL if the xfer is cancelled
293  */
294 static Slab *
295 alloc_slab(
296     XferDestTaperCacher *self,
297     gboolean force)
298 {
299     XferElement *elt = XFER_ELEMENT(self);
300     Slab *rv;
301
302     DBG(8, "alloc_slab(force=%d)", force);
303     if (!force) {
304         /* throttle based on maximum number of extant slabs */
305         while (G_UNLIKELY(
306             !elt->cancelled &&
307             self->oldest_slab &&
308             self->newest_slab &&
309             self->oldest_slab->refcount > 1 &&
310             (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
311             DBG(9, "waiting for available slab");
312             g_cond_wait(self->slab_free_cond, self->slab_mutex);
313         }
314         DBG(9, "done waiting");
315
316         if (elt->cancelled)
317             return NULL;
318     }
319
320     /* if the oldest slab doesn't have anything else pointing to it, just use
321      * that */
322     if (self->oldest_slab && self->oldest_slab->refcount == 1) {
323         rv = self->oldest_slab;
324         self->oldest_slab = rv->next;
325     } else {
326         rv = g_new0(Slab, 1);
327         rv->refcount = 1;
328         rv->base = g_try_malloc(self->slab_size);
329         if (!rv->base) {
330             g_free(rv);
331             xfer_cancel_with_error(XFER_ELEMENT(self),
332                 _("Could not allocate %zu bytes of memory"), self->slab_size);
333             return NULL;
334         }
335     }
336
337     rv->next = NULL;
338     rv->size = 0;
339     return rv;
340 }
341
342 /* called with the slab_mutex held, this frees the given slave entirely.  The
343  * reference count is not consulted.
344  *
345  * @param slab: slab to free
346  */
347 static void
348 free_slab(
349     Slab *slab)
350 {
351     if (slab) {
352         if (slab->base)
353             g_free(slab->base);
354         g_free(slab);
355     }
356 }
357
358 /* called with the slab_mutex held, this decrements the refcount of the
359  * given slab
360  *
361  * @param self: xfer element
362  * @param slab: slab to free
363  */
364 static inline void
365 unref_slab(
366     XferDestTaperCacher *self,
367     Slab *slab)
368 {
369     g_assert(slab->refcount > 1);
370     slab->refcount--;
371     if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
372         g_cond_broadcast(self->slab_free_cond);
373     } else if (G_UNLIKELY(slab->refcount == 0)) {
374         free_slab(slab);
375     }
376 }
377
378 /* called with the slab_mutex held, this sets *slabp to *slabp->next,
379  * adjusting refcounts appropriately, and returns the new value
380  *
381  * @param self: xfer element
382  * @param slabp: slab pointer to advance
383  * @returns: new value of *slabp
384  */
385 static inline Slab *
386 next_slab(
387     XferDestTaperCacher *self,
388     Slab * volatile *slabp)
389 {
390     Slab *next;
391
392     if (!slabp || !*slabp)
393         return NULL;
394
395     next = (*slabp)->next;
396     if (next)
397         next->refcount++;
398     if (*slabp)
399         unref_slab(self, *slabp);
400     *slabp = next;
401
402     return next;
403 }
404
405 /*
406  * Disk Cache
407  *
408  * The disk cache thread's job is simply to follow along the slab train at
409  * maximum speed, writing slabs to the disk cache file. */
410
411 static gboolean
412 open_disk_cache_fds(
413     XferDestTaperCacher *self)
414 {
415     char * filename;
416
417     g_assert(self->disk_cache_read_fd == -1);
418     g_assert(self->disk_cache_write_fd == -1);
419
420     g_mutex_lock(self->state_mutex);
421     filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
422                                self->disk_cache_dirname);
423
424     self->disk_cache_write_fd = g_mkstemp(filename);
425     if (self->disk_cache_write_fd < 0) {
426         g_mutex_unlock(self->state_mutex);
427         xfer_cancel_with_error(XFER_ELEMENT(self),
428             _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
429             strerror(errno));
430         g_free(filename);
431         return FALSE;
432     }
433
434     /* open a separate copy of the file for reading */
435     self->disk_cache_read_fd = open(filename, O_RDONLY);
436     if (self->disk_cache_read_fd < 0) {
437         g_mutex_unlock(self->state_mutex);
438         xfer_cancel_with_error(XFER_ELEMENT(self),
439             _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
440             strerror(errno));
441         g_free(filename);
442         return FALSE;
443     }
444
445     /* signal anyone waiting for this value */
446     g_cond_broadcast(self->state_cond);
447     g_mutex_unlock(self->state_mutex);
448
449     /* errors from unlink are not fatal */
450     if (unlink(filename) < 0) {
451         g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
452     }
453
454     g_free(filename);
455     return TRUE;
456 }
457
458 static gpointer
459 disk_cache_thread(
460     gpointer data)
461 {
462     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
463     XferElement *elt = XFER_ELEMENT(self);
464
465     DBG(1, "(this is the disk cache thread)");
466
467     /* open up the disk cache file first */
468     if (!open_disk_cache_fds(self))
469         return NULL;
470
471     while (!elt->cancelled) {
472         gboolean eof, eop;
473         guint64 stop_serial;
474         Slab *slab;
475
476         /* rewind to the begining of the disk cache file */
477         if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
478             xfer_cancel_with_error(XFER_ELEMENT(self),
479                 _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
480                 strerror(errno));
481             return NULL;
482         }
483
484         /* we need to sit and wait for the next part to begin, first making sure
485          * we have a slab .. */
486         g_mutex_lock(self->slab_mutex);
487         while (!self->disk_cacher_slab && !elt->cancelled) {
488             DBG(9, "waiting for a disk slab");
489             g_cond_wait(self->slab_cond, self->slab_mutex);
490         }
491         DBG(9, "done waiting");
492         g_mutex_unlock(self->slab_mutex);
493
494         if (elt->cancelled)
495             break;
496
497         /* this slab is now fixed until this thread changes it */
498         g_assert(self->disk_cacher_slab != NULL);
499
500         /* and then making sure we're ready to write that slab. */
501         g_mutex_lock(self->state_mutex);
502         while ((self->paused ||
503                     (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
504                 && !elt->cancelled) {
505             DBG(9, "waiting for the disk slab to become current and un-paused");
506             g_cond_wait(self->state_cond, self->state_mutex);
507         }
508         DBG(9, "done waiting");
509
510         stop_serial = self->part_stop_serial;
511         g_mutex_unlock(self->state_mutex);
512
513         if (elt->cancelled)
514             break;
515
516         g_mutex_lock(self->slab_mutex);
517         slab = self->disk_cacher_slab;
518         eop = eof = FALSE;
519         while (!eop && !eof) {
520             /* if we're at the head of the slab train, wait for more data */
521             while (!self->disk_cacher_slab && !elt->cancelled) {
522                 DBG(9, "waiting for the next disk slab");
523                 g_cond_wait(self->slab_cond, self->slab_mutex);
524             }
525             DBG(9, "done waiting");
526
527             if (elt->cancelled)
528                 break;
529
530             /* drop the lock long enough to write the slab; the refcount
531              * protects the slab during this time */
532             slab = self->disk_cacher_slab;
533             g_mutex_unlock(self->slab_mutex);
534
535             if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
536                 xfer_cancel_with_error(XFER_ELEMENT(self),
537                     _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
538                     strerror(errno));
539                 return NULL;
540             }
541
542             eof = slab->size < self->slab_size;
543             eop = (slab->serial + 1 == stop_serial);
544
545             g_mutex_lock(self->slab_mutex);
546             next_slab(self, &self->disk_cacher_slab);
547         }
548         g_mutex_unlock(self->slab_mutex);
549
550         if (eof) {
551             /* this very thread should have just set this value to NULL, and since it's
552              * EOF, there should not be any 'next' slab */
553             g_assert(self->disk_cacher_slab == NULL);
554             break;
555         }
556     }
557
558     return NULL;
559 }
560
561 /*
562  * Device Thread
563  *
564  * The device thread's job is to write slabs to self->device, applying whatever
565  * streaming algorithms are required.  It does this by alternately getting the
566  * next slab from a "slab source" and writing that slab to the device.  Most of
567  * the slab source functions assume that self->slab_mutex is held, but may
568  * release the mutex (either explicitly or via a g_cond_wait), so it is not
569  * valid to assume that any slab pointers remain unchanged after a slab_source
570  * function invocation.
571  */
572
573 /* This struct tracks the current state of the slab source */
574 typedef struct slab_source_state {
575     /* temporary slab used for reading from disk */
576     Slab *tmp_slab;
577
578     /* next serial to read from disk */
579     guint64 next_serial;
580 } slab_source_state;
581
582 /* Called with the slab_mutex held, this function pre-buffers enough data into the slab
583  * train to meet the device's streaming needs. */
584 static gboolean
585 slab_source_prebuffer(
586     XferDestTaperCacher *self)
587 {
588     XferElement *elt = XFER_ELEMENT(self);
589     guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
590     guint64 i;
591     Slab *slab;
592
593     /* always prebuffer at least one slab, even if max_memory is 0 */
594     if (prebuffer_slabs == 0) prebuffer_slabs = 1;
595
596     /* pre-buffering is not necessary if we're retrying a part */
597     if (self->retry_part)
598         return TRUE;
599
600     /* pre-buffering means waiting until we have at least prebuffer_slabs in the
601      * slab train ahead of the device_slab, or the newest slab is at EOF. */
602     while (!elt->cancelled) {
603         gboolean eof_or_eop = FALSE;
604
605         /* see if there's enough data yet */
606         for (i = 0, slab = self->device_slab;
607              i < prebuffer_slabs && slab != NULL;
608              i++, slab = slab->next) {
609             eof_or_eop = (slab->size < self->slab_size)
610                 || (slab->serial + 1 == self->part_stop_serial);
611         }
612         if (i == prebuffer_slabs || eof_or_eop)
613             break;
614
615         DBG(9, "prebuffering wait");
616         g_cond_wait(self->slab_cond, self->slab_mutex);
617     }
618     DBG(9, "done waiting");
619
620     if (elt->cancelled) {
621         self->last_part_successful = FALSE;
622         self->no_more_parts = TRUE;
623         return FALSE;
624     }
625
626     return TRUE;
627 }
628
629 /* Called without the slab_mutex held, this function sets up a new slab_source_state
630  * object based on the configuratino of the Xfer Element. */
631 static inline gboolean
632 slab_source_setup(
633     XferDestTaperCacher *self,
634     slab_source_state *state)
635 {
636     XferElement *elt = XFER_ELEMENT(self);
637     state->tmp_slab = NULL;
638     state->next_serial = G_MAXUINT64;
639
640     /* if we're to retry the part, rewind to the beginning */
641     if (self->retry_part) {
642         if (self->use_mem_cache) {
643             /* rewind device_slab to point to the mem_cache_slab */
644             g_mutex_lock(self->slab_mutex);
645             if (self->device_slab)
646                 unref_slab(self, self->device_slab);
647             self->device_slab = self->mem_cache_slab;
648             if(self->device_slab != NULL)
649                 self->device_slab->refcount++;
650             g_mutex_unlock(self->slab_mutex);
651         } else {
652             g_mutex_lock(self->slab_mutex);
653
654             /* we're going to read from the disk cache until we get to the oldest useful
655              * slab in memory, so it had best exist */
656             g_assert(self->oldest_slab != NULL);
657
658             /* point device_slab at the oldest slab we have */
659             self->oldest_slab->refcount++;
660             if (self->device_slab)
661                 unref_slab(self, self->device_slab);
662             self->device_slab = self->oldest_slab;
663
664             /* and increment it until it is at least the slab we want to start from */
665             while (self->device_slab->serial < self->part_first_serial) {
666                 next_slab(self, &self->device_slab);
667             }
668
669             /* get a new, temporary slab for use while reading */
670             state->tmp_slab = alloc_slab(self, TRUE);
671
672             g_mutex_unlock(self->slab_mutex);
673
674             if (!state->tmp_slab) {
675                 /* if we couldn't allocate a slab, then we're cancelled, so we're done with
676                  * this part. */
677                 self->last_part_successful = FALSE;
678                 self->no_more_parts = TRUE;
679                 return FALSE;
680             }
681
682             state->tmp_slab->size = self->slab_size;
683             state->next_serial = self->part_first_serial;
684
685             /* We're reading from the disk cache, so we need a file descriptor
686              * to read from, so wait for disk_cache_thread to open the
687              * disk_cache_read_fd */
688             g_assert(self->disk_cache_dirname);
689             g_mutex_lock(self->state_mutex);
690             while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
691                 DBG(9, "waiting for disk_cache_thread to set disk_cache_read_fd");
692                 g_cond_wait(self->state_cond, self->state_mutex);
693             }
694             DBG(9, "done waiting");
695             g_mutex_unlock(self->state_mutex);
696
697             if (elt->cancelled) {
698                 self->last_part_successful = FALSE;
699                 self->no_more_parts = TRUE;
700                 return FALSE;
701             }
702
703             /* rewind to the beginning */
704             if (lseek(self->disk_cache_read_fd, 0, SEEK_SET) == -1) {
705                 xfer_cancel_with_error(XFER_ELEMENT(self),
706                     _("Could not seek disk cache file for reading: %s"),
707                     strerror(errno));
708                 self->last_part_successful = FALSE;
709                 self->no_more_parts = TRUE;
710                 return FALSE;
711             }
712         }
713     }
714
715     /* if the streaming mode requires it, pre-buffer */
716     if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
717         self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
718         gboolean prebuffer_ok;
719
720         g_mutex_lock(self->slab_mutex);
721         prebuffer_ok = slab_source_prebuffer(self);
722         g_mutex_unlock(self->slab_mutex);
723         if (!prebuffer_ok)
724             return FALSE;
725     }
726
727     return TRUE;
728 }
729
730 /* Called with the slab_mutex held, this does the work of slab_source_get when
731  * reading from the disk cache.  Note that this explicitly releases the
732  * slab_mutex during execution - do not depend on any protected values across a
733  * call to this function.  The mutex is held on return. */
734 static Slab *
735 slab_source_get_from_disk(
736     XferDestTaperCacher *self,
737     slab_source_state *state,
738     guint64 serial)
739 {
740     XferDestTaper *xdt = XFER_DEST_TAPER(self);
741     gsize bytes_read;
742
743     g_assert(state->next_serial == serial);
744
745     /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
746     g_mutex_unlock(self->slab_mutex);
747
748     bytes_read = full_read(self->disk_cache_read_fd,
749                            state->tmp_slab->base,
750                            self->slab_size);
751     if ((gsize)bytes_read < self->slab_size) {
752         xfer_cancel_with_error(XFER_ELEMENT(xdt),
753             _("Error reading disk cache: %s"),
754             errno? strerror(errno) : _("Unexpected EOF"));
755         goto fatal_error;
756     }
757
758     state->tmp_slab->serial = state->next_serial++;
759     g_mutex_lock(self->slab_mutex);
760     return state->tmp_slab;
761
762 fatal_error:
763     g_mutex_lock(self->slab_mutex);
764     self->last_part_successful = FALSE;
765     self->no_more_parts = TRUE;
766     return NULL;
767 }
768
769 /* Called with the slab_mutex held, this function gets the slab with the given
770  * serial number, waiting if necessary for that slab to be available.  Note
771  * that the slab_mutex may be released during execution, although it is always
772  * held on return. */
773 static inline Slab *
774 slab_source_get(
775     XferDestTaperCacher *self,
776     slab_source_state *state,
777     guint64 serial)
778 {
779     XferElement *elt = (XferElement *)self;
780
781     /* device_slab is only NULL if we're following the slab train, so wait for
782      * a new slab */
783     if (!self->device_slab) {
784         /* if the streaming mode requires it, pre-buffer */
785         if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
786             if (!slab_source_prebuffer(self))
787                 return NULL;
788
789             /* fall through to make sure we have a device_slab;
790              * slab_source_prebuffer doesn't guarantee device_slab != NULL */
791         }
792
793         while (self->device_slab == NULL && !elt->cancelled) {
794             DBG(9, "waiting for the next slab");
795             g_cond_wait(self->slab_cond, self->slab_mutex);
796         }
797         DBG(9, "done waiting");
798
799         if (elt->cancelled)
800             goto fatal_error;
801     }
802
803     /* device slab is now set, and only this thread can change it */
804     g_assert(self->device_slab);
805
806     /* if the next item in the device slab is the one we want, then the job is
807      * pretty easy */
808     if (G_LIKELY(serial == self->device_slab->serial))
809         return self->device_slab;
810
811     /* otherwise, we're reading from disk */
812     g_assert(serial < self->device_slab->serial);
813     return slab_source_get_from_disk(self, state, serial);
814
815 fatal_error:
816     self->last_part_successful = FALSE;
817     self->no_more_parts = TRUE;
818     return NULL;
819 }
820
821 /* Called without the slab_mutex held, this frees any resources assigned
822  * to the slab source state */
823 static inline void
824 slab_source_free(
825     XferDestTaperCacher *self,
826     slab_source_state *state)
827 {
828     if (state->tmp_slab) {
829         g_mutex_lock(self->slab_mutex);
830         free_slab(state->tmp_slab);
831         g_mutex_unlock(self->slab_mutex);
832     }
833 }
834
835 /* Called without the slab_mutex, this writes the given slab to the device */
836 static gboolean
837 write_slab_to_device(
838     XferDestTaperCacher *self,
839     Slab *slab)
840 {
841     XferElement *elt = XFER_ELEMENT(self);
842     gpointer buf = slab->base;
843     gsize remaining = slab->size;
844
845     while (remaining && !elt->cancelled) {
846         gsize write_size = MIN(self->block_size, remaining);
847         gboolean ok;
848         ok = device_write_block(self->device, write_size, buf);
849         if (!ok) {
850             self->bytes_written += slab->size - remaining;
851
852             /* TODO: handle an error without is_eom
853              * differently/fatally? or at least with a warning? */
854             self->last_part_successful = FALSE;
855             self->no_more_parts = FALSE;
856             return FALSE;
857         }
858
859         buf += write_size;
860         self->slab_bytes_written += write_size;
861         remaining -= write_size;
862     }
863
864     if (elt->cancelled) {
865         self->last_part_successful = FALSE;
866         self->no_more_parts = TRUE;
867         return FALSE;
868     }
869
870     self->bytes_written += slab->size;
871     self->slab_bytes_written = 0;
872     return TRUE;
873 }
874
875 static XMsg *
876 device_thread_write_part(
877     XferDestTaperCacher *self)
878 {
879     GTimer *timer = g_timer_new();
880     XMsg *msg;
881     slab_source_state src_state;
882     guint64 serial, stop_serial;
883     gboolean eof = FALSE;
884     int fileno = 0;
885
886     self->last_part_successful = FALSE;
887     self->bytes_written = 0;
888
889     if (!device_start_file(self->device, self->part_header))
890         goto part_done;
891
892     dumpfile_free(self->part_header);
893     self->part_header = NULL;
894
895     fileno = self->device->file;
896     g_assert(fileno > 0);
897
898     if (!slab_source_setup(self, &src_state))
899         goto part_done;
900
901     g_timer_start(timer);
902
903     stop_serial = self->part_stop_serial;
904     g_mutex_lock(self->slab_mutex);
905     for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
906         Slab *slab = slab_source_get(self, &src_state, serial);
907         DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
908         g_mutex_unlock(self->slab_mutex);
909         if (!slab)
910             goto part_done;
911
912         eof = slab->size < self->slab_size;
913
914         if (!write_slab_to_device(self, slab))
915             goto part_done;
916
917         g_mutex_lock(self->slab_mutex);
918         DBG(8, "wrote slab %p to device", slab);
919
920         /* if we're reading from the slab train, advance self->device_slab. */
921         if (slab == self->device_slab) {
922             next_slab(self, &self->device_slab);
923         }
924     }
925     g_mutex_unlock(self->slab_mutex);
926
927     /* if we write all of the blocks, but the finish_file fails, then likely
928      * there was some buffering going on in the device driver, and the blocks
929      * did not all make it to permanent storage -- so it's a failed part. */
930     if (!device_finish_file(self->device))
931         goto part_done;
932
933     slab_source_free(self, &src_state);
934
935     self->last_part_successful = TRUE;
936     self->no_more_parts = eof;
937
938 part_done:
939     g_timer_stop(timer);
940
941     msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
942     msg->size = self->bytes_written;
943     msg->duration = g_timer_elapsed(timer, NULL);
944     msg->partnum = self->partnum;
945     msg->fileno = fileno;
946     msg->successful = self->last_part_successful;
947     msg->eom = !self->last_part_successful;
948     msg->eof = self->no_more_parts;
949
950     /* time runs backward on some test boxes, so make sure this is positive */
951     if (msg->duration < 0) msg->duration = 0;
952
953     if (self->last_part_successful)
954         self->partnum++;
955
956     g_timer_destroy(timer);
957
958     return msg;
959 }
960
961 /* Called with the status_mutex held, this frees any cached data for
962  * a successful part */
963 static void
964 release_part_cache(
965     XferDestTaperCacher *self)
966 {
967     if (self->use_mem_cache && self->mem_cache_slab) {
968         /* move up the mem_cache_slab to point to the first slab in
969          * the next part (probably NULL at this point), so that the
970          * reader can continue reading data into the new mem cache
971          * immediately. */
972         g_mutex_lock(self->slab_mutex);
973         unref_slab(self, self->mem_cache_slab);
974         self->mem_cache_slab = self->device_slab;
975         if (self->mem_cache_slab)
976             self->mem_cache_slab->refcount++;
977         g_mutex_unlock(self->slab_mutex);
978     }
979
980     /* the disk cache gets reused automatically (rewinding to offset 0), so
981      * there's nothing else to do */
982 }
983
984 static gpointer
985 device_thread(
986     gpointer data)
987 {
988     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
989     XferElement *elt = XFER_ELEMENT(self);
990     XMsg *msg;
991
992     DBG(1, "(this is the device thread)");
993
994     if (self->disk_cache_dirname) {
995         GError *error = NULL;
996         self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
997         if (!self->disk_cache_thread) {
998             g_critical(_("Error creating new thread: %s (%s)"),
999                 error->message, errno? strerror(errno) : _("no error code"));
1000         }
1001     }
1002
1003     /* This is the outer loop, that loops once for each split part written to
1004      * tape. */
1005     g_mutex_lock(self->state_mutex);
1006     while (1) {
1007         /* wait until the main thread un-pauses us, and check that we have
1008          * the relevant device info available (block_size) */
1009         while (self->paused && !elt->cancelled) {
1010             DBG(9, "waiting to be unpaused");
1011             g_cond_wait(self->state_cond, self->state_mutex);
1012         }
1013         DBG(9, "done waiting");
1014
1015         if (elt->cancelled)
1016             break;
1017
1018         g_mutex_unlock(self->state_mutex);
1019         self->slab_bytes_written = 0;
1020         DBG(2, "beginning to write part");
1021         msg = device_thread_write_part(self);
1022         DBG(2, "done writing part");
1023         g_mutex_lock(self->state_mutex);
1024
1025         /* release any cache of a successful part, but don't bother at EOF */
1026         if (msg->successful && !msg->eof)
1027             release_part_cache(self);
1028
1029         xfer_queue_message(elt->xfer, msg);
1030
1031         /* if this is the last part, we're done with the part loop */
1032         if (self->no_more_parts)
1033             break;
1034
1035         /* pause ourselves and await instructions from the main thread */
1036         self->paused = TRUE;
1037     }
1038
1039     g_mutex_unlock(self->state_mutex);
1040
1041     /* make sure the other thread is done before we send XMSG_DONE */
1042     if (self->disk_cache_thread)
1043         g_thread_join(self->disk_cache_thread);
1044
1045     /* tell the main thread we're done */
1046     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1047
1048     return NULL;
1049 }
1050
1051 /*
1052  * Class mechanics
1053  */
1054
1055 /* called with the slab_mutex held, this adds the reader_slab to the head of
1056  * the slab train and signals the condition variable. */
1057 static void
1058 add_reader_slab_to_train(
1059     XferDestTaperCacher *self)
1060 {
1061     Slab *slab = self->reader_slab;
1062
1063     DBG(3, "adding slab of new data to the slab train");
1064
1065     if (self->newest_slab) {
1066         self->newest_slab->next = slab;
1067         slab->refcount++;
1068
1069         self->newest_slab->refcount--;
1070     }
1071
1072     self->newest_slab = slab; /* steal reader_slab's ref */
1073     self->reader_slab = NULL;
1074
1075     /* steal reader_slab's reference for newest_slab */
1076
1077     /* if any of the other pointers are waiting for this slab, update them */
1078     if (self->disk_cache_dirname && !self->disk_cacher_slab) {
1079         self->disk_cacher_slab = slab;
1080         slab->refcount++;
1081     }
1082     if (self->use_mem_cache && !self->mem_cache_slab) {
1083         self->mem_cache_slab = slab;
1084         slab->refcount++;
1085     }
1086     if (!self->device_slab) {
1087         self->device_slab = slab;
1088         slab->refcount++;
1089     }
1090     if (!self->oldest_slab) {
1091         self->oldest_slab = slab;
1092         slab->refcount++;
1093     }
1094
1095     g_cond_broadcast(self->slab_cond);
1096 }
1097
1098 static void
1099 push_buffer_impl(
1100     XferElement *elt,
1101     gpointer buf,
1102     size_t size)
1103 {
1104     XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
1105     gpointer p;
1106
1107     DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
1108
1109     /* do nothing if cancelled */
1110     if (G_UNLIKELY(elt->cancelled)) {
1111         goto free_and_finish;
1112     }
1113
1114     /* handle EOF */
1115     if (G_UNLIKELY(buf == NULL)) {
1116         /* send off the last, probably partial slab */
1117         g_mutex_lock(self->slab_mutex);
1118
1119         /* create a new, empty slab if necessary */
1120         if (!self->reader_slab) {
1121             self->reader_slab = alloc_slab(self, FALSE);
1122             if (!self->reader_slab) {
1123                 /* we've been cancelled while waiting for a slab */
1124                 g_mutex_unlock(self->slab_mutex);
1125
1126                 /* wait for the xfer to cancel, so we don't get another buffer
1127                  * pushed to us (and do so *without* the mutex held) */
1128                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1129
1130                 goto free_and_finish;
1131             }
1132             self->reader_slab->serial = self->next_serial++;
1133         }
1134
1135         add_reader_slab_to_train(self);
1136         g_mutex_unlock(self->slab_mutex);
1137
1138         goto free_and_finish;
1139     }
1140
1141     p = buf;
1142     while (1) {
1143         gsize copy_size;
1144
1145         /* get a fresh slab, if needed */
1146         if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
1147             g_mutex_lock(self->slab_mutex);
1148             if (self->reader_slab)
1149                 add_reader_slab_to_train(self);
1150             self->reader_slab = alloc_slab(self, FALSE);
1151             if (!self->reader_slab) {
1152                 /* we've been cancelled while waiting for a slab */
1153                 g_mutex_unlock(self->slab_mutex);
1154
1155                 /* wait for the xfer to cancel, so we don't get another buffer
1156                  * pushed to us (and do so *without* the mutex held) */
1157                 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1158
1159                 goto free_and_finish;
1160             }
1161             self->reader_slab->serial = self->next_serial++;
1162             g_mutex_unlock(self->slab_mutex);
1163         }
1164
1165         if (size == 0)
1166             break;
1167
1168         copy_size = MIN(self->slab_size - self->reader_slab->size, size);
1169         memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);
1170
1171         self->reader_slab->size += copy_size;
1172         p += copy_size;
1173         size -= copy_size;
1174     }
1175
1176 free_and_finish:
1177     if (buf)
1178         g_free(buf);
1179 }
1180
1181 /*
1182  * Element mechanics
1183  */
1184
1185 static gboolean
1186 start_impl(
1187     XferElement *elt)
1188 {
1189     XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
1190     GError *error = NULL;
1191
1192     self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
1193     if (!self->device_thread) {
1194         g_critical(_("Error creating new thread: %s (%s)"),
1195             error->message, errno? strerror(errno) : _("no error code"));
1196     }
1197
1198     return TRUE;
1199 }
1200
1201 static gboolean
1202 cancel_impl(
1203     XferElement *elt,
1204     gboolean expect_eof)
1205 {
1206     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
1207     gboolean rv;
1208
1209     /* chain up first */
1210     rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
1211
1212     /* then signal all of our condition variables, so that threads waiting on them
1213      * wake up and see elt->cancelled. */
1214     g_mutex_lock(self->state_mutex);
1215     g_cond_broadcast(self->state_cond);
1216     g_mutex_unlock(self->state_mutex);
1217
1218     g_mutex_lock(self->slab_mutex);
1219     g_cond_broadcast(self->slab_cond);
1220     g_cond_broadcast(self->slab_free_cond);
1221     g_mutex_unlock(self->slab_mutex);
1222
1223     return rv;
1224 }
1225
1226 static void
1227 start_part_impl(
1228     XferDestTaper *xdt,
1229     gboolean retry_part,
1230     dumpfile_t *header)
1231 {
1232     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1233
1234     g_assert(self->device != NULL);
1235     g_assert(!self->device->in_file);
1236     g_assert(header != NULL);
1237
1238     DBG(1, "start_part(retry_part=%d)", retry_part);
1239
1240     g_mutex_lock(self->state_mutex);
1241     g_assert(self->paused);
1242     g_assert(!self->no_more_parts);
1243
1244     if (self->part_header)
1245         dumpfile_free(self->part_header);
1246     self->part_header = dumpfile_copy(header);
1247
1248     if (retry_part) {
1249         g_assert(!self->last_part_successful);
1250         self->retry_part = TRUE;
1251     } else {
1252         g_assert(self->last_part_successful);
1253         self->retry_part = FALSE;
1254         self->part_first_serial = self->part_stop_serial;
1255         if (self->part_size != 0) {
1256             self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
1257         } else {
1258             /* set part_stop_serial to an effectively infinite value */
1259             self->part_stop_serial = G_MAXUINT64;
1260         }
1261     }
1262
1263     DBG(1, "unpausing");
1264     self->paused = FALSE;
1265     g_cond_broadcast(self->state_cond);
1266
1267     g_mutex_unlock(self->state_mutex);
1268 }
1269
1270 static void
1271 use_device_impl(
1272     XferDestTaper *xdt,
1273     Device *device)
1274 {
1275     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1276     GValue val;
1277
1278     /* short-circuit if nothing is changing */
1279     if (self->device == device)
1280         return;
1281
1282     g_mutex_lock(self->state_mutex);
1283     if (self->device)
1284         g_object_unref(self->device);
1285     self->device = device;
1286     g_object_ref(device);
1287
1288     /* get this new device's streaming requirements */
1289     bzero(&val, sizeof(val));
1290     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1291         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1292         g_warning("Couldn't get streaming type for %s", self->device->device_name);
1293         self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1294     } else {
1295         self->streaming = g_value_get_enum(&val);
1296     }
1297     g_value_unset(&val);
1298
1299     /* check that the blocksize hasn't changed */
1300     if (self->block_size != device->block_size) {
1301         g_mutex_unlock(self->state_mutex);
1302         xfer_cancel_with_error(XFER_ELEMENT(self),
1303             _("All devices used by the taper must have the same block size"));
1304         return;
1305     }
1306     g_mutex_unlock(self->state_mutex);
1307 }
1308
1309 static guint64
1310 get_part_bytes_written_impl(
1311     XferDestTaper *xdt)
1312 {
1313     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
1314
1315     /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
1316      * partial write to the 64-bit value on a 32-bit system).  This is ok for
1317      * the moment, as it's only informational, but be warned. */
1318     return self->bytes_written + self->slab_bytes_written;
1319 }
1320
1321 static void
1322 instance_init(
1323     XferElement *elt)
1324 {
1325     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
1326     elt->can_generate_eof = FALSE;
1327
1328     self->state_mutex = g_mutex_new();
1329     self->state_cond = g_cond_new();
1330     self->slab_mutex = g_mutex_new();
1331     self->slab_cond = g_cond_new();
1332     self->slab_free_cond = g_cond_new();
1333
1334     self->last_part_successful = TRUE;
1335     self->paused = TRUE;
1336     self->part_stop_serial = 0;
1337     self->disk_cache_read_fd = -1;
1338     self->disk_cache_write_fd = -1;
1339 }
1340
1341 static void
1342 finalize_impl(
1343     GObject * obj_self)
1344 {
1345     XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(obj_self);
1346     Slab *slab, *next_slab;
1347
1348     if (self->disk_cache_dirname)
1349         g_free(self->disk_cache_dirname);
1350
1351     g_mutex_free(self->state_mutex);
1352     g_cond_free(self->state_cond);
1353
1354     g_mutex_free(self->slab_mutex);
1355     g_cond_free(self->slab_cond);
1356     g_cond_free(self->slab_free_cond);
1357
1358     /* free the slab train, without reference to the refcounts */
1359     for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
1360         next_slab = slab->next;
1361         free_slab(slab);
1362     }
1363     self->disk_cacher_slab = NULL;
1364     self->mem_cache_slab = NULL;
1365     self->device_slab = NULL;
1366     self->oldest_slab = NULL;
1367     self->newest_slab = NULL;
1368
1369     if (self->reader_slab) {
1370         free_slab(self->reader_slab);
1371         self->reader_slab = NULL;
1372     }
1373
1374     if (self->part_header)
1375         dumpfile_free(self->part_header);
1376
1377     if (self->disk_cache_read_fd != -1)
1378         close(self->disk_cache_read_fd); /* ignore error */
1379     if (self->disk_cache_write_fd != -1)
1380         close(self->disk_cache_write_fd); /* ignore error */
1381
1382     if (self->device)
1383         g_object_unref(self->device);
1384
1385     /* chain up */
1386     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1387 }
1388
1389 static void
1390 class_init(
1391     XferDestTaperCacherClass * selfc)
1392 {
1393     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1394     XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
1395     GObjectClass *goc = G_OBJECT_CLASS(selfc);
1396     static xfer_element_mech_pair_t mech_pairs[] = {
1397         { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
1398         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1399     };
1400
1401     klass->start = start_impl;
1402     klass->cancel = cancel_impl;
1403     klass->push_buffer = push_buffer_impl;
1404     xdt_klass->start_part = start_part_impl;
1405     xdt_klass->use_device = use_device_impl;
1406     xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
1407     goc->finalize = finalize_impl;
1408
1409     klass->perl_class = "Amanda::Xfer::Dest::Taper::Cacher";
1410     klass->mech_pairs = mech_pairs;
1411
1412     parent_class = g_type_class_peek_parent(selfc);
1413 }
1414
1415 static GType
1416 xfer_dest_taper_cacher_get_type (void)
1417 {
1418     static GType type = 0;
1419
1420     if G_UNLIKELY(type == 0) {
1421         static const GTypeInfo info = {
1422             sizeof (XferDestTaperCacherClass),
1423             (GBaseInitFunc) NULL,
1424             (GBaseFinalizeFunc) NULL,
1425             (GClassInitFunc) class_init,
1426             (GClassFinalizeFunc) NULL,
1427             NULL /* class_data */,
1428             sizeof (XferDestTaperCacher),
1429             0 /* n_preallocs */,
1430             (GInstanceInitFunc) instance_init,
1431             NULL
1432         };
1433
1434         type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperCacher", &info, 0);
1435     }
1436
1437     return type;
1438 }
1439
1440 /*
1441  * Constructor
1442  */
1443
1444 XferElement *
1445 xfer_dest_taper_cacher(
1446     Device *first_device,
1447     size_t max_memory,
1448     guint64 part_size,
1449     gboolean use_mem_cache,
1450     const char *disk_cache_dirname)
1451 {
1452     XferDestTaperCacher *self = (XferDestTaperCacher *)g_object_new(XFER_DEST_TAPER_CACHER_TYPE, NULL);
1453
1454     self->max_memory = max_memory;
1455     self->part_size = part_size;
1456     self->partnum = 1;
1457     self->device = first_device;
1458     g_object_ref(self->device);
1459
1460     /* pick only one caching mechanism, caller! */
1461     if (use_mem_cache)
1462         g_assert(!disk_cache_dirname);
1463     if (disk_cache_dirname)
1464         g_assert(!use_mem_cache);
1465
1466     /* and if part size is zero, then we don't do any caching */
1467     g_assert(part_size != 0 || (!use_mem_cache && !disk_cache_dirname));
1468
1469     self->use_mem_cache = use_mem_cache;
1470     if (disk_cache_dirname)
1471         self->disk_cache_dirname = g_strdup(disk_cache_dirname);
1472
1473     /* calculate the device-dependent parameters */
1474     self->block_size = first_device->block_size;
1475
1476     /* The slab size should be large enough to justify the overhead of all
1477      * of the mutexes, but it needs to be small enough to have a few slabs
1478      * available so that the threads are not constantly waiting on one
1479      * another.  The choice is sixteen blocks, not more than a quarter of
1480      * the part size, and not more than 10MB.  If we're not using the mem
1481      * cache, then avoid exceeding max_memory by keeping the slab size less
1482      * than a quarter of max_memory. */
1483
1484     self->slab_size = self->block_size * 16;
1485     if (self->part_size)
1486         self->slab_size = MIN(self->slab_size, self->part_size / 4);
1487     self->slab_size = MIN(self->slab_size, 10*1024*1024);
1488     if (!use_mem_cache)
1489         self->slab_size = MIN(self->slab_size, self->max_memory / 4);
1490
1491     /* round slab size up to the nearest multiple of the block size */
1492     self->slab_size =
1493         ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
1494
1495     /* round part size up to a multiple of the slab size */
1496     if (self->part_size != 0) {
1497         self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
1498         self->part_size = self->slabs_per_part * self->slab_size;
1499     } else {
1500         self->slabs_per_part = 0;
1501     }
1502
1503     /* set max_slabs */
1504     if (use_mem_cache) {
1505         self->max_slabs = self->slabs_per_part; /* increase max_slabs to serve as mem buf */
1506     } else {
1507         self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
1508     }
1509
1510     /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
1511         * alloc_slab, so we check here that it's at least 2. */
1512     if (self->max_slabs < 2)
1513         self->max_slabs = 2;
1514
1515     DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);
1516
1517     return XFER_ELEMENT(self);
1518 }