]> git.gag.com Git - debian/amanda/blobdiff - device-src/xfer-dest-taper-splitter.c
Merge branch 'master' into squeeze
[debian/amanda] / device-src / xfer-dest-taper-splitter.c
diff --git a/device-src/xfer-dest-taper-splitter.c b/device-src/xfer-dest-taper-splitter.c
new file mode 100644 (file)
index 0000000..ccd7fde
--- /dev/null
@@ -0,0 +1,1042 @@
+/*
+ * Amanda, The Advanced Maryland Automatic Network Disk Archiver
+ * Copyright (c) 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+ *
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
+ */
+
+#include "amanda.h"
+#include "amxfer.h"
+#include "xfer-device.h"
+#include "arglist.h"
+#include "conffile.h"
+
+/* A transfer destination that writes an entire dumpfile to one or more files
+ * on one or more devices, without any caching.  This destination supports both
+ * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
+ * splitting (in which rewound parts are read from holding disk). */
+
+/*
+ * File Slices - Cache Information
+ *
+ * The cache_inform implementation adds cache information to a linked list of
+ * these objects, in order.  The objects are arranged in a linked list, and
+ * describe the files in which the part data is stored.  Note that we assume
+ * that slices are added *before* they are needed: the xfer element will fail
+ * if it tries to rewind and does not find a suitable slice.
+ *
+ * The slices should be "fast forwarded" after every part, so that the first
+ * byte in part_slices is the first byte of the part; when a retry of a part is
+ * required, use the iterator methods to properly open the various files and do
+ * the buffering.
+ */
+
+typedef struct FileSlice {
+    struct FileSlice *next;
+
+    /* fully-qualified filename to read from (or NULL to read from
+     * disk_cache_read_fd in XferDestTaperCacher) */
+    char *filename;
+
+    /* offset in file to start at */
+    guint64 offset;
+
+    /* length of data to read */
+    guint64 length;
+} FileSlice;
+
+/*
+ * Xfer Dest Taper
+ */
+
+static GObjectClass *parent_class = NULL;
+
+typedef struct XferDestTaperSplitter {
+    XferDestTaper __parent__;
+
+    /* object parameters
+     *
+     * These values are supplied to the constructor, and can be assumed
+     * constant for the lifetime of the element.
+     */
+
+    /* Maximum size of each part (bytes) */
+    guint64 part_size;
+
+    /* the device's need for streaming (it's assumed that all subsequent devices
+     * have the same needs) */
+    StreamingRequirement streaming;
+
+    /* block size expected by the target device */
+    gsize block_size;
+
+    /* TRUE if this element is expecting slices via cache_inform */
+    gboolean expect_cache_inform;
+
+    /* The thread doing the actual writes to tape; this also handles buffering
+     * for streaming */
+    GThread *device_thread;
+
+    /* Ring Buffer
+     *
+     * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
+     * blocksize), and serves as the interface between the device_thread and
+     * the thread calling push_buffer.  Ring_length is the total length of the
+     * buffer in bytes, while ring_count is the number of data bytes currently
+     * in the buffer.  The ring_add_cond is signalled when data is added to the
+     * buffer, while ring_free_cond is signalled when data is removed.  Both
+     * are governed by ring_mutex, and both are signalled when the transfer is
+     * cancelled.
+     */
+
+    GMutex *ring_mutex;
+    GCond *ring_add_cond, *ring_free_cond;
+    gchar *ring_buffer;
+    gsize ring_length, ring_count;
+    gsize ring_head, ring_tail;
+    gboolean ring_head_at_eof;
+
+    /* Element State
+     *
+     * "state" includes all of the variables below (including device
+     * parameters).  Note that the device_thread holdes this mutex for the
+     * entire duration of writing a part.
+     *
+     * state_mutex should always be locked before ring_mutex, if both are to be
+     * held simultaneously.
+     */
+    GMutex *state_mutex;
+    GCond *state_cond;
+    volatile gboolean paused;
+
+    /* The device to write to, and the header to write to it */
+    Device *volatile device;
+    dumpfile_t *volatile part_header;
+
+    /* bytes to read from cached slices before reading from the ring buffer */
+    guint64 bytes_to_read_from_slices;
+
+    /* part number in progress */
+    volatile guint64 partnum;
+
+    /* status of the last part */
+    gboolean last_part_eof;
+    gboolean last_part_eom;
+    gboolean last_part_successful;
+
+    /* true if the element is done writing to devices */
+    gboolean no_more_parts;
+
+    /* total bytes written in the current part */
+    volatile guint64 part_bytes_written;
+
+    /* The list of active slices for the current part.  The cache_inform method
+     * appends to this list. It is safe to read this linked list, beginning at
+     * the head, *if* you can guarantee that slices will not be fast-forwarded
+     * in the interim.  The finalize method for this class will take care of
+     * freeing any leftover slices. Take the part_slices mutex while modifying
+     * the links in this list. */
+    FileSlice *part_slices;
+    GMutex *part_slices_mutex;
+} XferDestTaperSplitter;
+
+static GType xfer_dest_taper_splitter_get_type(void);
+#define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
+#define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
+#define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
+#define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
+#define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
+#define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
+
+typedef struct {
+    XferDestTaperClass __parent__;
+
+} XferDestTaperSplitterClass;
+
+/*
+ * Debug logging
+ */
+
+#define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
+static void
+_xdt_dbg(const char *fmt, ...)
+{
+    va_list argp;
+    char msg[1024];
+
+    arglist_start(argp, fmt);
+    g_vsnprintf(msg, sizeof(msg), fmt, argp);
+    arglist_end(argp);
+    g_debug("XDT: %s", msg);
+}
+
+/* "Fast forward" the slice list by the given length.  This will free any
+ * slices that are no longer necessary, and adjust the offset and length of the
+ * first remaining slice.  This assumes the state mutex is locked during its
+ * operation.
+ *
+ * @param self: element
+ * @param length: number of bytes to fast forward
+ */
+static void
+fast_forward_slices(
+       XferDestTaperSplitter *self,
+       guint64 length)
+{
+    FileSlice *slice;
+
+    /* consume slices until we've eaten the whole part */
+    g_mutex_lock(self->part_slices_mutex);
+    while (length > 0) {
+       g_assert(self->part_slices);
+       slice = self->part_slices;
+
+       if (slice->length <= length) {
+           length -= slice->length;
+
+           self->part_slices = slice->next;
+           if (slice->filename)
+               g_free(slice->filename);
+           g_free(slice);
+           slice = self->part_slices;
+       } else {
+           slice->length -= length;
+           slice->offset += length;
+           break;
+       }
+    }
+    g_mutex_unlock(self->part_slices_mutex);
+}
+
+/*
+ * Slice Iterator
+ */
+
+/* A struct for use in iterating over data in the slices */
+typedef struct SliceIterator {
+    /* current slice */
+    FileSlice *slice;
+
+    /* file descriptor of the current file, or -1 if it's not open yet */
+    int cur_fd;
+
+    /* bytes remaining in this slice */
+    guint64 slice_remaining;
+} SliceIterator;
+
+/* Utility functions for SliceIterator */
+
+/* Begin iterating over slices, starting at the first byte of the first slice.
+ * Initializes a pre-allocated SliceIterator.  The caller must ensure that
+ * fast_forward_slices is not called while an iteration is in
+ * progress.
+ */
+static void
+iterate_slices(
+       XferDestTaperSplitter *self,
+       SliceIterator *iter)
+{
+    iter->cur_fd = -1;
+    iter->slice_remaining = 0;
+    g_mutex_lock(self->part_slices_mutex);
+    iter->slice = self->part_slices;
+    /* it's safe to unlock this because, at worst, a new entry will
+     * be appended while the iterator is in progress */
+    g_mutex_unlock(self->part_slices_mutex);
+}
+
+
+/* Get a block of data from the iterator, returning a pointer to a buffer
+ * containing the data; the buffer remains the property of the iterator.
+ * Returns NULL on error, after calling xfer_cancel_with_error with an
+ * appropriate error message.  This function does not block, so it does not
+ * check for cancellation.
+ */
+static gpointer
+iterator_get_block(
+       XferDestTaperSplitter *self,
+       SliceIterator *iter,
+       gpointer buf,
+       gsize bytes_needed)
+{
+    gsize buf_offset = 0;
+    XferElement *elt = XFER_ELEMENT(self);
+
+    g_assert(iter != NULL);
+    g_assert(buf != NULL);
+
+    while (bytes_needed > 0) {
+       gsize read_size;
+       int bytes_read;
+
+       if (iter->cur_fd < 0) {
+           guint64 offset;
+
+           g_assert(iter->slice != NULL);
+           g_assert(iter->slice->filename != NULL);
+
+           iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
+           if (iter->cur_fd < 0) {
+               xfer_cancel_with_error(elt,
+                   _("Could not open '%s' for reading: %s"),
+                   iter->slice->filename, strerror(errno));
+               return NULL;
+           }
+
+           iter->slice_remaining = iter->slice->length;
+           offset = iter->slice->offset;
+
+           if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
+               xfer_cancel_with_error(elt,
+                   _("Could not seek '%s' for reading: %s"),
+                   iter->slice->filename, strerror(errno));
+               return NULL;
+           }
+       }
+
+       read_size = MIN(iter->slice_remaining, bytes_needed);
+       bytes_read = full_read(iter->cur_fd,
+                              buf + buf_offset,
+                              read_size);
+       if (bytes_read < 0 || (gsize)bytes_read < read_size) {
+           xfer_cancel_with_error(elt,
+               _("Error reading '%s': %s"),
+               iter->slice->filename,
+               errno? strerror(errno) : _("Unexpected EOF"));
+           return NULL;
+       }
+
+       iter->slice_remaining -= bytes_read;
+       buf_offset += bytes_read;
+       bytes_needed -= bytes_read;
+
+       if (iter->slice_remaining <= 0) {
+           if (close(iter->cur_fd) < 0) {
+               xfer_cancel_with_error(elt,
+                   _("Could not close fd %d: %s"),
+                   iter->cur_fd, strerror(errno));
+               return NULL;
+           }
+           iter->cur_fd = -1;
+
+           iter->slice = iter->slice->next;
+
+           if (elt->cancelled)
+               return NULL;
+       }
+    }
+
+    return buf;
+}
+
+
+/* Free the iterator's resources */
+static void
+iterator_free(
+       SliceIterator *iter)
+{
+    if (iter->cur_fd >= 0)
+       close(iter->cur_fd);
+}
+
+/*
+ * Device Thread
+ */
+
+/* Wait for at least one block, or EOF, to be available in the ring buffer.
+ * Called with the ring mutex held. */
+static gsize
+device_thread_wait_for_block(
+    XferDestTaperSplitter *self)
+{
+    XferElement *elt = XFER_ELEMENT(self);
+    gsize bytes_needed = self->device->block_size;
+    gsize usable;
+
+    /* for any kind of streaming, we need to fill the entire buffer before the
+     * first byte */
+    if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
+       bytes_needed = self->ring_length;
+
+    while (1) {
+       /* are we ready? */
+       if (elt->cancelled)
+           break;
+
+       if (self->ring_count >= bytes_needed)
+           break;
+
+       if (self->ring_head_at_eof)
+           break;
+
+       /* nope - so wait */
+       g_cond_wait(self->ring_add_cond, self->ring_mutex);
+
+       /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
+        * we need to wait for the entire buffer to fill */
+       if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
+           bytes_needed = self->ring_length;
+    }
+
+    usable = MIN(self->ring_count, bytes_needed);
+    if (self->part_size)
+       usable = MIN(usable, self->part_size - self->part_bytes_written);
+
+    return usable;
+}
+
+/* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
+ * held. */
+static void
+device_thread_consume_block(
+    XferDestTaperSplitter *self,
+    gsize written)
+{
+    self->ring_count -= written;
+    self->ring_tail += written;
+    if (self->ring_tail >= self->ring_length)
+       self->ring_tail -= self->ring_length;
+    g_cond_broadcast(self->ring_free_cond);
+}
+
+/* Write an entire part.  Called with the state_mutex held */
+static XMsg *
+device_thread_write_part(
+    XferDestTaperSplitter *self)
+{
+    GTimer *timer = g_timer_new();
+    XferElement *elt = XFER_ELEMENT(self);
+
+    enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
+    int fileno = 0;
+    XMsg *msg;
+
+    self->part_bytes_written = 0;
+
+    g_timer_start(timer);
+
+    /* write the header; if this fails or hits LEOM, we consider this a
+     * successful 0-byte part */
+    if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
+       part_status = PART_LEOM;
+       goto part_done;
+    }
+
+    fileno = self->device->file;
+    g_assert(fileno > 0);
+
+    /* free the header, now that it's written */
+    dumpfile_free(self->part_header);
+    self->part_header = NULL;
+
+    /* First, read the requisite number of bytes from the part_slices, if the part was
+     * unsuccessful. */
+    if (self->bytes_to_read_from_slices) {
+       SliceIterator iter;
+       gsize to_write = self->block_size;
+       gpointer buf = g_malloc(to_write);
+       gboolean successful = TRUE;
+       guint64 bytes_from_slices = self->bytes_to_read_from_slices;
+
+       DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
+
+       iterate_slices(self, &iter);
+       while (bytes_from_slices) {
+           gboolean ok;
+
+           if (!iterator_get_block(self, &iter, buf, to_write)) {
+               part_status = PART_FAILED;
+               successful = FALSE;
+               break;
+           }
+
+           /* note that it's OK to reference these ring_* vars here, as they
+            * are static at this point */
+           ok = device_write_block(self->device, (guint)to_write, buf);
+
+           if (!ok) {
+               part_status = PART_FAILED;
+               successful = FALSE;
+               break;
+           }
+
+           self->part_bytes_written += to_write;
+           bytes_from_slices -= to_write;
+
+           if (self->part_size && self->part_bytes_written >= self->part_size) {
+               part_status = PART_EOP;
+               successful = FALSE;
+               break;
+           } else if (self->device->is_eom) {
+               part_status = PART_LEOM;
+               successful = FALSE;
+               break;
+           }
+       }
+
+       iterator_free(&iter);
+       g_free(buf);
+
+       /* if we didn't finish, get out of here now */
+       if (!successful)
+           goto part_done;
+    }
+
+    g_mutex_lock(self->ring_mutex);
+    while (1) {
+       gsize to_write;
+       gboolean ok;
+
+       /* wait for at least one block, and (if necessary) prebuffer */
+       to_write = device_thread_wait_for_block(self);
+       to_write = MIN(to_write, self->device->block_size);
+       if (elt->cancelled)
+           break;
+
+       if (to_write == 0) {
+           part_status = PART_EOF;
+           break;
+       }
+
+       g_mutex_unlock(self->ring_mutex);
+       DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
+
+       /* note that it's OK to reference these ring_* vars here, as they
+        * are static at this point */
+       ok = device_write_block(self->device, (guint)to_write,
+               self->ring_buffer + self->ring_tail);
+       g_mutex_lock(self->ring_mutex);
+
+       if (!ok) {
+           part_status = PART_FAILED;
+           break;
+       }
+
+       self->part_bytes_written += to_write;
+       device_thread_consume_block(self, to_write);
+
+       if (self->part_size && self->part_bytes_written >= self->part_size) {
+           part_status = PART_EOP;
+           break;
+       } else if (self->device->is_eom) {
+           part_status = PART_LEOM;
+           break;
+       }
+    }
+    g_mutex_unlock(self->ring_mutex);
+part_done:
+
+    /* if we write all of the blocks, but the finish_file fails, then likely
+     * there was some buffering going on in the device driver, and the blocks
+     * did not all make it to permanent storage -- so it's a failed part.  Note
+     * that we try to finish_file even if the part failed, just to be thorough. */
+    if (self->device->in_file) {
+       if (!device_finish_file(self->device))
+           if (!elt->cancelled) {
+               part_status = PART_FAILED;
+           }
+    }
+
+    g_timer_stop(timer);
+
+    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
+    msg->size = self->part_bytes_written;
+    msg->duration = g_timer_elapsed(timer, NULL);
+    msg->partnum = self->partnum;
+    msg->fileno = fileno;
+    msg->successful = self->last_part_successful = part_status != PART_FAILED;
+    msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
+    msg->eof = self->last_part_eof = part_status == PART_EOF;
+
+    /* time runs backward on some test boxes, so make sure this is positive */
+    if (msg->duration < 0) msg->duration = 0;
+
+    if (msg->successful)
+       self->partnum++;
+    self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
+
+    g_timer_destroy(timer);
+
+    return msg;
+}
+
+static gpointer
+device_thread(
+    gpointer data)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
+    XferElement *elt = XFER_ELEMENT(self);
+    XMsg *msg;
+
+    DBG(1, "(this is the device thread)");
+
+    /* This is the outer loop, that loops once for each split part written to
+     * tape. */
+    g_mutex_lock(self->state_mutex);
+    while (1) {
+       /* wait until the main thread un-pauses us, and check that we have
+        * the relevant device info available (block_size) */
+       while (self->paused && !elt->cancelled) {
+           DBG(9, "waiting to be unpaused");
+           g_cond_wait(self->state_cond, self->state_mutex);
+       }
+       DBG(9, "done waiting");
+
+        if (elt->cancelled)
+           break;
+
+       DBG(2, "beginning to write part");
+       msg = device_thread_write_part(self);
+       DBG(2, "done writing part");
+
+       if (!msg) /* cancelled */
+           break;
+
+       /* release the slices for this part, if there were any slices */
+       if (msg->successful && self->expect_cache_inform) {
+           fast_forward_slices(self, msg->size);
+       }
+
+       xfer_queue_message(elt->xfer, msg);
+
+       /* pause ourselves and await instructions from the main thread */
+       self->paused = TRUE;
+
+       /* if this is the last part, we're done with the part loop */
+       if (self->no_more_parts)
+           break;
+    }
+    g_mutex_unlock(self->state_mutex);
+
+    /* tell the main thread we're done */
+    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
+
+    return NULL;
+}
+
+/*
+ * Class mechanics
+ */
+
+static void
+push_buffer_impl(
+    XferElement *elt,
+    gpointer buf,
+    size_t size)
+{
+    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
+    gchar *p = buf;
+
+    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
+
+    /* do nothing if cancelled */
+    if (G_UNLIKELY(elt->cancelled)) {
+        goto free_and_finish;
+    }
+
+    /* handle EOF */
+    if (G_UNLIKELY(buf == NULL)) {
+       /* indicate EOF to the device thread */
+       g_mutex_lock(self->ring_mutex);
+       self->ring_head_at_eof = TRUE;
+       g_cond_broadcast(self->ring_add_cond);
+       g_mutex_unlock(self->ring_mutex);
+       goto free_and_finish;
+    }
+
+    /* push the block into the ring buffer, in pieces if necessary */
+    g_mutex_lock(self->ring_mutex);
+    while (size > 0) {
+       gsize avail;
+
+       /* wait for some space */
+       while (self->ring_count == self->ring_length && !elt->cancelled) {
+           DBG(9, "waiting for any space to buffer pushed data");
+           g_cond_wait(self->ring_free_cond, self->ring_mutex);
+       }
+       DBG(9, "done waiting");
+
+       if (elt->cancelled)
+           goto unlock_and_free_and_finish;
+
+       /* only copy to the end of the buffer, if the available space wraps
+        * around to the beginning */
+       avail = MIN(size, self->ring_length - self->ring_count);
+       avail = MIN(avail, self->ring_length - self->ring_head);
+
+       /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
+       memmove(self->ring_buffer + self->ring_head, p, avail);
+
+       /* reset the ring variables to represent this state */
+       self->ring_count += avail;
+       self->ring_head += avail; /* will, at most, hit ring_length */
+       if (self->ring_head == self->ring_length)
+           self->ring_head = 0;
+       p = (gpointer)((guchar *)p + avail);
+       size -= avail;
+
+       /* and give the device thread a notice that data is ready */
+       g_cond_broadcast(self->ring_add_cond);
+    }
+
+unlock_and_free_and_finish:
+    g_mutex_unlock(self->ring_mutex);
+
+free_and_finish:
+    if (buf)
+        g_free(buf);
+}
+
+/*
+ * Element mechanics
+ */
+
+static gboolean
+start_impl(
+    XferElement *elt)
+{
+    XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
+    GError *error = NULL;
+
+    self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
+    if (!self->device_thread) {
+        g_critical(_("Error creating new thread: %s (%s)"),
+            error->message, errno? strerror(errno) : _("no error code"));
+    }
+
+    return TRUE;
+}
+
+static gboolean
+cancel_impl(
+    XferElement *elt,
+    gboolean expect_eof)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
+    gboolean rv;
+
+    /* chain up first */
+    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
+
+    /* then signal all of our condition variables, so that threads waiting on them
+     * wake up and see elt->cancelled. */
+    g_mutex_lock(self->ring_mutex);
+    g_cond_broadcast(self->ring_add_cond);
+    g_cond_broadcast(self->ring_free_cond);
+    g_mutex_unlock(self->ring_mutex);
+
+    g_mutex_lock(self->state_mutex);
+    g_cond_broadcast(self->state_cond);
+    g_mutex_unlock(self->state_mutex);
+
+    return rv;
+}
+
+static void
+start_part_impl(
+    XferDestTaper *xdt,
+    gboolean retry_part,
+    dumpfile_t *header)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
+
+    g_assert(self->device != NULL);
+    g_assert(!self->device->in_file);
+    g_assert(header != NULL);
+
+    DBG(1, "start_part()");
+
+    /* we can only retry the part if we're getting slices via cache_inform's */
+    if (retry_part) {
+       if (self->last_part_successful) {
+           xfer_cancel_with_error(XFER_ELEMENT(self),
+               _("Previous part did not fail; cannot retry"));
+           return;
+       }
+
+       if (!self->expect_cache_inform) {
+           xfer_cancel_with_error(XFER_ELEMENT(self),
+               _("No cache for previous failed part; cannot retry"));
+           return;
+       }
+
+       self->bytes_to_read_from_slices = self->part_bytes_written;
+    } else {
+       /* don't read any bytes from the slices, since we're not retrying */
+       self->bytes_to_read_from_slices = 0;
+    }
+
+    g_mutex_lock(self->state_mutex);
+    g_assert(self->paused);
+    g_assert(!self->no_more_parts);
+
+    if (self->part_header)
+       dumpfile_free(self->part_header);
+    self->part_header = dumpfile_copy(header);
+
+    DBG(1, "unpausing");
+    self->paused = FALSE;
+    g_cond_broadcast(self->state_cond);
+
+    g_mutex_unlock(self->state_mutex);
+}
+
+static void
+use_device_impl(
+    XferDestTaper *xdtself,
+    Device *device)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
+    StreamingRequirement newstreaming;
+    GValue val;
+
+    DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
+
+    /* short-circuit if nothing is changing */
+    if (self->device == device)
+       return;
+
+    g_mutex_lock(self->state_mutex);
+    if (self->device)
+       g_object_unref(self->device);
+    self->device = device;
+    g_object_ref(device);
+
+    /* get this new device's streaming requirements */
+    bzero(&val, sizeof(val));
+    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
+        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
+        g_warning("Couldn't get streaming type for %s", self->device->device_name);
+    } else {
+        newstreaming = g_value_get_enum(&val);
+       if (newstreaming != self->streaming)
+           g_warning("New device has different streaming requirements from the original; "
+                   "ignoring new requirement");
+    }
+    g_value_unset(&val);
+
+    /* check that the blocksize hasn't changed */
+    if (self->block_size != device->block_size) {
+        g_mutex_unlock(self->state_mutex);
+        xfer_cancel_with_error(XFER_ELEMENT(self),
+            _("All devices used by the taper must have the same block size"));
+        return;
+    }
+    g_mutex_unlock(self->state_mutex);
+}
+
+static void
+cache_inform_impl(
+    XferDestTaper *xdt,
+    const char *filename,
+    off_t offset,
+    off_t length)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
+    FileSlice *slice = g_new(FileSlice, 1), *iter;
+
+    slice->next = NULL;
+    slice->filename = g_strdup(filename);
+    slice->offset = offset;
+    slice->length = length;
+
+    g_mutex_lock(self->part_slices_mutex);
+    if (self->part_slices) {
+       for (iter = self->part_slices; iter->next; iter = iter->next) {}
+       iter->next = slice;
+    } else {
+       self->part_slices = slice;
+    }
+    g_mutex_unlock(self->part_slices_mutex);
+}
+
+static guint64
+get_part_bytes_written_impl(
+    XferDestTaper *xdtself)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
+
+    /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
+     * partial write to the 64-bit value on a 32-bit system).  This is ok for
+     * the moment, as it's only informational, but be warned. */
+    return self->part_bytes_written;
+}
+
+static void
+instance_init(
+    XferElement *elt)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
+    elt->can_generate_eof = FALSE;
+
+    self->state_mutex = g_mutex_new();
+    self->state_cond = g_cond_new();
+    self->ring_mutex = g_mutex_new();
+    self->ring_add_cond = g_cond_new();
+    self->ring_free_cond = g_cond_new();
+    self->part_slices_mutex = g_mutex_new();
+
+    self->device = NULL;
+    self->paused = TRUE;
+    self->part_header = NULL;
+    self->partnum = 1;
+    self->part_bytes_written = 0;
+    self->part_slices = NULL;
+}
+
+static void
+finalize_impl(
+    GObject * obj_self)
+{
+    XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
+    FileSlice *slice, *next_slice;
+
+    g_mutex_free(self->state_mutex);
+    g_cond_free(self->state_cond);
+
+    g_mutex_free(self->ring_mutex);
+    g_cond_free(self->ring_add_cond);
+    g_cond_free(self->ring_free_cond);
+
+    g_mutex_free(self->part_slices_mutex);
+
+    for (slice = self->part_slices; slice; slice = next_slice) {
+       next_slice = slice->next;
+       if (slice->filename)
+           g_free(slice->filename);
+       g_free(slice);
+    }
+
+    if (self->ring_buffer)
+       g_free(self->ring_buffer);
+
+    if (self->part_header)
+       dumpfile_free(self->part_header);
+
+    if (self->device)
+       g_object_unref(self->device);
+
+    /* chain up */
+    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
+}
+
+static void
+class_init(
+    XferDestTaperSplitterClass * selfc)
+{
+    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
+    XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
+    GObjectClass *goc = G_OBJECT_CLASS(selfc);
+    static xfer_element_mech_pair_t mech_pairs[] = {
+       { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
+       { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
+    };
+
+    klass->start = start_impl;
+    klass->cancel = cancel_impl;
+    klass->push_buffer = push_buffer_impl;
+    xdt_klass->start_part = start_part_impl;
+    xdt_klass->use_device = use_device_impl;
+    xdt_klass->cache_inform = cache_inform_impl;
+    xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
+    goc->finalize = finalize_impl;
+
+    klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
+    klass->mech_pairs = mech_pairs;
+
+    parent_class = g_type_class_peek_parent(selfc);
+}
+
+static GType
+xfer_dest_taper_splitter_get_type (void)
+{
+    static GType type = 0;
+
+    if G_UNLIKELY(type == 0) {
+        static const GTypeInfo info = {
+            sizeof (XferDestTaperSplitterClass),
+            (GBaseInitFunc) NULL,
+            (GBaseFinalizeFunc) NULL,
+            (GClassInitFunc) class_init,
+            (GClassFinalizeFunc) NULL,
+            NULL /* class_data */,
+            sizeof (XferDestTaperSplitter),
+            0 /* n_preallocs */,
+            (GInstanceInitFunc) instance_init,
+            NULL
+        };
+
+        type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
+    }
+
+    return type;
+}
+
+/*
+ * Constructor
+ */
+
+XferElement *
+xfer_dest_taper_splitter(
+    Device *first_device,
+    size_t max_memory,
+    guint64 part_size,
+    gboolean expect_cache_inform)
+{
+    XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
+    GValue val;
+
+    /* max_memory and part_size get rounded up to the next multiple of
+     * block_size */
+    max_memory = ((max_memory + first_device->block_size - 1)
+                       / first_device->block_size) * first_device->block_size;
+    if (part_size)
+       part_size = ((part_size + first_device->block_size - 1)
+                           / first_device->block_size) * first_device->block_size;
+
+    self->part_size = part_size;
+    self->partnum = 1;
+    self->device = first_device;
+
+    g_object_ref(self->device);
+    self->block_size = first_device->block_size;
+    self->paused = TRUE;
+    self->no_more_parts = FALSE;
+
+    /* set up a ring buffer of size max_memory */
+    self->ring_length = max_memory;
+    self->ring_buffer = g_malloc(max_memory);
+    self->ring_head = self->ring_tail = 0;
+    self->ring_count = 0;
+    self->ring_head_at_eof = 0;
+
+    /* get this new device's streaming requirements */
+    bzero(&val, sizeof(val));
+    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
+        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
+        g_warning("Couldn't get streaming type for %s", self->device->device_name);
+        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
+    } else {
+        self->streaming = g_value_get_enum(&val);
+    }
+    g_value_unset(&val);
+
+    /* grab data from cache_inform, just in case we hit PEOM */
+    self->expect_cache_inform = expect_cache_inform;
+
+    return XFER_ELEMENT(self);
+}