X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fxfer-dest-taper-splitter.c;h=7295da389c90301ecfb9222bfd7c6c6a91a18014;hb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;hp=c0aa5301f324388af6493eb688d60e87a479f087;hpb=fd48f3e498442f0cbff5f3606c7c403d0566150e;p=debian%2Famanda diff --git a/device-src/xfer-dest-taper-splitter.c b/device-src/xfer-dest-taper-splitter.c index c0aa530..7295da3 100644 --- a/device-src/xfer-dest-taper-splitter.c +++ b/device-src/xfer-dest-taper-splitter.c @@ -19,72 +19,44 @@ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -#include "amxfer.h" #include "amanda.h" +#include "amxfer.h" #include "xfer-device.h" #include "arglist.h" #include "conffile.h" -/* A transfer destination that writes and entire dumpfile to one or more files on one - * or more devices. This is designed to work in concert with Amanda::Taper::Scribe. */ - -/* Future Plans: - * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case? - * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file - * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size? - * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior) - */ +/* A transfer destination that writes an entire dumpfile to one or more files + * on one or more devices, without any caching. This destination supports both + * LEOM-based splitting (in which parts are never rewound) and cache_inform-based + * splitting (in which rewound parts are read from holding disk). */ /* - * Slabs + * File Slices - Cache Information * - * Slabs are larger than blocks, and are the unit on which the element - * operates. 
They are designed to be a few times larger than a block, to - * achieve a corresponding reduction in the number of locks and unlocks used - * per block, and similar reduction in the the amount of memory overhead - * required. - */ - -typedef struct Slab { - struct Slab *next; - - /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers - * from any processes operating on the slab */ - gint refcount; - - /* number of this slab in the sequence, global to this element's lifetime. - * Since this counts slabs, which are about 1M, this can address 16 - * yottabytes of data before wrapping. */ - guint64 serial; - - /* slab size; this is only less than the element's slab size if the - * transfer is at EOF. */ - gsize size; - - /* base of the slab_size buffer */ - gpointer base; -} Slab; - -/* - * File Slices + * The cache_inform implementation adds cache information to a linked list of + * these objects, in order. The objects are arranged in a linked list, and + * describe the files in which the part data is stored. Note that we assume + * that slices are added *before* they are needed: the xfer element will fail + * if it tries to rewind and does not find a suitable slice. * - * These objects are arranged in a linked list, and describe the files in which - * the disk cache is stored. Note that we assume that slices are added *before* - * they are needed: the xfer element will fail if it tries to rewind and does not - * find a suitable slice. + * The slices should be "fast forwarded" after every part, so that the first + * byte in part_slices is the first byte of the part; when a retry of a part is + * required, use the iterator methods to properly open the various files and do + * the buffering. 
*/ typedef struct FileSlice { struct FileSlice *next; - /* fully-qualified filename to read from, or NULL to read from disk_cache_read_fd */ + /* fully-qualified filename to read from (or NULL to read from + * disk_cache_read_fd in XferDestTaperCacher) */ char *filename; /* offset in file to start at */ - off_t offset; + guint64 offset; /* length of data to read */ - gsize length; + guint64 length; } FileSlice; /* @@ -102,108 +74,50 @@ typedef struct XferDestTaperSplitter { * constant for the lifetime of the element. */ - /* maximum buffer space to use for streaming; this is unrelated to the - * fallback_splitsize */ - gsize max_memory; - - /* split buffering info; if we're doing memory buffering, use_mem_cache is - * true; if we're doing disk buffering, disk_cache_dirname is non-NULL - * and contains the (allocated) filename of the cache file. Either way, - * part_size gives the desired cache size. If part_size is zero, then - * no splitting takes place (so part_size is effectively infinite) */ - gboolean use_mem_cache; - char *disk_cache_dirname; - guint64 part_size; /* (bytes) */ - - /* - * threads - */ + /* Maximum size of each part (bytes) */ + guint64 part_size; - /* The thread doing the actual writes to tape; this also handles buffering - * for streaming */ - GThread *device_thread; - - /* The thread writing slabs to the disk cache, if any */ - GThread *disk_cache_thread; + /* the device's need for streaming (it's assumed that all subsequent devices + * have the same needs) */ + StreamingRequirement streaming; - /* slab train - * - * All in-memory data is contained in a linked list called the "slab - * train". Various components are operating simultaneously at different - * points in this train. Data from the upstream XferElement is appended to - * the head of the train, and the device thread follows along behind, - * writing data to the device. When caching parts in memory, the slab - * train just grows to eventually contain the whole part. 
When using an - * on-disk cache, the disk cache thread writes the tail of the train to - * disk, freeing slabs to be re-used at the head of the train. Some - * careful coordination of these components allows them to operate as - * independently as possible within the limits of the user's configuration. - * - * Slabs are rarely, if ever, freed: the oldest_slab reference generally - * ensures that all slabs have refcount > 0, and this pointer is only - * advanced when re-using slabs that have been flushed to the disk cache or - * when freeing slabs after completion of the transfer. */ - - /* pointers into the slab train are all protected by this mutex. Note that - * the slabs themselves can be manipulated without this lock; it's only - * when changing the pointers that the mutex must be held. Furthermore, a - * foo_slab variable which is not NULL will not be changed except by its - * controlling thread (disk_cacher_slab is controlled by disk_cache_thread, - * and device_slab is controlled by device_thread). This means that a - * controlling thread can drop the slab_mutex once it has ensured its slab - * is non-NULL. - * - * Slab_cond is notified when a new slab is made available from the reader. - * Slab_free_cond is notified when a slab becomes available for - * reallocation. - * - * Any thread waiting on either condition variable should also check - * elt->cancelled, and act appropriately if awakened in a cancelled state. - */ - GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond; + /* block size expected by the target device */ + gsize block_size; - /* slabs in progress by each thread, or NULL if the thread is waiting on - * slab_cond. These can only be changed by their respective threads, except - * when they are NULL (in which case the reader will point them to a new - * slab and signal the slab_cond). 
*/ - Slab *volatile disk_cacher_slab; - Slab *volatile mem_cache_slab; - Slab *volatile device_slab; + /* TRUE if this element is expecting slices via cache_inform */ + gboolean expect_cache_inform; - /* tail and head of the slab train */ - Slab *volatile oldest_slab; - Slab *volatile newest_slab; + /* The thread doing the actual writes to tape; this also handles buffering + * for streaming */ + GThread *device_thread; - /* thread-specific information + /* Ring Buffer * - * These values are only used by one thread, and thus are not - * subject to any locking or concurrency constraints. + * This buffer holds MAX_MEMORY bytes of data (rounded up to the next + * blocksize), and serves as the interface between the device_thread and + * the thread calling push_buffer. Ring_length is the total length of the + * buffer in bytes, while ring_count is the number of data bytes currently + * in the buffer. The ring_add_cond is signalled when data is added to the + * buffer, while ring_free_cond is signalled when data is removed. Both + * are governed by ring_mutex, and both are signalled when the transfer is + * cancelled. */ - /* slab in progress by the reader (not in the slab train) */ - Slab *reader_slab; - - /* the serial to be assigned to reader_slab */ - guint64 next_serial; - - /* bytes written to the device in this part */ - guint64 bytes_written; - - /* bytes written to the device in the current slab */ - guint64 slab_bytes_written; + GMutex *ring_mutex; + GCond *ring_add_cond, *ring_free_cond; + gchar *ring_buffer; + gsize ring_length, ring_count; + gsize ring_head, ring_tail; + gboolean ring_head_at_eof; - /* element state + /* Element State * * "state" includes all of the variables below (including device - * parameters). Note that the device_thread reads state values when - * paused is false without locking the mutex. No other thread should - * change state when the element is not paused. + * parameters). 
Note that the device_thread holds this mutex for the + * entire duration of writing a part. * - * If there is every any reason to lock both mutexes, acquire this one - * first. - * - * Any thread waiting on this condition variable should also check - * elt->cancelled, and act appropriately if awakened in a cancelled state. + * state_mutex should always be locked before ring_mutex, if both are to be + * held simultaneously. */ GMutex *state_mutex; GCond *state_cond; @@ -213,57 +127,31 @@ typedef struct XferDestTaperSplitter { Device *volatile device; dumpfile_t *volatile part_header; - /* If true, when unpaused, the device should begin at the beginning of the - * cache; if false, it should proceed to the next part. */ - volatile gboolean retry_part; - - /* If true, the previous part was completed successfully; only used for - * assertions */ - volatile gboolean last_part_successful; + /* bytes to read from cached slices before reading from the ring buffer */ + guint64 bytes_to_read_from_slices; /* part number in progress */ volatile guint64 partnum; - /* if true, the main thread should *not* call start_part */ - volatile gboolean no_more_parts; - - /* the first serial in this part, and the serial to stop at */ - volatile guint64 part_first_serial, part_stop_serial; - - /* file slices for the current part */ - FileSlice *volatile part_slices; - - /* read and write file descriptors for the disk cache file, in use by the - * disk_cache_thread. If these are -1, wait on state_cond until they are - * not; once the value is set, it will not change. */ - volatile int disk_cache_read_fd; - volatile int disk_cache_write_fd; - - /* device parameters - * - * Note that these values aren't known until we begin writing to the - * device; if block_size is zero, threads should block on state_cond until - * it is nonzero, at which point all of the dependent fields will have - * their correct values. 
Note that, since this value never changes after - * it has been set, it is safe to read block_size without acquiring the - * mutext first. */ - - /* this device's need for streaming */ - StreamingRequirement streaming; - - /* block size expected by the target device */ - gsize block_size; - - /* Size of a slab - some multiple of the block size */ - gsize slab_size; - - /* maximum number of slabs allowed, rounded up to the next whole slab. If - * using mem cache, this is the equivalent of part_size bytes; otherwise, - * it is equivalent to max_memory bytes. */ - guint64 max_slabs; - - /* number of slabs in a part */ - guint64 slabs_per_part; + /* status of the last part */ + gboolean last_part_eof; + gboolean last_part_eom; + gboolean last_part_successful; + + /* true if the element is done writing to devices */ + gboolean no_more_parts; + + /* total bytes written in the current part */ + volatile guint64 part_bytes_written; + + /* The list of active slices for the current part. The cache_inform method + * appends to this list. It is safe to read this linked list, beginning at + * the head, *if* you can guarantee that slices will not be fast-forwarded + * in the interim. The finalize method for this class will take care of + * freeing any leftover slices. Take the part_slices mutex while modifying + * the links in this list. */ + FileSlice *part_slices; + GMutex *part_slices_mutex; } XferDestTaperSplitter; static GType xfer_dest_taper_splitter_get_type(void); @@ -296,785 +184,399 @@ _xdt_dbg(const char *fmt, ...) g_debug("XDT thd-%p: %s", g_thread_self(), msg); } -/* - * Slab handling - */ - -/* called with the slab_mutex held, this gets a new slab to write into, with - * refcount 1. It will block if max_memory slabs are already in use, and mem - * caching is not in use, although allocation may be forced with the 'force' - * parameter. 
- * - * If the memory allocation cannot be satisfied due to system constraints, - * this function will send an XMSG_ERROR, wait for the transfer to cancel, and - * return NULL. If the transfer is cancelled by some other means while this - * function is blocked awaiting a free slab, it will return NULL. +/* "Fast forward" the slice list by the given length. This will free any + * slices that are no longer necessary, and adjust the offset and length of the + * first remaining slice. This assumes the state mutex is locked during its + * operation. * - * @param self: the xfer element - * @param force: allocate a slab even if it would exceed max_memory - * @returns: a new slab, or NULL if the xfer is cancelled + * @param self: element + * @param length: number of bytes to fast forward */ -static Slab * -alloc_slab( - XferDestTaperSplitter *self, - gboolean force) +static void +fast_forward_slices( + XferDestTaperSplitter *self, + guint64 length) { - XferElement *elt = XFER_ELEMENT(self); - Slab *rv; - - DBG(8, "alloc_slab(force=%d)", force); - if (!force) { - /* throttle based on maximum number of extant slabs */ - while (G_UNLIKELY( - !elt->cancelled && - self->oldest_slab && - self->newest_slab && - self->oldest_slab->refcount > 1 && - (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) { - DBG(9, "waiting for available slab"); - g_cond_wait(self->slab_free_cond, self->slab_mutex); - } - DBG(9, "done waiting"); - - if (elt->cancelled) - return NULL; - } - - /* if the oldest slab doesn't have anything else pointing to it, just use - * that */ - if (self->oldest_slab && self->oldest_slab->refcount == 1) { - rv = self->oldest_slab; - self->oldest_slab = rv->next; - } else { - rv = g_new0(Slab, 1); - rv->refcount = 1; - rv->base = g_try_malloc(self->slab_size); - if (!rv->base) { - g_free(rv); - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Could not allocate %zu bytes of memory"), self->slab_size); - return NULL; - } - } + FileSlice *slice; 
- rv->next = NULL; - rv->size = 0; - return rv; -} + /* consume slices until we've eaten the whole part */ + g_mutex_lock(self->part_slices_mutex); + while (length > 0) { + g_assert(self->part_slices); + slice = self->part_slices; -/* called with the slab_mutex held, this frees the given slave entirely. The - * reference count is not consulted. - * - * @param slab: slab to free - */ -static void -free_slab( - Slab *slab) -{ - if (slab) { - if (slab->base) - g_free(slab->base); - g_free(slab); - } -} + if (slice->length <= length) { + length -= slice->length; -/* called with the slab_mutex held, this decrements the refcount of the - * given slab - * - * @param self: xfer element - * @param slab: slab to free - */ -static inline void -unref_slab( - XferDestTaperSplitter *self, - Slab *slab) -{ - g_assert(slab->refcount > 1); - slab->refcount--; - if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) { - g_cond_broadcast(self->slab_free_cond); - } else if (G_UNLIKELY(slab->refcount == 0)) { - free_slab(slab); + self->part_slices = slice->next; + if (slice->filename) + g_free(slice->filename); + g_free(slice); + slice = self->part_slices; + } else { + slice->length -= length; + slice->offset += length; + break; + } } + g_mutex_unlock(self->part_slices_mutex); } -/* called with the slab_mutex held, this sets *slabp to *slabp->next, - * adjusting refcounts appropriately, and returns the new value - * - * @param self: xfer element - * @param slabp: slab pointer to advance - * @returns: new value of *slabp +/* + * Slice Iterator */ -static inline Slab * -next_slab( - XferDestTaperSplitter *self, - Slab * volatile *slabp) -{ - Slab *next; - if (!slabp || !*slabp) - return NULL; +/* A struct for use in iterating over data in the slices */ +typedef struct SliceIterator { + /* current slice */ + FileSlice *slice; - next = (*slabp)->next; - if (next) - next->refcount++; - if (*slabp) - unref_slab(self, *slabp); - *slabp = next; + /* file descriptor of the current 
file, or -1 if it's not open yet */ + int cur_fd; - return next; -} + /* bytes remaining in this slice */ + guint64 slice_remaining; +} SliceIterator; -/* - * Disk Cache - * - * The disk cache thread's job is simply to follow along the slab train at - * maximum speed, writing slabs to the disk cache file. */ +/* Utility functions for SliceIterator */ -static gboolean -open_disk_cache_fds( - XferDestTaperSplitter *self) +/* Begin iterating over slices, starting at the first byte of the first slice. + * Initializes a pre-allocated SliceIterator. The caller must ensure that + * fast_forward_slices is not called while an iteration is in + * progress. + */ +static void +iterate_slices( + XferDestTaperSplitter *self, + SliceIterator *iter) { - char * filename; - - g_assert(self->disk_cache_read_fd == -1); - g_assert(self->disk_cache_write_fd == -1); - - g_mutex_lock(self->state_mutex); - filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX", - self->disk_cache_dirname); - - self->disk_cache_write_fd = g_mkstemp(filename); - if (self->disk_cache_write_fd < 0) { - g_mutex_unlock(self->state_mutex); - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Error creating cache file in '%s': %s"), self->disk_cache_dirname, - strerror(errno)); - g_free(filename); - return FALSE; - } - - /* open a separate copy of the file for reading */ - self->disk_cache_read_fd = open(filename, O_RDONLY); - if (self->disk_cache_read_fd < 0) { - g_mutex_unlock(self->state_mutex); - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Error opening cache file in '%s': %s"), self->disk_cache_dirname, - strerror(errno)); - g_free(filename); - return FALSE; - } - - /* signal anyone waiting for this value */ - g_cond_broadcast(self->state_cond); - g_mutex_unlock(self->state_mutex); - - /* errors from unlink are not fatal */ - if (unlink(filename) < 0) { - g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno)); - } - - g_free(filename); - return TRUE; + iter->cur_fd = -1; + 
iter->slice_remaining = 0; + g_mutex_lock(self->part_slices_mutex); + iter->slice = self->part_slices; + /* it's safe to unlock this because, at worst, a new entry will + * be appended while the iterator is in progress */ + g_mutex_unlock(self->part_slices_mutex); } + +/* Get a block of data from the iterator, returning a pointer to a buffer + * containing the data; the buffer remains the property of the iterator. + * Returns NULL on error, after calling xfer_cancel_with_error with an + * appropriate error message. This function does not block, so it does not + * check for cancellation. + */ static gpointer -disk_cache_thread( - gpointer data) +iterator_get_block( + XferDestTaperSplitter *self, + SliceIterator *iter, + gpointer buf, + gsize bytes_needed) { - XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data); + gsize buf_offset = 0; XferElement *elt = XFER_ELEMENT(self); - DBG(1, "(this is the disk cache thread)"); - - /* open up the disk cache file first */ - if (!open_disk_cache_fds(self)) - return NULL; - - while (!elt->cancelled) { - gboolean eof, eop; - guint64 stop_serial; - Slab *slab; + g_assert(iter != NULL); + g_assert(buf != NULL); - /* rewind to the begining of the disk cache file */ - if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname, - strerror(errno)); - return NULL; - } - - /* we need to sit and wait for the next part to begin, first making sure - * we have a slab .. 
*/ - g_mutex_lock(self->slab_mutex); - while (!self->disk_cacher_slab && !elt->cancelled) { - DBG(9, "waiting for a disk slab"); - g_cond_wait(self->slab_cond, self->slab_mutex); - } - DBG(9, "done waiting"); - g_mutex_unlock(self->slab_mutex); + while (bytes_needed > 0) { + gsize read_size; + int bytes_read; - if (elt->cancelled) - break; + if (iter->cur_fd < 0) { + guint64 offset; - /* this slab is now fixed until this thread changes it */ - g_assert(self->disk_cacher_slab != NULL); - - /* and then making sure we're ready to write that slab. */ - g_mutex_lock(self->state_mutex); - while ((self->paused || - (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial)) - && !elt->cancelled) { - DBG(9, "waiting for the disk slab to become current and un-paused"); - g_cond_wait(self->state_cond, self->state_mutex); - } - DBG(9, "done waiting"); + g_assert(iter->slice != NULL); + g_assert(iter->slice->filename != NULL); - stop_serial = self->part_stop_serial; - g_mutex_unlock(self->state_mutex); + iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0); + if (iter->cur_fd < 0) { + xfer_cancel_with_error(elt, + _("Could not open '%s' for reading: %s"), + iter->slice->filename, strerror(errno)); + return NULL; + } - if (elt->cancelled) - break; + iter->slice_remaining = iter->slice->length; + offset = iter->slice->offset; - g_mutex_lock(self->slab_mutex); - slab = self->disk_cacher_slab; - eop = eof = FALSE; - while (!eop && !eof) { - /* if we're at the head of the slab train, wait for more data */ - while (!self->disk_cacher_slab && !elt->cancelled) { - DBG(9, "waiting for the next disk slab"); - g_cond_wait(self->slab_cond, self->slab_mutex); + if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) { + xfer_cancel_with_error(elt, + _("Could not seek '%s' for reading: %s"), + iter->slice->filename, strerror(errno)); + return NULL; } - DBG(9, "done waiting"); + } - if (elt->cancelled) - break; + read_size = MIN(iter->slice_remaining, bytes_needed); + 
bytes_read = full_read(iter->cur_fd, + buf + buf_offset, + read_size); + if (bytes_read < 0 || (gsize)bytes_read < read_size) { + xfer_cancel_with_error(elt, + _("Error reading '%s': %s"), + iter->slice->filename, + errno? strerror(errno) : _("Unexpected EOF")); + return NULL; + } - /* drop the lock long enough to write the slab; the refcount - * protects the slab during this time */ - slab = self->disk_cacher_slab; - g_mutex_unlock(self->slab_mutex); + iter->slice_remaining -= bytes_read; + buf_offset += bytes_read; + bytes_needed -= bytes_read; - if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname, - strerror(errno)); + if (iter->slice_remaining <= 0) { + if (close(iter->cur_fd) < 0) { + xfer_cancel_with_error(elt, + _("Could not close fd %d: %s"), + iter->cur_fd, strerror(errno)); return NULL; } + iter->cur_fd = -1; - eof = slab->size < self->slab_size; - eop = (slab->serial + 1 == stop_serial); + iter->slice = iter->slice->next; - g_mutex_lock(self->slab_mutex); - next_slab(self, &self->disk_cacher_slab); - } - g_mutex_unlock(self->slab_mutex); - - if (eof) { - /* this very thread should have just set this value to NULL, and since it's - * EOF, there should not be any 'next' slab */ - g_assert(self->disk_cacher_slab == NULL); - break; + if (elt->cancelled) + return NULL; } } - return NULL; + return buf; +} + + +/* Free the iterator's resources */ +static void +iterator_free( + SliceIterator *iter) +{ + if (iter->cur_fd >= 0) + close(iter->cur_fd); } /* * Device Thread - * - * The device thread's job is to write slabs to self->device, applying whatever - * streaming algorithms are required. It does this by alternately getting the - * next slab from a "slab source" and writing that slab to the device. 
Most of - * the slab source functions assume that self->slab_mutex is held, but may - * release the mutex (either explicitly or via a g_cond_wait), so it is not - * valid to assume that any slab pointers remain unchanged after a slab_source - * function invication. */ -/* This struct tracks the current state of the slab source */ -typedef struct slab_source_state { - /* temporary slab used for reading from disk */ - Slab *tmp_slab; - - /* current source slice */ - FileSlice *slice; - - /* open fd in current slice, or -1 */ - int slice_fd; - - /* next serial to read from disk */ - guint64 next_serial; - - /* bytes remaining in this slice */ - gsize slice_remaining; -} slab_source_state; - -/* Called with the slab_mutex held, this function pre-buffers enough data into the slab - * train to meet the device's streaming needs. */ -static gboolean -slab_source_prebuffer( +/* Wait for at least one block, or EOF, to be available in the ring buffer. + * Called with the ring mutex held. */ +static gsize +device_thread_wait_for_block( XferDestTaperSplitter *self) { XferElement *elt = XFER_ELEMENT(self); - guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size; - guint64 i; - Slab *slab; - - /* always prebuffer at least one slab, even if max_memory is 0 */ - if (prebuffer_slabs == 0) prebuffer_slabs = 1; - - /* pre-buffering is not necessary if we're reading from a disk cache */ - if (self->retry_part && self->part_slices) - return TRUE; - - /* pre-buffering means waiting until we have at least prebuffer_slabs in the - * slab train ahead of the device_slab, or the newest slab is at EOF. 
*/ - while (!elt->cancelled) { - gboolean eof_or_eop = FALSE; - - /* see if there's enough data yet */ - for (i = 0, slab = self->device_slab; - i < prebuffer_slabs && slab != NULL; - i++, slab = slab->next) { - eof_or_eop = (slab->size < self->slab_size) - || (slab->serial + 1 == self->part_stop_serial); - } - if (i == prebuffer_slabs || eof_or_eop) - break; + gsize bytes_needed = self->device->block_size; + gsize usable; - DBG(9, "prebuffering wait"); - g_cond_wait(self->slab_cond, self->slab_mutex); - } - DBG(9, "done waiting"); + /* for any kind of streaming, we need to fill the entire buffer before the + * first byte */ + if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE) + bytes_needed = self->ring_length; - if (elt->cancelled) { - self->last_part_successful = FALSE; - self->no_more_parts = TRUE; - return FALSE; - } - - return TRUE; -} - -/* Called without the slab_mutex held, this function sets up a new slab_source_state - * object based on the configuratino of the Xfer Element. 
*/ -static inline gboolean -slab_source_setup( - XferDestTaperSplitter *self, - slab_source_state *state) -{ - state->tmp_slab = NULL; - state->slice_fd = -1; - state->slice = NULL; - state->slice_remaining = 0; - state->next_serial = G_MAXUINT64; - - /* if we're to retry the part, rewind to the beginning */ - if (self->retry_part) { - if (self->use_mem_cache) { - /* rewind device_slab to point to the mem_cache_slab */ - g_mutex_lock(self->slab_mutex); - if (self->device_slab) - unref_slab(self, self->device_slab); - self->device_slab = self->mem_cache_slab; - if(self->device_slab != NULL) - self->device_slab->refcount++; - g_mutex_unlock(self->slab_mutex); - } else { - g_assert(self->part_slices); - - g_mutex_lock(self->slab_mutex); - - /* we're going to read from the disk cache until we get to the oldest useful - * slab in memory, so it had best exist */ - g_assert(self->oldest_slab != NULL); - - /* point device_slab at the oldest slab we have */ - self->oldest_slab->refcount++; - if (self->device_slab) - unref_slab(self, self->device_slab); - self->device_slab = self->oldest_slab; - - /* and increment it until it is at least the slab we want to start from */ - while (self->device_slab->serial < self->part_first_serial) { - next_slab(self, &self->device_slab); - } + while (1) { + /* are we ready? */ + if (elt->cancelled) + break; - /* get a new, temporary slab for use while reading */ - state->tmp_slab = alloc_slab(self, TRUE); + if (self->ring_count >= bytes_needed) + break; - g_mutex_unlock(self->slab_mutex); + if (self->ring_head_at_eof) + break; - if (!state->tmp_slab) { - /* if we couldn't allocate a slab, then we're cancelled, so we're done with - * this part. 
*/ - self->last_part_successful = FALSE; - self->no_more_parts = TRUE; - return FALSE; - } + /* nope - so wait */ + g_cond_wait(self->ring_add_cond, self->ring_mutex); - state->tmp_slab->size = self->slab_size; - state->slice = self->part_slices; - state->next_serial = self->part_first_serial; - } + /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes, + * we need to wait for the entire buffer to fill */ + if (self->streaming == STREAMING_REQUIREMENT_REQUIRED) + bytes_needed = self->ring_length; } - /* if the streaming mode requires it, pre-buffer */ - if (self->streaming == STREAMING_REQUIREMENT_DESIRED || - self->streaming == STREAMING_REQUIREMENT_REQUIRED) { - gboolean prebuffer_ok; - - g_mutex_lock(self->slab_mutex); - prebuffer_ok = slab_source_prebuffer(self); - g_mutex_unlock(self->slab_mutex); - if (!prebuffer_ok) - return FALSE; - } + usable = MIN(self->ring_count, bytes_needed); + if (self->part_size) + usable = MIN(usable, self->part_size - self->part_bytes_written); - return TRUE; + return usable; } -/* Called with the slab_mutex held, this does the work of slab_source_get when - * reading from the disk cache. Note that this explicitly releases the - * slab_mutex during execution - do not depend on any protected values across a - * call to this function. The mutex is held on return. */ -static Slab * -slab_source_get_from_disk( +/* Mark WRITTEN bytes as free in the ring buffer. Called with the ring mutex + * held. */ +static void +device_thread_consume_block( XferDestTaperSplitter *self, - slab_source_state *state, - guint64 serial) + gsize written) +{ + self->ring_count -= written; + self->ring_tail += written; + if (self->ring_tail >= self->ring_length) + self->ring_tail -= self->ring_length; + g_cond_broadcast(self->ring_free_cond); +} + +/* Write an entire part. 
Called with the state_mutex held */ +static XMsg * +device_thread_write_part( + XferDestTaperSplitter *self) { + GTimer *timer = g_timer_new(); XferElement *elt = XFER_ELEMENT(self); - gsize bytes_needed = self->slab_size; - gsize slab_offset = 0; - /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */ - g_mutex_unlock(self->slab_mutex); + enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED; + int fileno = 0; + XMsg *msg; - g_assert(state->next_serial == serial); + self->part_bytes_written = 0; - while (bytes_needed > 0) { - gsize read_size, bytes_read; - - if (state->slice_fd < 0) { - g_assert(state->slice); - if (state->slice->filename) { - /* regular cache_inform file - just open it */ - state->slice_fd = open(state->slice->filename, O_RDONLY, 0); - if (state->slice_fd < 0) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Could not open '%s' for reading: %s"), - state->slice->filename, strerror(errno)); - goto fatal_error; - } - } else { - /* wait for the disk_cache_thread to open the disk_cache_read_fd, and then copy it */ - g_mutex_lock(self->state_mutex); - while (self->disk_cache_read_fd == -1 && !elt->cancelled) { - DBG(9, "waiting for disk_cache_thread to start up"); - g_cond_wait(self->state_cond, self->state_mutex); - } - DBG(9, "done waiting"); - state->slice_fd = self->disk_cache_read_fd; - g_mutex_unlock(self->state_mutex); - } + g_timer_start(timer); - if (lseek(state->slice_fd, state->slice->offset, SEEK_SET) == -1) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Could not seek '%s' for reading: %s"), - state->slice->filename? 
state->slice->filename : "(cache file)", - strerror(errno)); - goto fatal_error; - } + /* write the header; if this fails or hits LEOM, we consider this a + * successful 0-byte part */ + if (!device_start_file(self->device, self->part_header) || self->device->is_eom) { + part_status = PART_LEOM; + goto part_done; + } - state->slice_remaining = state->slice->length; - } + fileno = self->device->file; + g_assert(fileno > 0); - read_size = MIN(state->slice_remaining, bytes_needed); - bytes_read = full_read(state->slice_fd, - state->tmp_slab->base + slab_offset, - read_size); - if (bytes_read < read_size) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Error reading '%s': %s"), - state->slice->filename? state->slice->filename : "(cache file)", - errno? strerror(errno) : _("Unexpected EOF")); - goto fatal_error; - } + /* free the header, now that it's written */ + dumpfile_free(self->part_header); + self->part_header = NULL; - state->slice_remaining -= bytes_read; - if (state->slice_remaining == 0) { - if (close(state->slice_fd) < 0) { - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Could not close fd %d: %s"), - state->slice_fd, strerror(errno)); - goto fatal_error; - } - state->slice_fd = -1; - state->slice = state->slice->next; - } + /* First, read the requisite number of bytes from the part_slices, if the part was + * unsuccessful. 
*/ + if (self->bytes_to_read_from_slices) { + SliceIterator iter; + gsize to_write = self->block_size; + gpointer buf = g_malloc(to_write); + gboolean successful = TRUE; + guint64 bytes_from_slices = self->bytes_to_read_from_slices; - bytes_needed -= bytes_read; - slab_offset += bytes_read; - } + DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices); - state->tmp_slab->serial = state->next_serial++; + iterate_slices(self, &iter); + while (bytes_from_slices) { + gboolean ok; - g_mutex_lock(self->slab_mutex); - return state->tmp_slab; + if (!iterator_get_block(self, &iter, buf, to_write)) { + part_status = PART_FAILED; + successful = FALSE; + break; + } -fatal_error: - g_mutex_lock(self->slab_mutex); + /* note that it's OK to reference these ring_* vars here, as they + * are static at this point */ + ok = device_write_block(self->device, (guint)to_write, buf); - self->last_part_successful = FALSE; - self->no_more_parts = TRUE; - return NULL; -} + if (!ok) { + part_status = PART_FAILED; + successful = FALSE; + break; + } -/* Called with the slab_mutex held, this function gets the slab with the given - * serial number, waiting if necessary for that slab to be available. Note - * that the slab_mutex may be released during execution, although it is always - * held on return. 
*/ -static inline Slab * -slab_source_get( - XferDestTaperSplitter *self, - slab_source_state *state, - guint64 serial) -{ - XferElement *elt = (XferElement *)self; - - /* device_slab is only NULL if we're following the slab train, so wait for - * a new slab */ - if (!self->device_slab) { - /* if the streaming mode requires it, pre-buffer */ - if (self->streaming == STREAMING_REQUIREMENT_DESIRED) { - if (!slab_source_prebuffer(self)) - return NULL; + self->part_bytes_written += to_write; + bytes_from_slices -= to_write; - /* fall through to make sure we have a device_slab; - * slab_source_prebuffer doesn't guarantee device_slab != NULL */ + if (self->part_size && self->part_bytes_written >= self->part_size) { + part_status = PART_EOP; + successful = FALSE; + break; + } else if (self->device->is_eom) { + part_status = PART_LEOM; + successful = FALSE; + break; + } } - while (self->device_slab == NULL && !elt->cancelled) { - DBG(9, "waiting for the next slab"); - g_cond_wait(self->slab_cond, self->slab_mutex); - } - DBG(9, "done waiting"); + iterator_free(&iter); + g_free(buf); - if (elt->cancelled) - goto fatal_error; + /* if we didn't finish, get out of here now */ + if (!successful) + goto part_done; } - /* device slab is now set, and only this thread can change it */ - g_assert(self->device_slab); - - /* if the next item in the device slab is the one we want, then the job is - * pretty easy */ - if (G_LIKELY(serial == self->device_slab->serial)) - return self->device_slab; - - /* otherwise, we're reading from disk */ - g_assert(serial < self->device_slab->serial); - return slab_source_get_from_disk(self, state, serial); + g_mutex_lock(self->ring_mutex); + while (1) { + gsize to_write; + gboolean ok; -fatal_error: - self->last_part_successful = FALSE; - self->no_more_parts = TRUE; - return NULL; -} + /* wait for at least one block, and (if necessary) prebuffer */ + to_write = device_thread_wait_for_block(self); + to_write = MIN(to_write, self->device->block_size); 
+ if (elt->cancelled) + break; -/* Called without the slab_mutex held, this frees any resources assigned - * to the slab source state */ -static inline void -slab_source_free( - XferDestTaperSplitter *self, - slab_source_state *state) -{ - if (state->slice_fd != -1) - close(state->slice_fd); + if (to_write == 0) { + part_status = PART_EOF; + break; + } - if (state->tmp_slab) { - g_mutex_lock(self->slab_mutex); - free_slab(state->tmp_slab); - g_mutex_unlock(self->slab_mutex); - } -} + g_mutex_unlock(self->ring_mutex); + DBG(8, "writing %ju bytes to device", (uintmax_t)to_write); -/* Called without the slab_mutex, this writes the given slab to the device */ -static gboolean -write_slab_to_device( - XferDestTaperSplitter *self, - Slab *slab) -{ - XferElement *elt = XFER_ELEMENT(self); - gpointer buf = slab->base; - gsize remaining = slab->size; + /* note that it's OK to reference these ring_* vars here, as they + * are static at this point */ + ok = device_write_block(self->device, (guint)to_write, + self->ring_buffer + self->ring_tail); + g_mutex_lock(self->ring_mutex); - while (remaining && !elt->cancelled) { - gsize write_size = MIN(self->block_size, remaining); - gboolean ok; - ok = device_write_block(self->device, write_size, buf); if (!ok) { - self->bytes_written += slab->size - remaining; - - /* TODO: handle an error without is_eom - * differently/fatally? or at least with a warning? 
*/ - self->last_part_successful = FALSE; - self->no_more_parts = FALSE; - return FALSE; + part_status = PART_FAILED; + break; } - buf += write_size; - self->slab_bytes_written += write_size; - remaining -= write_size; - } + self->part_bytes_written += to_write; + device_thread_consume_block(self, to_write); - if (elt->cancelled) { - self->last_part_successful = FALSE; - self->no_more_parts = TRUE; - return FALSE; + if (self->part_size && self->part_bytes_written >= self->part_size) { + part_status = PART_EOP; + break; + } else if (self->device->is_eom) { + part_status = PART_LEOM; + break; + } } + g_mutex_unlock(self->ring_mutex); +part_done: - self->bytes_written += slab->size; - self->slab_bytes_written = 0; - return TRUE; -} - -static XMsg * -device_thread_write_part( - XferDestTaperSplitter *self) -{ - GTimer *timer = g_timer_new(); - XMsg *msg; - slab_source_state src_state; - guint64 serial, stop_serial; - gboolean eof = FALSE; - int fileno = 0; - - self->last_part_successful = FALSE; - self->bytes_written = 0; - - if (!device_start_file(self->device, self->part_header)) - goto part_done; - - dumpfile_free(self->part_header); - self->part_header = NULL; - - fileno = self->device->file; - g_assert(fileno > 0); - - if (!slab_source_setup(self, &src_state)) - goto part_done; - - g_timer_start(timer); - - stop_serial = self->part_stop_serial; - g_mutex_lock(self->slab_mutex); - for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) { - Slab *slab = slab_source_get(self, &src_state, serial); - DBG(8, "writing slab %p (serial %ju) to device", slab, serial); - g_mutex_unlock(self->slab_mutex); - if (!slab) - goto part_done; - - eof = slab->size < self->slab_size; - - if (!write_slab_to_device(self, slab)) - goto part_done; - - g_mutex_lock(self->slab_mutex); - DBG(8, "wrote slab %p to device", slab); - - /* if we're reading from the slab train, advance self->device_slab. 
*/ - if (slab == self->device_slab) { - next_slab(self, &self->device_slab); - } + if (elt->cancelled) { + g_timer_destroy(timer); + return NULL; } - g_mutex_unlock(self->slab_mutex); /* if we write all of the blocks, but the finish_file fails, then likely * there was some buffering going on in the device driver, and the blocks - * did not all make it to permanent storage -- so it's a failed part. */ - if (!device_finish_file(self->device)) - goto part_done; - - slab_source_free(self, &src_state); - - self->last_part_successful = TRUE; - self->no_more_parts = eof; + * did not all make it to permanent storage -- so it's a failed part. Note + * that we try to finish_file even if the part failed, just to be thorough. */ + if (self->device->in_file) { + if (!device_finish_file(self->device)) + part_status = PART_FAILED; + } -part_done: g_timer_stop(timer); msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0); - msg->size = self->bytes_written; + msg->size = self->part_bytes_written; msg->duration = g_timer_elapsed(timer, NULL); msg->partnum = self->partnum; msg->fileno = fileno; - msg->successful = self->last_part_successful; - msg->eom = !self->last_part_successful; - msg->eof = self->no_more_parts; + msg->successful = self->last_part_successful = part_status != PART_FAILED; + msg->eom = self->last_part_eom = (part_status == PART_LEOM || !msg->successful); + msg->eof = self->last_part_eof = part_status == PART_EOF; - if (self->last_part_successful) + /* time runs backward on some test boxes, so make sure this is positive */ + if (msg->duration < 0) msg->duration = 0; + + if (msg->successful) self->partnum++; + self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform); g_timer_destroy(timer); return msg; } -/* Called with the status_mutex held, this frees any cached data for - * a successful part */ -static void -release_part_cache( - XferDestTaperSplitter *self) -{ - if (self->use_mem_cache && self->mem_cache_slab) { - /* move up the 
mem_cache_slab to point to the first slab in - * the next part (probably NULL at this point), so that the - * reader can continue reading data into the new mem cache - * immediately. */ - g_mutex_lock(self->slab_mutex); - unref_slab(self, self->mem_cache_slab); - self->mem_cache_slab = self->device_slab; - if (self->mem_cache_slab) - self->mem_cache_slab->refcount++; - g_mutex_unlock(self->slab_mutex); - } - - /* the disk_cache_thread takes care of freeing its cache */ - else if (self->disk_cache_dirname) - return; - - /* if we have part_slices, fast-forward them. Note that we should have a - * full part's worth of slices by now. */ - else if (self->part_slices) { - guint64 bytes_remaining = self->slabs_per_part * self->slab_size; - FileSlice *slice = self->part_slices; - - /* consume slices until we've eaten the whole part */ - while (bytes_remaining > 0) { - if (slice == NULL) - g_critical("Not all data in part was represented to cache_inform"); - - if (slice->length <= bytes_remaining) { - bytes_remaining -= slice->length; - - self->part_slices = slice->next; - g_free(slice->filename); - g_free(slice); - slice = self->part_slices; - } else { - slice->length -= bytes_remaining; - slice->offset += bytes_remaining; - break; - } - } - } -} - static gpointer device_thread( gpointer data) @@ -1085,15 +587,6 @@ device_thread( DBG(1, "(this is the device thread)"); - if (self->disk_cache_dirname) { - GError *error = NULL; - self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error); - if (!self->disk_cache_thread) { - g_critical(_("Error creating new thread: %s (%s)"), - error->message, errno? strerror(errno) : _("no error code")); - } - } - /* This is the outer loop, that loops once for each split part written to * tape. 
*/ g_mutex_lock(self->state_mutex); @@ -1109,33 +602,29 @@ device_thread( if (elt->cancelled) break; - g_mutex_unlock(self->state_mutex); - self->slab_bytes_written = 0; DBG(2, "beginning to write part"); msg = device_thread_write_part(self); DBG(2, "done writing part"); - g_mutex_lock(self->state_mutex); - /* release any cache of a successful part, but don't bother at EOF */ - if (msg->successful && !msg->eof) - release_part_cache(self); + if (!msg) /* cancelled */ + break; + + /* release the slices for this part, if there were any slices */ + if (msg->successful && self->expect_cache_inform) { + fast_forward_slices(self, msg->size); + } xfer_queue_message(elt->xfer, msg); + /* pause ourselves and await instructions from the main thread */ + self->paused = TRUE; + /* if this is the last part, we're done with the part loop */ if (self->no_more_parts) break; - - /* pause ourselves and await instructions from the main thread */ - self->paused = TRUE; } - g_mutex_unlock(self->state_mutex); - /* make sure the other thread is done before we send XMSG_DONE */ - if (self->disk_cache_thread) - g_thread_join(self->disk_cache_thread); - /* tell the main thread we're done */ xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0)); @@ -1146,49 +635,6 @@ device_thread( * Class mechanics */ -/* called with the slab_mutex held, this adds the reader_slab to the head of - * the slab train and signals the condition variable. 
*/ -static void -add_reader_slab_to_train( - XferDestTaperSplitter *self) -{ - Slab *slab = self->reader_slab; - - DBG(3, "adding slab of new data to the slab train"); - - if (self->newest_slab) { - self->newest_slab->next = slab; - slab->refcount++; - - self->newest_slab->refcount--; - } - - self->newest_slab = slab; /* steal reader_slab's ref */ - self->reader_slab = NULL; - - /* steal reader_slab's reference for newest_slab */ - - /* if any of the other pointers are waiting for this slab, update them */ - if (self->disk_cache_dirname && !self->disk_cacher_slab) { - self->disk_cacher_slab = slab; - slab->refcount++; - } - if (self->use_mem_cache && !self->mem_cache_slab) { - self->mem_cache_slab = slab; - slab->refcount++; - } - if (!self->device_slab) { - self->device_slab = slab; - slab->refcount++; - } - if (!self->oldest_slab) { - self->oldest_slab = slab; - slab->refcount++; - } - - g_cond_broadcast(self->slab_cond); -} - static void push_buffer_impl( XferElement *elt, @@ -1196,7 +642,7 @@ push_buffer_impl( size_t size) { XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt; - gpointer p; + gchar *p = buf; DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size); @@ -1207,66 +653,52 @@ push_buffer_impl( /* handle EOF */ if (G_UNLIKELY(buf == NULL)) { - /* send off the last, probably partial slab */ - g_mutex_lock(self->slab_mutex); - - /* create a new, empty slab if necessary */ - if (!self->reader_slab) { - self->reader_slab = alloc_slab(self, FALSE); - if (!self->reader_slab) { - /* we've been cancelled while waiting for a slab */ - g_mutex_unlock(self->slab_mutex); - - /* wait for the xfer to cancel, so we don't get another buffer - * pushed to us (and do so *without* the mutex held) */ - wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); - - goto free_and_finish; - } - self->reader_slab->serial = self->next_serial++; - } - - add_reader_slab_to_train(self); - g_mutex_unlock(self->slab_mutex); - + /* indicate EOF to the device thread */ + 
g_mutex_lock(self->ring_mutex); + self->ring_head_at_eof = TRUE; + g_cond_broadcast(self->ring_add_cond); + g_mutex_unlock(self->ring_mutex); goto free_and_finish; } - p = buf; - while (1) { - gsize copy_size; - - /* get a fresh slab, if needed */ - if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) { - g_mutex_lock(self->slab_mutex); - if (self->reader_slab) - add_reader_slab_to_train(self); - self->reader_slab = alloc_slab(self, FALSE); - if (!self->reader_slab) { - /* we've been cancelled while waiting for a slab */ - g_mutex_unlock(self->slab_mutex); - - /* wait for the xfer to cancel, so we don't get another buffer - * pushed to us (and do so *without* the mutex held) */ - wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer); - - goto free_and_finish; - } - self->reader_slab->serial = self->next_serial++; - g_mutex_unlock(self->slab_mutex); - } + /* push the block into the ring buffer, in pieces if necessary */ + g_mutex_lock(self->ring_mutex); + while (size > 0) { + gsize avail; - if (size == 0) - break; - - copy_size = MIN(self->slab_size - self->reader_slab->size, size); - memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size); + /* wait for some space */ + while (self->ring_count == self->ring_length && !elt->cancelled) { + DBG(9, "waiting for any space to buffer pushed data"); + g_cond_wait(self->ring_free_cond, self->ring_mutex); + } + DBG(9, "done waiting"); - self->reader_slab->size += copy_size; - p += copy_size; - size -= copy_size; + if (elt->cancelled) + goto unlock_and_free_and_finish; + + /* only copy to the end of the buffer, if the available space wraps + * around to the beginning */ + avail = MIN(size, self->ring_length - self->ring_count); + avail = MIN(avail, self->ring_length - self->ring_head); + + /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */ + memmove(self->ring_buffer + self->ring_head, p, avail); + + /* reset the ring variables to represent this state */ + 
self->ring_count += avail; + self->ring_head += avail; /* will, at most, hit ring_length */ + if (self->ring_head == self->ring_length) + self->ring_head = 0; + p = (gpointer)((guchar *)p + avail); + size -= avail; + + /* and give the device thread a notice that data is ready */ + g_cond_broadcast(self->ring_add_cond); } +unlock_and_free_and_finish: + g_mutex_unlock(self->ring_mutex); + free_and_finish: if (buf) g_free(buf); @@ -1309,27 +741,47 @@ cancel_impl( g_cond_broadcast(self->state_cond); g_mutex_unlock(self->state_mutex); - g_mutex_lock(self->slab_mutex); - g_cond_broadcast(self->slab_cond); - g_cond_broadcast(self->slab_free_cond); - g_mutex_unlock(self->slab_mutex); + g_mutex_lock(self->ring_mutex); + g_cond_broadcast(self->ring_add_cond); + g_cond_broadcast(self->ring_free_cond); + g_mutex_unlock(self->ring_mutex); return rv; } static void start_part_impl( - XferDestTaper *xdtself, + XferDestTaper *xdt, gboolean retry_part, dumpfile_t *header) { - XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself); + XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt); g_assert(self->device != NULL); g_assert(!self->device->in_file); g_assert(header != NULL); - DBG(1, "start_part(retry_part=%d)", retry_part); + DBG(1, "start_part()"); + + /* we can only retry the part if we're getting slices via cache_inform's */ + if (retry_part) { + if (self->last_part_successful) { + xfer_cancel_with_error(XFER_ELEMENT(self), + _("Previous part did not fail; cannot retry")); + return; + } + + if (!self->expect_cache_inform) { + xfer_cancel_with_error(XFER_ELEMENT(self), + _("No cache for previous failed part; cannot retry")); + return; + } + + self->bytes_to_read_from_slices = self->part_bytes_written; + } else { + /* don't read any bytes from the slices, since we're not retrying */ + self->bytes_to_read_from_slices = 0; + } g_mutex_lock(self->state_mutex); g_assert(self->paused); @@ -1339,27 +791,6 @@ start_part_impl( dumpfile_free(self->part_header); 
self->part_header = dumpfile_copy(header); - if (retry_part) { - if (!self->use_mem_cache && !self->part_slices) { - g_mutex_unlock(self->state_mutex); - xfer_cancel_with_error(XFER_ELEMENT(self), - _("Failed part was not cached; cannot retry")); - return; - } - g_assert(!self->last_part_successful); - self->retry_part = TRUE; - } else { - g_assert(self->last_part_successful); - self->retry_part = FALSE; - self->part_first_serial = self->part_stop_serial; - if (self->part_size != 0) { - self->part_stop_serial = self->part_first_serial + self->slabs_per_part; - } else { - /* set part_stop_serial to an effectively infinite value */ - self->part_stop_serial = G_MAXUINT64; - } - } - DBG(1, "unpausing"); self->paused = FALSE; g_cond_broadcast(self->state_cond); @@ -1373,8 +804,11 @@ use_device_impl( Device *device) { XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself); + StreamingRequirement newstreaming; GValue val; + DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? 
" (no change)":""); + /* short-circuit if nothing is changing */ if (self->device == device) return; @@ -1390,9 +824,11 @@ use_device_impl( if (!device_property_get(self->device, PROPERTY_STREAMING, &val) || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) { g_warning("Couldn't get streaming type for %s", self->device->device_name); - self->streaming = STREAMING_REQUIREMENT_REQUIRED; } else { - self->streaming = g_value_get_enum(&val); + newstreaming = g_value_get_enum(&val); + if (newstreaming != self->streaming) + g_warning("New device has different streaming requirements from the original; " + "ignoring new requirement"); } g_value_unset(&val); @@ -1408,41 +844,27 @@ use_device_impl( static void cache_inform_impl( - XferDestTaper *xdtself, + XferDestTaper *xdt, const char *filename, off_t offset, off_t length) { - XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself); - FileSlice *slice, *iter; - - DBG(1, "cache_inform(\"%s\", %jd, %jd)", filename, (intmax_t)offset, (intmax_t)length); + XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt); + FileSlice *slice = g_new(FileSlice, 1), *iter; - /* do we even need this info? */ - if (self->disk_cache_dirname || self->use_mem_cache || self->part_size == 0) - return; - - /* handle the (admittedly unlikely) event that length is larger than gsize. 
- * Hopefully if sizeof(off_t) = sizeof(gsize), this will get optimized out */ - while (sizeof(off_t) > sizeof(gsize) && length > (off_t)SIZE_MAX) { - cache_inform_impl(xdtself, filename, offset, (off_t)SIZE_MAX); - offset += (off_t)SIZE_MAX; - length -= (off_t)SIZE_MAX; - } - - slice = g_new0(FileSlice, 1); + slice->next = NULL; slice->filename = g_strdup(filename); slice->offset = offset; - slice->length = (gsize)length; + slice->length = length; - g_mutex_lock(self->state_mutex); + g_mutex_lock(self->part_slices_mutex); if (self->part_slices) { for (iter = self->part_slices; iter->next; iter = iter->next) {} iter->next = slice; } else { self->part_slices = slice; } - g_mutex_unlock(self->state_mutex); + g_mutex_unlock(self->part_slices_mutex); } static guint64 @@ -1450,7 +872,11 @@ get_part_bytes_written_impl( XferDestTaper *xdtself) { XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself); - return self->bytes_written + self->slab_bytes_written; + + /* NOTE: this access is unsafe and may return inconsistent results (e.g, a + * partial write to the 64-bit value on a 32-bit system). This is ok for + * the moment, as it's only informational, but be warned. 
*/ + return self->part_bytes_written; } static void @@ -1462,15 +888,17 @@ instance_init( self->state_mutex = g_mutex_new(); self->state_cond = g_cond_new(); - self->slab_mutex = g_mutex_new(); - self->slab_cond = g_cond_new(); - self->slab_free_cond = g_cond_new(); + self->ring_mutex = g_mutex_new(); + self->ring_add_cond = g_cond_new(); + self->ring_free_cond = g_cond_new(); + self->part_slices_mutex = g_mutex_new(); - self->last_part_successful = TRUE; + self->device = NULL; self->paused = TRUE; - self->part_stop_serial = 0; - self->disk_cache_read_fd = -1; - self->disk_cache_write_fd = -1; + self->part_header = NULL; + self->partnum = 1; + self->part_bytes_written = 0; + self->part_slices = NULL; } static void @@ -1478,49 +906,30 @@ finalize_impl( GObject * obj_self) { XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self); - Slab *slab, *next_slab; FileSlice *slice, *next_slice; - if (self->disk_cache_dirname) - g_free(self->disk_cache_dirname); - g_mutex_free(self->state_mutex); g_cond_free(self->state_cond); - g_mutex_free(self->slab_mutex); - g_cond_free(self->slab_cond); - g_cond_free(self->slab_free_cond); + g_mutex_free(self->ring_mutex); + g_cond_free(self->ring_add_cond); + g_cond_free(self->ring_free_cond); - /* free the slab train, without reference to the refcounts */ - for (slab = self->oldest_slab; slab != NULL; slab = next_slab) { - next_slab = slab->next; - free_slab(slab); - } - self->disk_cacher_slab = NULL; - self->mem_cache_slab = NULL; - self->device_slab = NULL; - self->oldest_slab = NULL; - self->newest_slab = NULL; - - if (self->reader_slab) { - free_slab(self->reader_slab); - self->reader_slab = NULL; - } + g_mutex_free(self->part_slices_mutex); for (slice = self->part_slices; slice; slice = next_slice) { next_slice = slice->next; - g_free(slice->filename); + if (slice->filename) + g_free(slice->filename); g_free(slice); } + if (self->ring_buffer) + g_free(self->ring_buffer); + if (self->part_header) 
dumpfile_free(self->part_header); - if (self->disk_cache_read_fd != -1) - close(self->disk_cache_read_fd); /* ignore error */ - if (self->disk_cache_write_fd != -1) - close(self->disk_cache_write_fd); /* ignore error */ - if (self->device) g_object_unref(self->device); @@ -1589,81 +998,48 @@ xfer_dest_taper_splitter( Device *first_device, size_t max_memory, guint64 part_size, - gboolean use_mem_cache, - const char *disk_cache_dirname) + gboolean expect_cache_inform) { XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL); + GValue val; + + /* max_memory and part_size get rounded up to the next multiple of + * block_size */ + max_memory = ((max_memory + first_device->block_size - 1) + / first_device->block_size) * first_device->block_size; + if (part_size) + part_size = ((part_size + first_device->block_size - 1) + / first_device->block_size) * first_device->block_size; - self->max_memory = max_memory; self->part_size = part_size; self->partnum = 1; self->device = first_device; - g_object_ref(self->device); - - /* pick only one caching mechanism, caller! 
*/ - g_assert(!use_mem_cache || !disk_cache_dirname); - - /* and if part size is zero, then we don't do any caching */ - if (part_size == 0) { - g_assert(!use_mem_cache && !disk_cache_dirname); - } - self->use_mem_cache = use_mem_cache; - if (disk_cache_dirname) { - self->disk_cache_dirname = g_strdup(disk_cache_dirname); - - self->part_slices = g_new0(FileSlice, 1); - self->part_slices->filename = NULL; /* indicates "use disk_cache_read_fd" */ - self->part_slices->offset = 0; - self->part_slices->length = 0; /* will be filled in in start_part */ - } - - /* calculate the device-dependent parameters */ + g_object_ref(self->device); self->block_size = first_device->block_size; + self->paused = TRUE; + self->no_more_parts = FALSE; - /* The slab size should be large enough to justify the overhead of all - * of the mutexes, but it needs to be small enough to have a few slabs - * available so that the threads are not constantly waiting on one - * another. The choice is sixteen blocks, not more than a quarter of - * the part size, and not more than 10MB. If we're not using the mem - * cache, then avoid exceeding max_memory by keeping the slab size less - * than a quarter of max_memory. 
*/ - - self->slab_size = self->block_size * 16; - if (self->part_size) - self->slab_size = MIN(self->slab_size, self->part_size / 4); - self->slab_size = MIN(self->slab_size, 10*1024*1024); - if (!self->use_mem_cache) - self->slab_size = MIN(self->slab_size, self->max_memory / 4); - - /* round slab size up to the nearest multiple of the block size */ - self->slab_size = - ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size; - - /* round part size up to a multiple of the slab size */ - if (self->part_size != 0) { - self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size; - self->part_size = self->slabs_per_part * self->slab_size; - } else { - self->slabs_per_part = 0; - } - - /* fill in the file slice's length, now that we know the real part size */ - if (self->disk_cache_dirname) - self->part_slices->length = self->part_size; + /* set up a ring buffer of size max_memory */ + self->ring_length = max_memory; + self->ring_buffer = g_malloc(max_memory); + self->ring_head = self->ring_tail = 0; + self->ring_count = 0; + self->ring_head_at_eof = 0; - if (self->use_mem_cache) { - self->max_slabs = self->slabs_per_part; + /* get this new device's streaming requirements */ + bzero(&val, sizeof(val)); + if (!device_property_get(self->device, PROPERTY_STREAMING, &val) + || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) { + g_warning("Couldn't get streaming type for %s", self->device->device_name); + self->streaming = STREAMING_REQUIREMENT_REQUIRED; } else { - self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size; + self->streaming = g_value_get_enum(&val); } + g_value_unset(&val); - /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in - * alloc_slab, so we check here that it's at least 2. 
*/ - if (self->max_slabs < 2) - self->max_slabs = 2; - - DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs); + /* grab data from cache_inform, just in case we hit PEOM */ + self->expect_cache_inform = expect_cache_inform; return XFER_ELEMENT(self); }