Imported Upstream version 3.2.0
[debian/amanda] / device-src / xfer-dest-taper-splitter.c
index c0aa5301f324388af6493eb688d60e87a479f087..7295da389c90301ecfb9222bfd7c6c6a91a18014 100644
  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-#include "amxfer.h"
 #include "amanda.h"
+#include "amxfer.h"
 #include "xfer-device.h"
 #include "arglist.h"
 #include "conffile.h"
 
-/* A transfer destination that writes and entire dumpfile to one or more files on one
- * or more devices.   This is designed to work in concert with Amanda::Taper::Scribe. */
-
-/* Future Plans:
- * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case?
- * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file
- * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size?
- * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior)
- */
+/* A transfer destination that writes an entire dumpfile to one or more files
+ * on one or more devices, without any caching.  This destination supports both
+ * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
+ * splitting (in which rewound parts are read from holding disk). */
 
 /*
- * Slabs
+ * File Slices - Cache Information
  *
- * Slabs are larger than blocks, and are the unit on which the element
- * operates.  They are designed to be a few times larger than a block, to
- * achieve a corresponding reduction in the number of locks and unlocks used
- * per block, and similar reduction in the the amount of memory overhead
- * required.
- */
-
-typedef struct Slab {
-    struct Slab *next;
-
-    /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
-     * from any processes operating on the slab */
-    gint refcount;
-
-    /* number of this slab in the sequence, global to this element's lifetime.
-     * Since this counts slabs, which are about 1M, this can address 16
-     * yottabytes of data before wrapping. */
-    guint64 serial;
-
-    /* slab size; this is only less than the element's slab size if the
-     * transfer is at EOF. */
-    gsize size;
-
-    /* base of the slab_size buffer */
-    gpointer base;
-} Slab;
-
-/*
- * File Slices
+ * The cache_inform implementation appends cache information, in order, to a
+ * linked list of these objects, which describe the files in which the part
+ * data is stored.  Note that we assume that slices are added *before* they
+ * are needed: the xfer element will fail if it tries to rewind and does not
+ * find a suitable slice.
  *
- * These objects are arranged in a linked list, and describe the files in which
- * the disk cache is stored.  Note that we assume that slices are added *before*
- * they are needed: the xfer element will fail if it tries to rewind and does not
- * find a suitable slice.
+ * The slices should be "fast forwarded" after every successful part, so that
+ * the first byte in part_slices is always the first byte of the current part;
+ * when a retry of a part is required, the iterator methods below open the
+ * various files and do the buffering.
  */
 
 typedef struct FileSlice {
     struct FileSlice *next;
 
-    /* fully-qualified filename to read from, or NULL to read from disk_cache_read_fd */
+    /* fully-qualified filename to read from (or NULL to read from
+     * disk_cache_read_fd in XferDestTaperCacher) */
     char *filename;
 
     /* offset in file to start at */
-    off_t offset;
+    guint64 offset;
 
     /* length of data to read */
-    gsize length;
+    guint64 length;
 } FileSlice;
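
/* For example (hypothetical filenames), after two cache_inform calls the list
 * might look like:
 *
 *     part_slices -> { filename = "/holding/file.0", offset = 0, length = 1048576 }
 *                 -> { filename = "/holding/file.1", offset = 0, length =  524288 }
 *
 * Appends and fast-forwards happen under part_slices_mutex, defined below. */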
 
 /*
@@ -102,108 +74,50 @@ typedef struct XferDestTaperSplitter {
      * constant for the lifetime of the element.
      */
 
-    /* maximum buffer space to use for streaming; this is unrelated to the
-     * fallback_splitsize */
-    gsize max_memory;
-
-    /* split buffering info; if we're doing memory buffering, use_mem_cache is
-     * true; if we're doing disk buffering, disk_cache_dirname is non-NULL
-     * and contains the (allocated) filename of the cache file.  Either way,
-     * part_size gives the desired cache size.  If part_size is zero, then
-     * no splitting takes place (so part_size is effectively infinite) */
-    gboolean use_mem_cache;
-    char *disk_cache_dirname;
-    guint64 part_size; /* (bytes) */
-
-    /*
-     * threads
-     */
+    /* Maximum size of each part (bytes) */
+    guint64 part_size;
 
-    /* The thread doing the actual writes to tape; this also handles buffering
-     * for streaming */
-    GThread *device_thread;
-
-    /* The thread writing slabs to the disk cache, if any */
-    GThread *disk_cache_thread;
+    /* the device's need for streaming (it's assumed that all subsequent devices
+     * have the same needs) */
+    StreamingRequirement streaming;
 
-    /* slab train
-     *
-     * All in-memory data is contained in a linked list called the "slab
-     * train".  Various components are operating simultaneously at different
-     * points in this train.  Data from the upstream XferElement is appended to
-     * the head of the train, and the device thread follows along behind,
-     * writing data to the device.  When caching parts in memory, the slab
-     * train just grows to eventually contain the whole part.  When using an
-     * on-disk cache, the disk cache thread writes the tail of the train to
-     * disk, freeing slabs to be re-used at the head of the train.  Some
-     * careful coordination of these components allows them to operate as
-     * independently as possible within the limits of the user's configuration.
-     *
-     * Slabs are rarely, if ever, freed: the oldest_slab reference generally
-     * ensures that all slabs have refcount > 0, and this pointer is only
-     * advanced when re-using slabs that have been flushed to the disk cache or
-     * when freeing slabs after completion of the transfer. */
-
-    /* pointers into the slab train are all protected by this mutex.  Note that
-     * the slabs themselves can be manipulated without this lock; it's only
-     * when changing the pointers that the mutex must be held.  Furthermore, a
-     * foo_slab variable which is not NULL will not be changed except by its
-     * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
-     * and device_slab is controlled by device_thread).  This means that a
-     * controlling thread can drop the slab_mutex once it has ensured its slab
-     * is non-NULL.
-     *
-     * Slab_cond is notified when a new slab is made available from the reader.
-     * Slab_free_cond is notified when a slab becomes available for
-     * reallocation.
-     *
-     * Any thread waiting on either condition variable should also check
-     * elt->cancelled, and act appropriately if awakened in a cancelled state.
-     */
-    GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
+    /* block size expected by the target device */
+    gsize block_size;
 
-    /* slabs in progress by each thread, or NULL if the thread is waiting on
-     * slab_cond.  These can only be changed by their respective threads, except
-     * when they are NULL (in which case the reader will point them to a new
-     * slab and signal the slab_cond). */
-    Slab *volatile disk_cacher_slab;
-    Slab *volatile mem_cache_slab;
-    Slab *volatile device_slab;
+    /* TRUE if this element is expecting slices via cache_inform */
+    gboolean expect_cache_inform;
 
-    /* tail and head of the slab train */
-    Slab *volatile oldest_slab;
-    Slab *volatile newest_slab;
+    /* The thread doing the actual writes to tape; this also handles buffering
+     * for streaming */
+    GThread *device_thread;
 
-    /* thread-specific information
+    /* Ring Buffer
      *
-     * These values are only used by one thread, and thus are not
-     * subject to any locking or concurrency constraints.
+     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
+     * blocksize), and serves as the interface between the device_thread and
+     * the thread calling push_buffer.  Ring_length is the total length of the
+     * buffer in bytes, while ring_count is the number of data bytes currently
+     * in the buffer.  The ring_add_cond is signalled when data is added to the
+     * buffer, while ring_free_cond is signalled when data is removed.  Both
+     * are governed by ring_mutex, and both are signalled when the transfer is
+     * cancelled.
      */
 
-    /* slab in progress by the reader (not in the slab train) */
-    Slab *reader_slab;
-
-    /* the serial to be assigned to reader_slab */
-    guint64 next_serial;
-
-    /* bytes written to the device in this part */
-    guint64 bytes_written;
-
-    /* bytes written to the device in the current slab */
-    guint64 slab_bytes_written;
+    GMutex *ring_mutex;
+    GCond *ring_add_cond, *ring_free_cond;
+    gchar *ring_buffer;
+    gsize ring_length, ring_count;
+    gsize ring_head, ring_tail;
+    gboolean ring_head_at_eof;
 
-    /* element state
+    /* Element State
      *
      * "state" includes all of the variables below (including device
-     * parameters).  Note that the device_thread reads state values when
-     * paused is false without locking the mutex.  No other thread should
-     * change state when the element is not paused.
+     * parameters).  Note that the device_thread holds this mutex for the
+     * entire duration of writing a part.
      *
-     * If there is every any reason to lock both mutexes, acquire this one
-     * first.
-     *
-     * Any thread waiting on this condition variable should also check
-     * elt->cancelled, and act appropriately if awakened in a cancelled state.
+     * state_mutex should always be locked before ring_mutex, if both are to be
+     * held simultaneously.
      */
     GMutex *state_mutex;
     GCond *state_cond;
@@ -213,57 +127,31 @@ typedef struct XferDestTaperSplitter {
     Device *volatile device;
     dumpfile_t *volatile part_header;
 
-    /* If true, when unpaused, the device should begin at the beginning of the
-     * cache; if false, it should proceed to the next part. */
-    volatile gboolean retry_part;
-
-    /* If true, the previous part was completed successfully; only used for
-     * assertions */
-    volatile gboolean last_part_successful;
+    /* bytes to read from cached slices before reading from the ring buffer */
+    guint64 bytes_to_read_from_slices;
 
     /* part number in progress */
     volatile guint64 partnum;
 
-    /* if true, the main thread should *not* call start_part */
-    volatile gboolean no_more_parts;
-
-    /* the first serial in this part, and the serial to stop at */
-    volatile guint64 part_first_serial, part_stop_serial;
-
-    /* file slices for the current part */
-    FileSlice *volatile part_slices;
-
-    /* read and write file descriptors for the disk cache file, in use by the
-     * disk_cache_thread.  If these are -1, wait on state_cond until they are
-     * not; once the value is set, it will not change. */
-    volatile int disk_cache_read_fd;
-    volatile int disk_cache_write_fd;
-
-    /* device parameters
-     *
-     * Note that these values aren't known until we begin writing to the
-     * device; if block_size is zero, threads should block on state_cond until
-     * it is nonzero, at which point all of the dependent fields will have
-     * their correct values.  Note that, since this value never changes after
-     * it has been set, it is safe to read block_size without acquiring the
-     * mutext first. */
-
-    /* this device's need for streaming */
-    StreamingRequirement streaming;
-
-    /* block size expected by the target device */
-    gsize block_size;
-
-    /* Size of a slab - some multiple of the block size */
-    gsize slab_size;
-
-    /* maximum number of slabs allowed, rounded up to the next whole slab.  If
-     * using mem cache, this is the equivalent of part_size bytes; otherwise,
-     * it is equivalent to max_memory bytes. */
-    guint64 max_slabs;
-
-    /* number of slabs in a part */
-    guint64 slabs_per_part;
+    /* status of the last part */
+    gboolean last_part_eof;
+    gboolean last_part_eom;
+    gboolean last_part_successful;
+
+    /* true if the element is done writing to devices */
+    gboolean no_more_parts;
+
+    /* total bytes written in the current part */
+    volatile guint64 part_bytes_written;
+
+    /* The list of active slices for the current part.  The cache_inform method
+     * appends to this list.  It is safe to read this linked list, beginning at
+     * the head, *if* you can guarantee that slices will not be fast-forwarded
+     * in the interim.  The finalize method for this class will take care of
+     * freeing any leftover slices.  Take part_slices_mutex while modifying the
+     * links in this list. */
+    FileSlice *part_slices;
+    GMutex *part_slices_mutex;
 } XferDestTaperSplitter;
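
/* A self-contained sketch (editor's illustration, assuming only GLib; not
 * part of this patch) of the ring-buffer arithmetic used by push_buffer_impl
 * and the device thread below: the producer writes at `head`, the consumer
 * reads at `tail`, and `count` tracks the bytes between them. */

#include <glib.h>
#include <string.h>

typedef struct { gchar *buf; gsize length, count, head, tail; } MiniRing;

/* copy up to `size` bytes in; a short return means the caller should retry
 * with the remainder (the buffer was full, or the copy hit the wrap point) */
static gsize
mini_ring_put(MiniRing *r, const gchar *data, gsize size)
{
    gsize avail = MIN(size, r->length - r->count);  /* free space */
    avail = MIN(avail, r->length - r->head);        /* contiguous run to end */
    memmove(r->buf + r->head, data, avail);
    r->count += avail;
    r->head += avail;
    if (r->head == r->length)
	r->head = 0;
    return avail;
}

/* copy up to `size` bytes out, with the same wrap-point convention */
static gsize
mini_ring_get(MiniRing *r, gchar *out, gsize size)
{
    gsize avail = MIN(size, r->count);              /* data available */
    avail = MIN(avail, r->length - r->tail);        /* contiguous run to end */
    memcpy(out, r->buf + r->tail, avail);
    r->count -= avail;
    r->tail += avail;
    if (r->tail == r->length)
	r->tail = 0;
    return avail;
}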
 
 static GType xfer_dest_taper_splitter_get_type(void);
@@ -296,785 +184,399 @@ _xdt_dbg(const char *fmt, ...)
     g_debug("XDT thd-%p: %s", g_thread_self(), msg);
 }
 
-/*
- * Slab handling
- */
-
-/* called with the slab_mutex held, this gets a new slab to write into, with
- * refcount 1.  It will block if max_memory slabs are already in use, and mem
- * caching is not in use, although allocation may be forced with the 'force'
- * parameter.
- *
- * If the memory allocation cannot be satisfied due to system constraints,
- * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
- * return NULL.  If the transfer is cancelled by some other means while this
- * function is blocked awaiting a free slab, it will return NULL.
+/* "Fast forward" the slice list by the given length.  This will free any
+ * slices that are no longer necessary, and adjust the offset and length of the
+ * first remaining slice.  This assumes the state mutex is locked during its
+ * operation.
  *
- * @param self: the xfer element
- * @param force: allocate a slab even if it would exceed max_memory
- * @returns: a new slab, or NULL if the xfer is cancelled
+ * @param self: element
+ * @param length: number of bytes to fast forward
  */
-static Slab *
-alloc_slab(
-    XferDestTaperSplitter *self,
-    gboolean force)
+static void
+fast_forward_slices(
+       XferDestTaperSplitter *self,
+       guint64 length)
 {
-    XferElement *elt = XFER_ELEMENT(self);
-    Slab *rv;
-
-    DBG(8, "alloc_slab(force=%d)", force);
-    if (!force) {
-       /* throttle based on maximum number of extant slabs */
-       while (G_UNLIKELY(
-            !elt->cancelled &&
-           self->oldest_slab &&
-           self->newest_slab &&
-           self->oldest_slab->refcount > 1 &&
-           (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
-           DBG(9, "waiting for available slab");
-           g_cond_wait(self->slab_free_cond, self->slab_mutex);
-       }
-       DBG(9, "done waiting");
-
-        if (elt->cancelled)
-            return NULL;
-    }
-
-    /* if the oldest slab doesn't have anything else pointing to it, just use
-     * that */
-    if (self->oldest_slab && self->oldest_slab->refcount == 1) {
-       rv = self->oldest_slab;
-       self->oldest_slab = rv->next;
-    } else {
-       rv = g_new0(Slab, 1);
-       rv->refcount = 1;
-       rv->base = g_try_malloc(self->slab_size);
-       if (!rv->base) {
-           g_free(rv);
-           xfer_cancel_with_error(XFER_ELEMENT(self),
-               _("Could not allocate %zu bytes of memory"), self->slab_size);
-           return NULL;
-       }
-    }
+    FileSlice *slice;
 
-    rv->next = NULL;
-    rv->size = 0;
-    return rv;
-}
+    /* consume slices until we've eaten the whole part */
+    g_mutex_lock(self->part_slices_mutex);
+    while (length > 0) {
+       g_assert(self->part_slices);
+       slice = self->part_slices;
 
-/* called with the slab_mutex held, this frees the given slave entirely.  The
- * reference count is not consulted.
- *
- * @param slab: slab to free
- */
-static void
-free_slab(
-    Slab *slab)
-{
-    if (slab) {
-       if (slab->base)
-           g_free(slab->base);
-       g_free(slab);
-    }
-}
+       if (slice->length <= length) {
+           length -= slice->length;
 
-/* called with the slab_mutex held, this decrements the refcount of the
- * given slab
- *
- * @param self: xfer element
- * @param slab: slab to free
- */
-static inline void
-unref_slab(
-    XferDestTaperSplitter *self,
-    Slab *slab)
-{
-    g_assert(slab->refcount > 1);
-    slab->refcount--;
-    if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
-       g_cond_broadcast(self->slab_free_cond);
-    } else if (G_UNLIKELY(slab->refcount == 0)) {
-       free_slab(slab);
+           self->part_slices = slice->next;
+           if (slice->filename)
+               g_free(slice->filename);
+           g_free(slice);
+           slice = self->part_slices;
+       } else {
+           slice->length -= length;
+           slice->offset += length;
+           break;
+       }
     }
+    g_mutex_unlock(self->part_slices_mutex);
 }
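
/* Worked example (illustrative): with part_slices = [ {len 100}, {len 200} ],
 * fast_forward_slices(self, 150) frees the first slice outright (leaving 50
 * bytes still to consume), then trims the second to { offset += 50, len 150 },
 * so exactly the unconsumed bytes remain at the head of the list. */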
 
-/* called with the slab_mutex held, this sets *slabp to *slabp->next,
- * adjusting refcounts appropriately, and returns the new value
- *
- * @param self: xfer element
- * @param slabp: slab pointer to advance
- * @returns: new value of *slabp
+/*
+ * Slice Iterator
  */
-static inline Slab *
-next_slab(
-    XferDestTaperSplitter *self,
-    Slab * volatile *slabp)
-{
-    Slab *next;
 
-    if (!slabp || !*slabp)
-       return NULL;
+/* A struct for use in iterating over data in the slices */
+typedef struct SliceIterator {
+    /* current slice */
+    FileSlice *slice;
 
-    next = (*slabp)->next;
-    if (next)
-       next->refcount++;
-    if (*slabp)
-       unref_slab(self, *slabp);
-    *slabp = next;
+    /* file descriptor of the current file, or -1 if it's not open yet */
+    int cur_fd;
 
-    return next;
-}
+    /* bytes remaining in this slice */
+    guint64 slice_remaining;
+} SliceIterator;
 
-/*
- * Disk Cache
- *
- * The disk cache thread's job is simply to follow along the slab train at
- * maximum speed, writing slabs to the disk cache file. */
+/* Utility functions for SliceIterator */
 
-static gboolean
-open_disk_cache_fds(
-    XferDestTaperSplitter *self)
+/* Begin iterating over slices, starting at the first byte of the first slice.
+ * Initializes a pre-allocated SliceIterator.  The caller must ensure that
+ * fast_forward_slices is not called while an iteration is in
+ * progress.
+ */
+static void
+iterate_slices(
+       XferDestTaperSplitter *self,
+       SliceIterator *iter)
 {
-    char * filename;
-
-    g_assert(self->disk_cache_read_fd == -1);
-    g_assert(self->disk_cache_write_fd == -1);
-
-    g_mutex_lock(self->state_mutex);
-    filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
-                               self->disk_cache_dirname);
-
-    self->disk_cache_write_fd = g_mkstemp(filename);
-    if (self->disk_cache_write_fd < 0) {
-       g_mutex_unlock(self->state_mutex);
-       xfer_cancel_with_error(XFER_ELEMENT(self),
-           _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
-           strerror(errno));
-       g_free(filename);
-       return FALSE;
-    }
-
-    /* open a separate copy of the file for reading */
-    self->disk_cache_read_fd = open(filename, O_RDONLY);
-    if (self->disk_cache_read_fd < 0) {
-       g_mutex_unlock(self->state_mutex);
-       xfer_cancel_with_error(XFER_ELEMENT(self),
-           _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
-           strerror(errno));
-       g_free(filename);
-       return FALSE;
-    }
-
-    /* signal anyone waiting for this value */
-    g_cond_broadcast(self->state_cond);
-    g_mutex_unlock(self->state_mutex);
-
-    /* errors from unlink are not fatal */
-    if (unlink(filename) < 0) {
-       g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
-    }
-
-    g_free(filename);
-    return TRUE;
+    iter->cur_fd = -1;
+    iter->slice_remaining = 0;
+    g_mutex_lock(self->part_slices_mutex);
+    iter->slice = self->part_slices;
+    /* it's safe to unlock this because, at worst, a new entry will
+     * be appended while the iterator is in progress */
+    g_mutex_unlock(self->part_slices_mutex);
 }
 
+
+/* Read a block of data from the iterator into the caller-supplied buffer,
+ * returning the buffer on success.  Returns NULL on error, after calling
+ * xfer_cancel_with_error with an appropriate error message.  This function
+ * does not block waiting for data, and checks for cancellation only between
+ * slices.
+ */
 static gpointer
-disk_cache_thread(
-    gpointer data)
+iterator_get_block(
+       XferDestTaperSplitter *self,
+       SliceIterator *iter,
+       gpointer buf,
+       gsize bytes_needed)
 {
-    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
+    gsize buf_offset = 0;
     XferElement *elt = XFER_ELEMENT(self);
 
-    DBG(1, "(this is the disk cache thread)");
-
-    /* open up the disk cache file first */
-    if (!open_disk_cache_fds(self))
-       return NULL;
-
-    while (!elt->cancelled) {
-       gboolean eof, eop;
-       guint64 stop_serial;
-       Slab *slab;
+    g_assert(iter != NULL);
+    g_assert(buf != NULL);
 
-       /* rewind to the begining of the disk cache file */
-       if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
-           xfer_cancel_with_error(XFER_ELEMENT(self),
-               _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
-               strerror(errno));
-           return NULL;
-       }
-
-       /* we need to sit and wait for the next part to begin, first making sure
-        * we have a slab .. */
-       g_mutex_lock(self->slab_mutex);
-       while (!self->disk_cacher_slab && !elt->cancelled) {
-           DBG(9, "waiting for a disk slab");
-           g_cond_wait(self->slab_cond, self->slab_mutex);
-       }
-       DBG(9, "done waiting");
-       g_mutex_unlock(self->slab_mutex);
+    while (bytes_needed > 0) {
+       gsize read_size;
+       int bytes_read;
 
-       if (elt->cancelled)
-           break;
+       if (iter->cur_fd < 0) {
+           guint64 offset;
 
-       /* this slab is now fixed until this thread changes it */
-       g_assert(self->disk_cacher_slab != NULL);
-
-       /* and then making sure we're ready to write that slab. */
-       g_mutex_lock(self->state_mutex);
-        while ((self->paused ||
-                   (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
-               && !elt->cancelled) {
-            DBG(9, "waiting for the disk slab to become current and un-paused");
-            g_cond_wait(self->state_cond, self->state_mutex);
-        }
-       DBG(9, "done waiting");
+           g_assert(iter->slice != NULL);
+           g_assert(iter->slice->filename != NULL);
 
-       stop_serial = self->part_stop_serial;
-       g_mutex_unlock(self->state_mutex);
+           iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
+           if (iter->cur_fd < 0) {
+               xfer_cancel_with_error(elt,
+                   _("Could not open '%s' for reading: %s"),
+                   iter->slice->filename, strerror(errno));
+               return NULL;
+           }
 
-       if (elt->cancelled)
-           break;
+           iter->slice_remaining = iter->slice->length;
+           offset = iter->slice->offset;
 
-       g_mutex_lock(self->slab_mutex);
-       slab = self->disk_cacher_slab;
-       eop = eof = FALSE;
-       while (!eop && !eof) {
-           /* if we're at the head of the slab train, wait for more data */
-           while (!self->disk_cacher_slab && !elt->cancelled) {
-               DBG(9, "waiting for the next disk slab");
-               g_cond_wait(self->slab_cond, self->slab_mutex);
+           if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
+               xfer_cancel_with_error(elt,
+                   _("Could not seek '%s' for reading: %s"),
+                   iter->slice->filename, strerror(errno));
+               return NULL;
            }
-           DBG(9, "done waiting");
+       }
 
-            if (elt->cancelled)
-                break;
+       read_size = MIN(iter->slice_remaining, bytes_needed);
+       bytes_read = full_read(iter->cur_fd,
+                              buf + buf_offset,
+                              read_size);
+       if (bytes_read < 0 || (gsize)bytes_read < read_size) {
+           xfer_cancel_with_error(elt,
+               _("Error reading '%s': %s"),
+               iter->slice->filename,
+               errno? strerror(errno) : _("Unexpected EOF"));
+           return NULL;
+       }
 
-           /* drop the lock long enough to write the slab; the refcount
-            * protects the slab during this time */
-           slab = self->disk_cacher_slab;
-           g_mutex_unlock(self->slab_mutex);
+       iter->slice_remaining -= bytes_read;
+       buf_offset += bytes_read;
+       bytes_needed -= bytes_read;
 
-           if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
-               xfer_cancel_with_error(XFER_ELEMENT(self),
-                   _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
-                   strerror(errno));
+       if (iter->slice_remaining <= 0) {
+           if (close(iter->cur_fd) < 0) {
+               xfer_cancel_with_error(elt,
+                   _("Could not close fd %d: %s"),
+                   iter->cur_fd, strerror(errno));
                return NULL;
            }
+           iter->cur_fd = -1;
 
-           eof = slab->size < self->slab_size;
-           eop = (slab->serial + 1 == stop_serial);
+           iter->slice = iter->slice->next;
 
-           g_mutex_lock(self->slab_mutex);
-           next_slab(self, &self->disk_cacher_slab);
-       }
-       g_mutex_unlock(self->slab_mutex);
-
-       if (eof) {
-           /* this very thread should have just set this value to NULL, and since it's
-            * EOF, there should not be any 'next' slab */
-           g_assert(self->disk_cacher_slab == NULL);
-           break;
+           if (elt->cancelled)
+               return NULL;
        }
     }
 
-    return NULL;
+    return buf;
+}
+
+
+/* Free the iterator's resources */
+static void
+iterator_free(
+       SliceIterator *iter)
+{
+    if (iter->cur_fd >= 0)
+       close(iter->cur_fd);
 }
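
/* Typical iterator lifecycle (an editor's sketch mirroring the one caller,
 * device_thread_write_part, below):
 *
 *     SliceIterator iter;
 *     gpointer buf = g_malloc(self->block_size);
 *     iterate_slices(self, &iter);
 *     while (remaining >= self->block_size) {
 *         if (!iterator_get_block(self, &iter, buf, self->block_size))
 *             break;   // error already reported via xfer_cancel_with_error
 *         // ... write buf to the device ...
 *         remaining -= self->block_size;
 *     }
 *     iterator_free(&iter);
 *     g_free(buf);
 */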
 
 /*
  * Device Thread
- *
- * The device thread's job is to write slabs to self->device, applying whatever
- * streaming algorithms are required.  It does this by alternately getting the
- * next slab from a "slab source" and writing that slab to the device.  Most of
- * the slab source functions assume that self->slab_mutex is held, but may
- * release the mutex (either explicitly or via a g_cond_wait), so it is not
- * valid to assume that any slab pointers remain unchanged after a slab_source
- * function invication.
  */
 
-/* This struct tracks the current state of the slab source */
-typedef struct slab_source_state {
-    /* temporary slab used for reading from disk */
-    Slab *tmp_slab;
-
-    /* current source slice */
-    FileSlice *slice;
-
-    /* open fd in current slice, or -1 */
-    int slice_fd;
-
-    /* next serial to read from disk */
-    guint64 next_serial;
-
-    /* bytes remaining in this slice */
-    gsize slice_remaining;
-} slab_source_state;
-
-/* Called with the slab_mutex held, this function pre-buffers enough data into the slab
- * train to meet the device's streaming needs. */
-static gboolean
-slab_source_prebuffer(
+/* Wait for at least one block, or EOF, to be available in the ring buffer.
+ * Called with the ring mutex held. */
+static gsize
+device_thread_wait_for_block(
     XferDestTaperSplitter *self)
 {
     XferElement *elt = XFER_ELEMENT(self);
-    guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
-    guint64 i;
-    Slab *slab;
-
-    /* always prebuffer at least one slab, even if max_memory is 0 */
-    if (prebuffer_slabs == 0) prebuffer_slabs = 1;
-
-    /* pre-buffering is not necessary if we're reading from a disk cache */
-    if (self->retry_part && self->part_slices)
-       return TRUE;
-
-    /* pre-buffering means waiting until we have at least prebuffer_slabs in the
-     * slab train ahead of the device_slab, or the newest slab is at EOF. */
-    while (!elt->cancelled) {
-       gboolean eof_or_eop = FALSE;
-
-       /* see if there's enough data yet */
-       for (i = 0, slab = self->device_slab;
-            i < prebuffer_slabs && slab != NULL;
-            i++, slab = slab->next) {
-           eof_or_eop = (slab->size < self->slab_size)
-               || (slab->serial + 1 == self->part_stop_serial);
-       }
-       if (i == prebuffer_slabs || eof_or_eop)
-           break;
+    gsize bytes_needed = self->device->block_size;
+    gsize usable;
 
-       DBG(9, "prebuffering wait");
-       g_cond_wait(self->slab_cond, self->slab_mutex);
-    }
-    DBG(9, "done waiting");
+    /* for any kind of streaming, we need to fill the entire buffer before the
+     * first byte */
+    if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
+       bytes_needed = self->ring_length;
 
-    if (elt->cancelled) {
-       self->last_part_successful = FALSE;
-       self->no_more_parts = TRUE;
-       return FALSE;
-    }
-
-    return TRUE;
-}
-
-/* Called without the slab_mutex held, this function sets up a new slab_source_state
- * object based on the configuratino of the Xfer Element. */
-static inline gboolean
-slab_source_setup(
-    XferDestTaperSplitter *self,
-    slab_source_state *state)
-{
-    state->tmp_slab = NULL;
-    state->slice_fd = -1;
-    state->slice = NULL;
-    state->slice_remaining = 0;
-    state->next_serial = G_MAXUINT64;
-
-    /* if we're to retry the part, rewind to the beginning */
-    if (self->retry_part) {
-       if (self->use_mem_cache) {
-           /* rewind device_slab to point to the mem_cache_slab */
-           g_mutex_lock(self->slab_mutex);
-           if (self->device_slab)
-               unref_slab(self, self->device_slab);
-           self->device_slab = self->mem_cache_slab;
-           if(self->device_slab != NULL)
-               self->device_slab->refcount++;
-           g_mutex_unlock(self->slab_mutex);
-       } else {
-           g_assert(self->part_slices);
-
-           g_mutex_lock(self->slab_mutex);
-
-           /* we're going to read from the disk cache until we get to the oldest useful
-            * slab in memory, so it had best exist */
-           g_assert(self->oldest_slab != NULL);
-
-           /* point device_slab at the oldest slab we have */
-           self->oldest_slab->refcount++;
-           if (self->device_slab)
-               unref_slab(self, self->device_slab);
-           self->device_slab = self->oldest_slab;
-
-           /* and increment it until it is at least the slab we want to start from */
-           while (self->device_slab->serial < self->part_first_serial) {
-               next_slab(self, &self->device_slab);
-           }
+    while (1) {
+       /* are we ready? */
+       if (elt->cancelled)
+           break;
 
-           /* get a new, temporary slab for use while reading */
-           state->tmp_slab = alloc_slab(self, TRUE);
+       if (self->ring_count >= bytes_needed)
+           break;
 
-           g_mutex_unlock(self->slab_mutex);
+       if (self->ring_head_at_eof)
+           break;
 
-           if (!state->tmp_slab) {
-                /* if we couldn't allocate a slab, then we're cancelled, so we're done with
-                 * this part. */
-               self->last_part_successful = FALSE;
-               self->no_more_parts = TRUE;
-               return FALSE;
-           }
+       /* nope - so wait */
+       g_cond_wait(self->ring_add_cond, self->ring_mutex);
 
-           state->tmp_slab->size = self->slab_size;
-           state->slice = self->part_slices;
-           state->next_serial = self->part_first_serial;
-       }
+       /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
+        * we need to wait for the entire buffer to fill */
+       if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
+           bytes_needed = self->ring_length;
     }
 
-    /* if the streaming mode requires it, pre-buffer */
-    if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
-       self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
-       gboolean prebuffer_ok;
-
-       g_mutex_lock(self->slab_mutex);
-       prebuffer_ok = slab_source_prebuffer(self);
-       g_mutex_unlock(self->slab_mutex);
-       if (!prebuffer_ok)
-           return FALSE;
-    }
+    usable = MIN(self->ring_count, bytes_needed);
+    if (self->part_size)
+       usable = MIN(usable, self->part_size - self->part_bytes_written);
 
-    return TRUE;
+    return usable;
 }
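
/* Example: with a 32k device block and a 768k ring, STREAMING_REQUIREMENT_NONE
 * returns as soon as 32k (or EOF) is buffered; DESIRED and REQUIRED wait for
 * the full 768k before the first byte of a part; and REQUIRED additionally
 * re-waits for a full ring any time it runs dry mid-part. */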
 
-/* Called with the slab_mutex held, this does the work of slab_source_get when
- * reading from the disk cache.  Note that this explicitly releases the
- * slab_mutex during execution - do not depend on any protected values across a
- * call to this function.  The mutex is held on return. */
-static Slab *
-slab_source_get_from_disk(
+/* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
+ * held. */
+static void
+device_thread_consume_block(
     XferDestTaperSplitter *self,
-    slab_source_state *state,
-    guint64 serial)
+    gsize written)
+{
+    self->ring_count -= written;
+    self->ring_tail += written;
+    if (self->ring_tail >= self->ring_length)
+       self->ring_tail -= self->ring_length;
+    g_cond_broadcast(self->ring_free_cond);
+}
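
/* Example: with ring_length = 4 blocks and ring_tail in the final block,
 * consuming that block advances ring_tail to ring_length, which wraps back
 * to 0; ring_count drops by the same amount, and any producer blocked in
 * push_buffer_impl on ring_free_cond is woken. */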
+
+/* Write an entire part.  Called with the state_mutex held */
+static XMsg *
+device_thread_write_part(
+    XferDestTaperSplitter *self)
 {
+    GTimer *timer = g_timer_new();
     XferElement *elt = XFER_ELEMENT(self);
-    gsize bytes_needed = self->slab_size;
-    gsize slab_offset = 0;
 
-    /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
-    g_mutex_unlock(self->slab_mutex);
+    enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
+    int fileno = 0;
+    XMsg *msg;
 
-    g_assert(state->next_serial == serial);
+    self->part_bytes_written = 0;
 
-    while (bytes_needed > 0) {
-       gsize read_size, bytes_read;
-
-       if (state->slice_fd < 0) {
-           g_assert(state->slice);
-           if (state->slice->filename) {
-               /* regular cache_inform file - just open it */
-               state->slice_fd = open(state->slice->filename, O_RDONLY, 0);
-               if (state->slice_fd < 0) {
-                   xfer_cancel_with_error(XFER_ELEMENT(self),
-                        _("Could not open '%s' for reading: %s"),
-                       state->slice->filename, strerror(errno));
-                   goto fatal_error;
-               }
-           } else {
-               /* wait for the disk_cache_thread to open the disk_cache_read_fd, and then copy it */
-               g_mutex_lock(self->state_mutex);
-               while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
-                   DBG(9, "waiting for disk_cache_thread to start up");
-                   g_cond_wait(self->state_cond, self->state_mutex);
-               }
-               DBG(9, "done waiting");
-               state->slice_fd = self->disk_cache_read_fd;
-               g_mutex_unlock(self->state_mutex);
-           }
+    g_timer_start(timer);
 
-           if (lseek(state->slice_fd, state->slice->offset, SEEK_SET) == -1) {
-               xfer_cancel_with_error(XFER_ELEMENT(self),
-                    _("Could not seek '%s' for reading: %s"),
-                   state->slice->filename? state->slice->filename : "(cache file)",
-                   strerror(errno));
-               goto fatal_error;
-           }
+    /* write the header; if this fails or hits LEOM, we consider this a
+     * successful 0-byte part */
+    if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
+       part_status = PART_LEOM;
+       goto part_done;
+    }
 
-           state->slice_remaining = state->slice->length;
-       }
+    fileno = self->device->file;
+    g_assert(fileno > 0);
 
-       read_size = MIN(state->slice_remaining, bytes_needed);
-       bytes_read = full_read(state->slice_fd,
-                              state->tmp_slab->base + slab_offset,
-                              read_size);
-       if (bytes_read < read_size) {
-            xfer_cancel_with_error(XFER_ELEMENT(self),
-                _("Error reading '%s': %s"),
-               state->slice->filename? state->slice->filename : "(cache file)",
-               errno? strerror(errno) : _("Unexpected EOF"));
-            goto fatal_error;
-       }
+    /* free the header, now that it's written */
+    dumpfile_free(self->part_header);
+    self->part_header = NULL;
 
-       state->slice_remaining -= bytes_read;
-       if (state->slice_remaining == 0) {
-           if (close(state->slice_fd) < 0) {
-               xfer_cancel_with_error(XFER_ELEMENT(self),
-                    _("Could not close fd %d: %s"),
-                   state->slice_fd, strerror(errno));
-               goto fatal_error;
-           }
-           state->slice_fd = -1;
-           state->slice = state->slice->next;
-       }
+    /* First, read the requisite number of bytes from the part_slices, if this
+     * part is a retry of a previously failed part. */
+    if (self->bytes_to_read_from_slices) {
+       SliceIterator iter;
+       gsize to_write = self->block_size;
+       gpointer buf = g_malloc(to_write);
+       gboolean successful = TRUE;
+       guint64 bytes_from_slices = self->bytes_to_read_from_slices;
 
-       bytes_needed -= bytes_read;
-       slab_offset += bytes_read;
-    }
+       DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
 
-    state->tmp_slab->serial = state->next_serial++;
+       iterate_slices(self, &iter);
+       while (bytes_from_slices) {
+           gboolean ok;
 
-    g_mutex_lock(self->slab_mutex);
-    return state->tmp_slab;
+           if (!iterator_get_block(self, &iter, buf, to_write)) {
+               part_status = PART_FAILED;
+               successful = FALSE;
+               break;
+           }
 
-fatal_error:
-    g_mutex_lock(self->slab_mutex);
+           /* write the block just read from the slices straight to the
+            * device */
+           ok = device_write_block(self->device, (guint)to_write, buf);
 
-    self->last_part_successful = FALSE;
-    self->no_more_parts = TRUE;
-    return NULL;
-}
+           if (!ok) {
+               part_status = PART_FAILED;
+               successful = FALSE;
+               break;
+           }
 
-/* Called with the slab_mutex held, this function gets the slab with the given
- * serial number, waiting if necessary for that slab to be available.  Note
- * that the slab_mutex may be released during execution, although it is always
- * held on return. */
-static inline Slab *
-slab_source_get(
-    XferDestTaperSplitter *self,
-    slab_source_state *state,
-    guint64 serial)
-{
-    XferElement *elt = (XferElement *)self;
-
-    /* device_slab is only NULL if we're following the slab train, so wait for
-     * a new slab */
-    if (!self->device_slab) {
-       /* if the streaming mode requires it, pre-buffer */
-       if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
-           if (!slab_source_prebuffer(self))
-               return NULL;
+           self->part_bytes_written += to_write;
+           bytes_from_slices -= to_write;
 
-           /* fall through to make sure we have a device_slab;
-            * slab_source_prebuffer doesn't guarantee device_slab != NULL */
+           if (self->part_size && self->part_bytes_written >= self->part_size) {
+               part_status = PART_EOP;
+               successful = FALSE;
+               break;
+           } else if (self->device->is_eom) {
+               part_status = PART_LEOM;
+               successful = FALSE;
+               break;
+           }
        }
 
-       while (self->device_slab == NULL && !elt->cancelled) {
-           DBG(9, "waiting for the next slab");
-           g_cond_wait(self->slab_cond, self->slab_mutex);
-       }
-       DBG(9, "done waiting");
+       iterator_free(&iter);
+       g_free(buf);
 
-       if (elt->cancelled)
-           goto fatal_error;
+       /* if we didn't finish, get out of here now */
+       if (!successful)
+           goto part_done;
     }
 
-    /* device slab is now set, and only this thread can change it */
-    g_assert(self->device_slab);
-
-    /* if the next item in the device slab is the one we want, then the job is
-     * pretty easy */
-    if (G_LIKELY(serial == self->device_slab->serial))
-       return self->device_slab;
-
-    /* otherwise, we're reading from disk */
-    g_assert(serial < self->device_slab->serial);
-    return slab_source_get_from_disk(self, state, serial);
+    g_mutex_lock(self->ring_mutex);
+    while (1) {
+       gsize to_write;
+       gboolean ok;
 
-fatal_error:
-    self->last_part_successful = FALSE;
-    self->no_more_parts = TRUE;
-    return NULL;
-}
+       /* wait for at least one block, and (if necessary) prebuffer */
+       to_write = device_thread_wait_for_block(self);
+       to_write = MIN(to_write, self->device->block_size);
+       if (elt->cancelled)
+           break;
 
-/* Called without the slab_mutex held, this frees any resources assigned
- * to the slab source state */
-static inline void
-slab_source_free(
-    XferDestTaperSplitter *self,
-    slab_source_state *state)
-{
-    if (state->slice_fd != -1)
-       close(state->slice_fd);
+       if (to_write == 0) {
+           part_status = PART_EOF;
+           break;
+       }
 
-    if (state->tmp_slab) {
-       g_mutex_lock(self->slab_mutex);
-       free_slab(state->tmp_slab);
-       g_mutex_unlock(self->slab_mutex);
-    }
-}
+       g_mutex_unlock(self->ring_mutex);
+       DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
 
-/* Called without the slab_mutex, this writes the given slab to the device */
-static gboolean
-write_slab_to_device(
-    XferDestTaperSplitter *self,
-    Slab *slab)
-{
-    XferElement *elt = XFER_ELEMENT(self);
-    gpointer buf = slab->base;
-    gsize remaining = slab->size;
+       /* it's OK to reference ring_tail and ring_buffer without the mutex
+        * here: this thread is the only consumer, so neither the tail index
+        * nor the data behind it can change until we consume them below */
+       ok = device_write_block(self->device, (guint)to_write,
+               self->ring_buffer + self->ring_tail);
+       g_mutex_lock(self->ring_mutex);
 
-    while (remaining && !elt->cancelled) {
-       gsize write_size = MIN(self->block_size, remaining);
-       gboolean ok;
-       ok = device_write_block(self->device, write_size, buf);
        if (!ok) {
-            self->bytes_written += slab->size - remaining;
-
-            /* TODO: handle an error without is_eom
-             * differently/fatally? or at least with a warning? */
-           self->last_part_successful = FALSE;
-           self->no_more_parts = FALSE;
-           return FALSE;
+           part_status = PART_FAILED;
+           break;
        }
 
-       buf += write_size;
-       self->slab_bytes_written += write_size;
-       remaining -= write_size;
-    }
+       self->part_bytes_written += to_write;
+       device_thread_consume_block(self, to_write);
 
-    if (elt->cancelled) {
-       self->last_part_successful = FALSE;
-       self->no_more_parts = TRUE;
-        return FALSE;
+       if (self->part_size && self->part_bytes_written >= self->part_size) {
+           part_status = PART_EOP;
+           break;
+       } else if (self->device->is_eom) {
+           part_status = PART_LEOM;
+           break;
+       }
     }
+    g_mutex_unlock(self->ring_mutex);
+part_done:
 
-    self->bytes_written += slab->size;
-    self->slab_bytes_written = 0;
-    return TRUE;
-}
-
-static XMsg *
-device_thread_write_part(
-    XferDestTaperSplitter *self)
-{
-    GTimer *timer = g_timer_new();
-    XMsg *msg;
-    slab_source_state src_state;
-    guint64 serial, stop_serial;
-    gboolean eof = FALSE;
-    int fileno = 0;
-
-    self->last_part_successful = FALSE;
-    self->bytes_written = 0;
-
-    if (!device_start_file(self->device, self->part_header))
-       goto part_done;
-
-    dumpfile_free(self->part_header);
-    self->part_header = NULL;
-
-    fileno = self->device->file;
-    g_assert(fileno > 0);
-
-    if (!slab_source_setup(self, &src_state))
-       goto part_done;
-
-    g_timer_start(timer);
-
-    stop_serial = self->part_stop_serial;
-    g_mutex_lock(self->slab_mutex);
-    for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
-       Slab *slab = slab_source_get(self, &src_state, serial);
-       DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
-       g_mutex_unlock(self->slab_mutex);
-       if (!slab)
-           goto part_done;
-
-       eof = slab->size < self->slab_size;
-
-       if (!write_slab_to_device(self, slab))
-           goto part_done;
-
-       g_mutex_lock(self->slab_mutex);
-       DBG(8, "wrote slab %p to device", slab);
-
-       /* if we're reading from the slab train, advance self->device_slab. */
-       if (slab == self->device_slab) {
-           next_slab(self, &self->device_slab);
-       }
+    if (elt->cancelled) {
+       g_timer_destroy(timer);
+       return NULL;
     }
-    g_mutex_unlock(self->slab_mutex);
 
     /* if we write all of the blocks, but the finish_file fails, then likely
      * there was some buffering going on in the device driver, and the blocks
-     * did not all make it to permanent storage -- so it's a failed part. */
-    if (!device_finish_file(self->device))
-       goto part_done;
-
-    slab_source_free(self, &src_state);
-
-    self->last_part_successful = TRUE;
-    self->no_more_parts = eof;
+     * did not all make it to permanent storage -- so it's a failed part.  Note
+     * that we try to finish_file even if the part failed, just to be thorough. */
+    if (self->device->in_file) {
+       if (!device_finish_file(self->device))
+           part_status = PART_FAILED;
+    }
 
-part_done:
     g_timer_stop(timer);
 
     msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
-    msg->size = self->bytes_written;
+    msg->size = self->part_bytes_written;
     msg->duration = g_timer_elapsed(timer, NULL);
     msg->partnum = self->partnum;
     msg->fileno = fileno;
-    msg->successful = self->last_part_successful;
-    msg->eom = !self->last_part_successful;
-    msg->eof = self->no_more_parts;
+    msg->successful = self->last_part_successful = part_status != PART_FAILED;
+    msg->eom = self->last_part_eom = (part_status == PART_LEOM || !msg->successful);
+    msg->eof = self->last_part_eof = part_status == PART_EOF;
 
-    if (self->last_part_successful)
+    /* time runs backward on some test boxes, so make sure this is positive */
+    if (msg->duration < 0) msg->duration = 0;
+
+    if (msg->successful)
        self->partnum++;
+    self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
 
     g_timer_destroy(timer);
 
     return msg;
 }
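
/* Summary of the part_status -> XMSG_PART_DONE mapping above (partnum is
 * incremented for every successful part):
 *
 *     PART_EOF     successful, eof  -> no_more_parts        (dump complete)
 *     PART_EOP     successful                               (part_size reached)
 *     PART_LEOM    successful, eom                          (volume full)
 *     PART_FAILED  !successful, eom  -> no_more_parts,
 *                  unless a cache_inform retry is possible
 */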
 
-/* Called with the status_mutex held, this frees any cached data for
- * a successful part */
-static void
-release_part_cache(
-    XferDestTaperSplitter *self)
-{
-    if (self->use_mem_cache && self->mem_cache_slab) {
-       /* move up the mem_cache_slab to point to the first slab in
-        * the next part (probably NULL at this point), so that the
-        * reader can continue reading data into the new mem cache
-        * immediately. */
-       g_mutex_lock(self->slab_mutex);
-       unref_slab(self, self->mem_cache_slab);
-       self->mem_cache_slab = self->device_slab;
-       if (self->mem_cache_slab)
-           self->mem_cache_slab->refcount++;
-       g_mutex_unlock(self->slab_mutex);
-    }
-
-    /* the disk_cache_thread takes care of freeing its cache */
-    else if (self->disk_cache_dirname)
-       return;
-
-    /* if we have part_slices, fast-forward them. Note that we should have a
-     * full part's worth of slices by now. */
-    else if (self->part_slices) {
-       guint64 bytes_remaining = self->slabs_per_part * self->slab_size;
-       FileSlice *slice = self->part_slices;
-
-       /* consume slices until we've eaten the whole part */
-       while (bytes_remaining > 0) {
-           if (slice == NULL)
-               g_critical("Not all data in part was represented to cache_inform");
-
-           if (slice->length <= bytes_remaining) {
-               bytes_remaining -= slice->length;
-
-               self->part_slices = slice->next;
-               g_free(slice->filename);
-               g_free(slice);
-               slice = self->part_slices;
-           } else {
-               slice->length -= bytes_remaining;
-               slice->offset += bytes_remaining;
-               break;
-           }
-       }
-    }
-}
-
 static gpointer
 device_thread(
     gpointer data)
@@ -1085,15 +587,6 @@ device_thread(
 
     DBG(1, "(this is the device thread)");
 
-    if (self->disk_cache_dirname) {
-        GError *error = NULL;
-       self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
-        if (!self->disk_cache_thread) {
-            g_critical(_("Error creating new thread: %s (%s)"),
-                error->message, errno? strerror(errno) : _("no error code"));
-        }
-    }
-
     /* This is the outer loop, that loops once for each split part written to
      * tape. */
     g_mutex_lock(self->state_mutex);
@@ -1109,33 +602,29 @@ device_thread(
         if (elt->cancelled)
            break;
 
-        g_mutex_unlock(self->state_mutex);
-       self->slab_bytes_written = 0;
        DBG(2, "beginning to write part");
        msg = device_thread_write_part(self);
        DBG(2, "done writing part");
-        g_mutex_lock(self->state_mutex);
 
-       /* release any cache of a successful part, but don't bother at EOF */
-       if (msg->successful && !msg->eof)
-           release_part_cache(self);
+       if (!msg) /* cancelled */
+           break;
+
+       /* release the slices for this part, if there were any slices */
+       if (msg->successful && self->expect_cache_inform) {
+           fast_forward_slices(self, msg->size);
+       }
 
        xfer_queue_message(elt->xfer, msg);
 
+       /* pause ourselves and await instructions from the main thread */
+       self->paused = TRUE;
+
        /* if this is the last part, we're done with the part loop */
        if (self->no_more_parts)
            break;
-
-       /* pause ourselves and await instructions from the main thread */
-       self->paused = TRUE;
     }
-
     g_mutex_unlock(self->state_mutex);
 
-    /* make sure the other thread is done before we send XMSG_DONE */
-    if (self->disk_cache_thread)
-        g_thread_join(self->disk_cache_thread);
-
     /* tell the main thread we're done */
     xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
 
@@ -1146,49 +635,6 @@ device_thread(
  * Class mechanics
  */
 
-/* called with the slab_mutex held, this adds the reader_slab to the head of
- * the slab train and signals the condition variable. */
-static void
-add_reader_slab_to_train(
-    XferDestTaperSplitter *self)
-{
-    Slab *slab = self->reader_slab;
-
-    DBG(3, "adding slab of new data to the slab train");
-
-    if (self->newest_slab) {
-       self->newest_slab->next = slab;
-       slab->refcount++;
-
-       self->newest_slab->refcount--;
-    }
-
-    self->newest_slab = slab; /* steal reader_slab's ref */
-    self->reader_slab = NULL;
-
-    /* steal reader_slab's reference for newest_slab */
-
-    /* if any of the other pointers are waiting for this slab, update them */
-    if (self->disk_cache_dirname && !self->disk_cacher_slab) {
-       self->disk_cacher_slab = slab;
-       slab->refcount++;
-    }
-    if (self->use_mem_cache && !self->mem_cache_slab) {
-       self->mem_cache_slab = slab;
-       slab->refcount++;
-    }
-    if (!self->device_slab) {
-       self->device_slab = slab;
-       slab->refcount++;
-    }
-    if (!self->oldest_slab) {
-       self->oldest_slab = slab;
-       slab->refcount++;
-    }
-
-    g_cond_broadcast(self->slab_cond);
-}
-
 static void
 push_buffer_impl(
     XferElement *elt,
@@ -1196,7 +642,7 @@ push_buffer_impl(
     size_t size)
 {
     XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
-    gpointer p;
+    gchar *p = buf;
 
     DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
 
@@ -1207,66 +653,52 @@ push_buffer_impl(
 
     /* handle EOF */
     if (G_UNLIKELY(buf == NULL)) {
-       /* send off the last, probably partial slab */
-       g_mutex_lock(self->slab_mutex);
-
-       /* create a new, empty slab if necessary */
-       if (!self->reader_slab) {
-           self->reader_slab = alloc_slab(self, FALSE);
-            if (!self->reader_slab) {
-                /* we've been cancelled while waiting for a slab */
-                g_mutex_unlock(self->slab_mutex);
-
-                /* wait for the xfer to cancel, so we don't get another buffer
-                 * pushed to us (and do so *without* the mutex held) */
-                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
-
-                goto free_and_finish;
-            }
-           self->reader_slab->serial = self->next_serial++;
-       }
-
-       add_reader_slab_to_train(self);
-       g_mutex_unlock(self->slab_mutex);
-
+       /* indicate EOF to the device thread */
+       g_mutex_lock(self->ring_mutex);
+       self->ring_head_at_eof = TRUE;
+       g_cond_broadcast(self->ring_add_cond);
+       g_mutex_unlock(self->ring_mutex);
        goto free_and_finish;
     }
 
-    p = buf;
-    while (1) {
-       gsize copy_size;
-
-       /* get a fresh slab, if needed */
-       if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
-           g_mutex_lock(self->slab_mutex);
-           if (self->reader_slab)
-               add_reader_slab_to_train(self);
-           self->reader_slab = alloc_slab(self, FALSE);
-            if (!self->reader_slab) {
-                /* we've been cancelled while waiting for a slab */
-                g_mutex_unlock(self->slab_mutex);
-
-                /* wait for the xfer to cancel, so we don't get another buffer
-                 * pushed to us (and do so *without* the mutex held) */
-                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
-
-                goto free_and_finish;
-            }
-           self->reader_slab->serial = self->next_serial++;
-           g_mutex_unlock(self->slab_mutex);
-       }
+    /* push the block into the ring buffer, in pieces if necessary */
+    g_mutex_lock(self->ring_mutex);
+    while (size > 0) {
+       gsize avail;
 
-       if (size == 0)
-           break;
-
-       copy_size = MIN(self->slab_size - self->reader_slab->size, size);
-       memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);
+       /* wait for some space */
+       while (self->ring_count == self->ring_length && !elt->cancelled) {
+           DBG(9, "waiting for any space to buffer pushed data");
+           g_cond_wait(self->ring_free_cond, self->ring_mutex);
+       }
+       DBG(9, "done waiting");
 
-       self->reader_slab->size += copy_size;
-       p += copy_size;
-       size -= copy_size;
+       if (elt->cancelled)
+           goto unlock_and_free_and_finish;
+
+       /* only copy up to the end of the buffer; if the available space wraps
+        * around to the beginning, the next loop iteration handles the rest */
+       avail = MIN(size, self->ring_length - self->ring_count);
+       avail = MIN(avail, self->ring_length - self->ring_head);
+
+       /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
+       memmove(self->ring_buffer + self->ring_head, p, avail);
+
+       /* reset the ring variables to represent this state */
+       self->ring_count += avail;
+       self->ring_head += avail; /* will, at most, hit ring_length */
+       if (self->ring_head == self->ring_length)
+           self->ring_head = 0;
+       p += avail;
+       size -= avail;
+
+       /* and give the device thread a notice that data is ready */
+       g_cond_broadcast(self->ring_add_cond);
     }
 
+unlock_and_free_and_finish:
+    g_mutex_unlock(self->ring_mutex);
+
 free_and_finish:
     if (buf)
         g_free(buf);
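
The loop above is the producer half of the ring buffer.  For orientation, here
is a minimal sketch of the matching consumer loop the device thread would run;
ring_consume is a hypothetical helper, not the element's actual device-thread
code, and cancellation checks are omitted for brevity, but it assumes the same
ring fields and condition variables used above.

    /* Hypothetical consumer-side sketch of this ring buffer; the element's
     * real device thread is more involved, but follows the same protocol. */
    static gsize
    ring_consume(XferDestTaperSplitter *self, gpointer out, gsize want)
    {
        gsize taken = 0;

        g_mutex_lock(self->ring_mutex);
        while (taken < want) {
            gsize avail;

            /* wait for the producer to add data or flag EOF */
            while (self->ring_count == 0 && !self->ring_head_at_eof)
                g_cond_wait(self->ring_add_cond, self->ring_mutex);

            if (self->ring_count == 0)  /* EOF, and the ring is drained */
                break;

            /* take only the contiguous span ending at the buffer edge */
            avail = MIN(want - taken, self->ring_count);
            avail = MIN(avail, self->ring_length - self->ring_tail);
            memmove((guchar *)out + taken,
                    self->ring_buffer + self->ring_tail, avail);

            self->ring_tail = (self->ring_tail + avail) % self->ring_length;
            self->ring_count -= avail;
            taken += avail;

            /* wake the producer, which may be waiting for free space */
            g_cond_broadcast(self->ring_free_cond);
        }
        g_mutex_unlock(self->ring_mutex);
        return taken;
    }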
@@ -1309,27 +741,47 @@ cancel_impl(
     g_cond_broadcast(self->state_cond);
     g_mutex_unlock(self->state_mutex);
 
-    g_mutex_lock(self->slab_mutex);
-    g_cond_broadcast(self->slab_cond);
-    g_cond_broadcast(self->slab_free_cond);
-    g_mutex_unlock(self->slab_mutex);
+    g_mutex_lock(self->ring_mutex);
+    g_cond_broadcast(self->ring_add_cond);
+    g_cond_broadcast(self->ring_free_cond);
+    g_mutex_unlock(self->ring_mutex);
 
     return rv;
 }
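
The broadcasts above are sufficient because every thread that blocks on one of
these condition variables re-tests elt->cancelled in its wait loop, as
push_buffer_impl does; a broadcast with no accompanying state change is then
enough to unblock it.  The pattern, in sketch form (have_space stands in for
whatever predicate the waiter is blocked on):

    /* The canonical wait loop that makes cancel_impl's broadcasts work. */
    g_mutex_lock(self->ring_mutex);
    while (!have_space(self) && !elt->cancelled)
        g_cond_wait(self->ring_free_cond, self->ring_mutex);
    if (elt->cancelled) {
        g_mutex_unlock(self->ring_mutex);
        return;  /* or jump to a cleanup label, as push_buffer_impl does */
    }
    /* predicate holds; proceed under the mutex */
    g_mutex_unlock(self->ring_mutex);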
 
 static void
 start_part_impl(
-    XferDestTaper *xdtself,
+    XferDestTaper *xdt,
     gboolean retry_part,
     dumpfile_t *header)
 {
-    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
 
     g_assert(self->device != NULL);
     g_assert(!self->device->in_file);
     g_assert(header != NULL);
 
-    DBG(1, "start_part(retry_part=%d)", retry_part);
+    DBG(1, "start_part()");
+
+    /* we can only retry the part if we're getting slices via cache_inform */
+    if (retry_part) {
+       if (self->last_part_successful) {
+           xfer_cancel_with_error(XFER_ELEMENT(self),
+               _("Previous part did not fail; cannot retry"));
+           return;
+       }
+
+       if (!self->expect_cache_inform) {
+           xfer_cancel_with_error(XFER_ELEMENT(self),
+               _("No cache for previous failed part; cannot retry"));
+           return;
+       }
+
+       self->bytes_to_read_from_slices = self->part_bytes_written;
+    } else {
+       /* don't read any bytes from the slices, since we're not retrying */
+       self->bytes_to_read_from_slices = 0;
+    }
 
     g_mutex_lock(self->state_mutex);
     g_assert(self->paused);
@@ -1339,27 +791,6 @@ start_part_impl(
        dumpfile_free(self->part_header);
     self->part_header = dumpfile_copy(header);
 
-    if (retry_part) {
-       if (!self->use_mem_cache && !self->part_slices) {
-           g_mutex_unlock(self->state_mutex);
-           xfer_cancel_with_error(XFER_ELEMENT(self),
-               _("Failed part was not cached; cannot retry"));
-           return;
-       }
-       g_assert(!self->last_part_successful);
-       self->retry_part = TRUE;
-    } else {
-       g_assert(self->last_part_successful);
-       self->retry_part = FALSE;
-       self->part_first_serial = self->part_stop_serial;
-       if (self->part_size != 0) {
-           self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
-       } else {
-           /* set part_stop_serial to an effectively infinite value */
-           self->part_stop_serial = G_MAXUINT64;
-       }
-    }
-
     DBG(1, "unpausing");
     self->paused = FALSE;
     g_cond_broadcast(self->state_cond);
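
When retry_part is set, bytes_to_read_from_slices tells the device thread how
much of the failed part to replay from the cache_inform slices before resuming
from the ring buffer.  A rough sketch of that replay follows, assuming the
FileSlice fields filled in by cache_inform_impl below and the Device API's
device_write_block; replay_from_slices itself is hypothetical, and the real
device thread additionally handles short reads and full-block requirements,
reporting failures through the xfer.

    /* Hypothetical sketch of replaying a failed part from its file slices. */
    static gboolean
    replay_from_slices(XferDestTaperSplitter *self, Device *device)
    {
        guint64 remaining = self->bytes_to_read_from_slices;
        gchar *buf = g_malloc(self->block_size);
        FileSlice *slice;

        g_mutex_lock(self->part_slices_mutex);
        for (slice = self->part_slices; slice && remaining > 0; slice = slice->next) {
            guint64 left = MIN((guint64)slice->length, remaining);
            off_t off = slice->offset;
            int fd = open(slice->filename, O_RDONLY);

            if (fd < 0)
                break;  /* the real code cancels the xfer with an error */

            while (left > 0) {
                gsize to_read = MIN(left, (guint64)self->block_size);
                ssize_t got = pread(fd, buf, to_read, off);
                if (got <= 0 || !device_write_block(device, (guint)got, buf))
                    break;
                off += got;
                left -= got;
                remaining -= got;
            }
            close(fd);
        }
        g_mutex_unlock(self->part_slices_mutex);
        g_free(buf);
        return remaining == 0;
    }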
@@ -1373,8 +804,11 @@ use_device_impl(
     Device *device)
 {
     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
+    StreamingRequirement newstreaming;
     GValue val;
 
+    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
+
     /* short-circuit if nothing is changing */
     if (self->device == device)
        return;
@@ -1390,9 +824,11 @@ use_device_impl(
     if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
         || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
         g_warning("Couldn't get streaming type for %s", self->device->device_name);
-        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
     } else {
-        self->streaming = g_value_get_enum(&val);
+        newstreaming = g_value_get_enum(&val);
+       if (newstreaming != self->streaming)
+           g_warning("New device has different streaming requirements from the original; "
+                   "ignoring new requirement");
     }
     g_value_unset(&val);
 
@@ -1408,41 +844,27 @@ use_device_impl(
 
 static void
 cache_inform_impl(
-    XferDestTaper *xdtself,
+    XferDestTaper *xdt,
     const char *filename,
     off_t offset,
     off_t length)
 {
-    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
-    FileSlice *slice, *iter;
-
-    DBG(1, "cache_inform(\"%s\", %jd, %jd)", filename, (intmax_t)offset, (intmax_t)length);
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
+    FileSlice *slice = g_new(FileSlice, 1), *iter;
 
-    /* do we even need this info? */
-    if (self->disk_cache_dirname || self->use_mem_cache || self->part_size == 0)
-       return;
-
-    /* handle the (admittedly unlikely) event that length is larger than gsize.
-     * Hopefully if sizeof(off_t) = sizeof(gsize), this will get optimized out */
-    while (sizeof(off_t) > sizeof(gsize) && length > (off_t)SIZE_MAX) {
-       cache_inform_impl(xdtself, filename, offset, (off_t)SIZE_MAX);
-       offset += (off_t)SIZE_MAX;
-       length -= (off_t)SIZE_MAX;
-    }
-
-    slice = g_new0(FileSlice, 1);
+    slice->next = NULL;
     slice->filename = g_strdup(filename);
     slice->offset = offset;
-    slice->length = (gsize)length;
+    slice->length = length;
 
-    g_mutex_lock(self->state_mutex);
+    g_mutex_lock(self->part_slices_mutex);
     if (self->part_slices) {
        for (iter = self->part_slices; iter->next; iter = iter->next) {}
        iter->next = slice;
     } else {
        self->part_slices = slice;
     }
-    g_mutex_unlock(self->state_mutex);
+    g_mutex_unlock(self->part_slices_mutex);
 }
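
Because appends take part_slices_mutex rather than state_mutex, a caller may
register slices while the transfer is running; the append walks the list on
each call, which is harmless for the handful of slices in a part.  A
hypothetical usage sketch (paths and lengths are illustrative, and it assumes
the public xfer_dest_taper_cache_inform wrapper dispatches to this method):

    /* Hypothetical caller: register each holding-disk chunk as soon as it
     * is written, before any part retry could need it. */
    static void
    register_chunks(XferElement *xdt)
    {
        xfer_dest_taper_cache_inform(xdt, "/holding/chunk.0", (off_t)0, (off_t)1048576);
        xfer_dest_taper_cache_inform(xdt, "/holding/chunk.1", (off_t)0, (off_t)524288);
    }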
 
 static guint64
@@ -1450,7 +872,11 @@ get_part_bytes_written_impl(
     XferDestTaper *xdtself)
 {
     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
-    return self->bytes_written + self->slab_bytes_written;
+
+    /* NOTE: this access is unsafe and may return inconsistent results (e.g.,
+     * observing a partial write to the 64-bit value on a 32-bit system).
+     * This is OK for the moment, as the value is only informational, but be
+     * warned. */
+    return self->part_bytes_written;
 }
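
If a tear-free value were ever needed, the read and the device thread's
updates would have to share a lock (or use an atomic).  A sketch of the locked
variant, under the stated assumption that the writer is also changed to update
part_bytes_written under state_mutex:

    /* Sketch of a tear-free alternative; only sound if the device thread
     * also updates part_bytes_written under state_mutex (an assumption). */
    static guint64
    get_part_bytes_written_locked(XferDestTaperSplitter *self)
    {
        guint64 v;

        g_mutex_lock(self->state_mutex);
        v = self->part_bytes_written;
        g_mutex_unlock(self->state_mutex);
        return v;
    }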
 
 static void
@@ -1462,15 +888,17 @@ instance_init(
 
     self->state_mutex = g_mutex_new();
     self->state_cond = g_cond_new();
-    self->slab_mutex = g_mutex_new();
-    self->slab_cond = g_cond_new();
-    self->slab_free_cond = g_cond_new();
+    self->ring_mutex = g_mutex_new();
+    self->ring_add_cond = g_cond_new();
+    self->ring_free_cond = g_cond_new();
+    self->part_slices_mutex = g_mutex_new();
 
-    self->last_part_successful = TRUE;
+    self->device = NULL;
     self->paused = TRUE;
-    self->part_stop_serial = 0;
-    self->disk_cache_read_fd = -1;
-    self->disk_cache_write_fd = -1;
+    self->part_header = NULL;
+    self->partnum = 1;
+    self->part_bytes_written = 0;
+    self->part_slices = NULL;
 }
 
 static void
@@ -1478,49 +906,30 @@ finalize_impl(
     GObject * obj_self)
 {
     XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
-    Slab *slab, *next_slab;
     FileSlice *slice, *next_slice;
 
-    if (self->disk_cache_dirname)
-       g_free(self->disk_cache_dirname);
-
     g_mutex_free(self->state_mutex);
     g_cond_free(self->state_cond);
 
-    g_mutex_free(self->slab_mutex);
-    g_cond_free(self->slab_cond);
-    g_cond_free(self->slab_free_cond);
+    g_mutex_free(self->ring_mutex);
+    g_cond_free(self->ring_add_cond);
+    g_cond_free(self->ring_free_cond);
 
-    /* free the slab train, without reference to the refcounts */
-    for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
-        next_slab = slab->next;
-        free_slab(slab);
-    }
-    self->disk_cacher_slab = NULL;
-    self->mem_cache_slab = NULL;
-    self->device_slab = NULL;
-    self->oldest_slab = NULL;
-    self->newest_slab = NULL;
-
-    if (self->reader_slab) {
-        free_slab(self->reader_slab);
-        self->reader_slab = NULL;
-    }
+    g_mutex_free(self->part_slices_mutex);
 
     for (slice = self->part_slices; slice; slice = next_slice) {
        next_slice = slice->next;
-       g_free(slice->filename);
+       if (slice->filename)
+           g_free(slice->filename);
        g_free(slice);
     }
 
+    if (self->ring_buffer)
+       g_free(self->ring_buffer);
+
     if (self->part_header)
        dumpfile_free(self->part_header);
 
-    if (self->disk_cache_read_fd != -1)
-       close(self->disk_cache_read_fd); /* ignore error */
-    if (self->disk_cache_write_fd != -1)
-       close(self->disk_cache_write_fd); /* ignore error */
-
     if (self->device)
        g_object_unref(self->device);
 
@@ -1589,81 +998,48 @@ xfer_dest_taper_splitter(
     Device *first_device,
     size_t max_memory,
     guint64 part_size,
-    gboolean use_mem_cache,
-    const char *disk_cache_dirname)
+    gboolean expect_cache_inform)
 {
     XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
+    GValue val;
+
+    /* max_memory and part_size get rounded up to the next multiple of
+     * block_size */
+    max_memory = ((max_memory + first_device->block_size - 1)
+                       / first_device->block_size) * first_device->block_size;
+    if (part_size)
+       part_size = ((part_size + first_device->block_size - 1)
+                           / first_device->block_size) * first_device->block_size;
 
-    self->max_memory = max_memory;
     self->part_size = part_size;
     self->partnum = 1;
     self->device = first_device;
-    g_object_ref(self->device);
-
-    /* pick only one caching mechanism, caller! */
-    g_assert(!use_mem_cache || !disk_cache_dirname);
-
-    /* and if part size is zero, then we don't do any caching */
-    if (part_size == 0) {
-       g_assert(!use_mem_cache && !disk_cache_dirname);
-    }
 
-    self->use_mem_cache = use_mem_cache;
-    if (disk_cache_dirname) {
-       self->disk_cache_dirname = g_strdup(disk_cache_dirname);
-
-       self->part_slices = g_new0(FileSlice, 1);
-       self->part_slices->filename = NULL; /* indicates "use disk_cache_read_fd" */
-       self->part_slices->offset = 0;
-       self->part_slices->length = 0; /* will be filled in in start_part */
-    }
-
-    /* calculate the device-dependent parameters */
+    g_object_ref(self->device);
     self->block_size = first_device->block_size;
+    self->paused = TRUE;
+    self->no_more_parts = FALSE;
 
-    /* The slab size should be large enough to justify the overhead of all
-     * of the mutexes, but it needs to be small enough to have a few slabs
-     * available so that the threads are not constantly waiting on one
-     * another.  The choice is sixteen blocks, not more than a quarter of
-     * the part size, and not more than 10MB.  If we're not using the mem
-     * cache, then avoid exceeding max_memory by keeping the slab size less
-     * than a quarter of max_memory. */
-
-    self->slab_size = self->block_size * 16;
-    if (self->part_size)
-        self->slab_size = MIN(self->slab_size, self->part_size / 4);
-    self->slab_size = MIN(self->slab_size, 10*1024*1024);
-    if (!self->use_mem_cache)
-        self->slab_size = MIN(self->slab_size, self->max_memory / 4);
-
-    /* round slab size up to the nearest multiple of the block size */
-    self->slab_size =
-        ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
-
-    /* round part size up to a multiple of the slab size */
-    if (self->part_size != 0) {
-        self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
-        self->part_size = self->slabs_per_part * self->slab_size;
-    } else {
-        self->slabs_per_part = 0;
-    }
-
-    /* fill in the file slice's length, now that we know the real part size */
-    if (self->disk_cache_dirname)
-        self->part_slices->length = self->part_size;
+    /* set up a ring buffer of size max_memory */
+    self->ring_length = max_memory;
+    self->ring_buffer = g_malloc(max_memory);
+    self->ring_head = self->ring_tail = 0;
+    self->ring_count = 0;
+    self->ring_head_at_eof = FALSE;
 
-    if (self->use_mem_cache) {
-        self->max_slabs = self->slabs_per_part;
+    /* get this new device's streaming requirements */
+    bzero(&val, sizeof(val));
+    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
+        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
+        g_warning("Couldn't get streaming type for %s", self->device->device_name);
+        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
     } else {
-        self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
+        self->streaming = g_value_get_enum(&val);
     }
+    g_value_unset(&val);
 
-    /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
-        * alloc_slab, so we check here that it's at least 2. */
-    if (self->max_slabs < 2)
-        self->max_slabs = 2;
-
-    DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);
+    /* remember whether cache_inform will supply file slices, in case we
+     * hit PEOM and have to retry a part */
+    self->expect_cache_inform = expect_cache_inform;
 
     return XFER_ELEMENT(self);
 }
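
The round-up expression ((x + block_size - 1) / block_size) * block_size is
the usual integer ceiling-to-a-multiple: with a 32768-byte block size, a
requested part_size of 100000000 becomes 100007936, while a max_memory of
1048576 (32 blocks exactly) is unchanged.  A hypothetical construction call,
matching the signature above (first_device is assumed to be open already):

    /* Hypothetical setup: ~100MB parts through a 1MiB ring buffer, with
     * cache_inform-based retry enabled. */
    XferElement *dest = xfer_dest_taper_splitter(
        first_device,
        1024 * 1024,                  /* max_memory -> ring_length */
        (guint64)100 * 1000 * 1000,   /* part_size, rounded up to 100007936 */
        TRUE);                        /* expect_cache_inform */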