2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "xfer-device.h"
/* A transfer destination that writes an entire dumpfile to one or more files on one
29 * or more devices. This is designed to work in concert with Amanda::Taper::Scribe. */
32 * - capture EOF early enough to avoid wasting a tape when the part size is an even multiple of the volume size - maybe reader thread can just go back and tag previous slab with EOF in that case?
33 * - use mmap to make the disk-cacher thread unnecessary, if supported, by simply mapping slabs into the disk cache file
34 * - can we find a way to fall back to mem_cache when the disk cache gets ENOSPC? Does it even make sense to try, since this would change the part size?
35 * - distinguish some permanent device errors and do not retry the part? (this will be a change of behavior)
41 * Slabs are larger than blocks, and are the unit on which the element
42 * operates. They are designed to be a few times larger than a block, to
43 * achieve a corresponding reduction in the number of locks and unlocks used
 * per block, and similar reduction in the amount of memory overhead
51 /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
52 * from any processes operating on the slab */
55 /* number of this slab in the sequence, global to this element's lifetime.
56 * Since this counts slabs, which are about 1M, this can address 16
57 * yottabytes of data before wrapping. */
60 /* slab size; this is only less than the element's slab size if the
61 * transfer is at EOF. */
64 /* base of the slab_size buffer */
71 * These objects are arranged in a linked list, and describe the files in which
72 * the disk cache is stored. Note that we assume that slices are added *before*
73 * they are needed: the xfer element will fail if it tries to rewind and does not
74 * find a suitable slice.
77 typedef struct FileSlice {
78 struct FileSlice *next;
80 /* fully-qualified filename to read from, or NULL to read from disk_cache_read_fd */
83 /* offset in file to start at */
86 /* length of data to read */
94 static GObjectClass *parent_class = NULL;
96 typedef struct XferDestTaperSplitter {
97 XferDestTaper __parent__;
101 * These values are supplied to the constructor, and can be assumed
102 * constant for the lifetime of the element.
105 /* maximum buffer space to use for streaming; this is unrelated to the
106 * fallback_splitsize */
109 /* split buffering info; if we're doing memory buffering, use_mem_cache is
110 * true; if we're doing disk buffering, disk_cache_dirname is non-NULL
111 * and contains the (allocated) filename of the cache file. Either way,
112 * part_size gives the desired cache size. If part_size is zero, then
113 * no splitting takes place (so part_size is effectively infinite) */
114 gboolean use_mem_cache;
115 char *disk_cache_dirname;
116 guint64 part_size; /* (bytes) */
122 /* The thread doing the actual writes to tape; this also handles buffering
124 GThread *device_thread;
126 /* The thread writing slabs to the disk cache, if any */
127 GThread *disk_cache_thread;
131 * All in-memory data is contained in a linked list called the "slab
132 * train". Various components are operating simultaneously at different
133 * points in this train. Data from the upstream XferElement is appended to
134 * the head of the train, and the device thread follows along behind,
135 * writing data to the device. When caching parts in memory, the slab
136 * train just grows to eventually contain the whole part. When using an
137 * on-disk cache, the disk cache thread writes the tail of the train to
138 * disk, freeing slabs to be re-used at the head of the train. Some
139 * careful coordination of these components allows them to operate as
140 * independently as possible within the limits of the user's configuration.
142 * Slabs are rarely, if ever, freed: the oldest_slab reference generally
143 * ensures that all slabs have refcount > 0, and this pointer is only
144 * advanced when re-using slabs that have been flushed to the disk cache or
145 * when freeing slabs after completion of the transfer. */
147 /* pointers into the slab train are all protected by this mutex. Note that
148 * the slabs themselves can be manipulated without this lock; it's only
149 * when changing the pointers that the mutex must be held. Furthermore, a
150 * foo_slab variable which is not NULL will not be changed except by its
151 * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
152 * and device_slab is controlled by device_thread). This means that a
153 * controlling thread can drop the slab_mutex once it has ensured its slab
156 * Slab_cond is notified when a new slab is made available from the reader.
157 * Slab_free_cond is notified when a slab becomes available for
160 * Any thread waiting on either condition variable should also check
161 * elt->cancelled, and act appropriately if awakened in a cancelled state.
163 GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
165 /* slabs in progress by each thread, or NULL if the thread is waiting on
166 * slab_cond. These can only be changed by their respective threads, except
167 * when they are NULL (in which case the reader will point them to a new
168 * slab and signal the slab_cond). */
169 Slab *volatile disk_cacher_slab;
170 Slab *volatile mem_cache_slab;
171 Slab *volatile device_slab;
173 /* tail and head of the slab train */
174 Slab *volatile oldest_slab;
175 Slab *volatile newest_slab;
177 /* thread-specific information
179 * These values are only used by one thread, and thus are not
180 * subject to any locking or concurrency constraints.
183 /* slab in progress by the reader (not in the slab train) */
186 /* the serial to be assigned to reader_slab */
189 /* bytes written to the device in this part */
190 guint64 bytes_written;
192 /* bytes written to the device in the current slab */
193 guint64 slab_bytes_written;
197 * "state" includes all of the variables below (including device
198 * parameters). Note that the device_thread reads state values when
199 * paused is false without locking the mutex. No other thread should
200 * change state when the element is not paused.
202 * If there is every any reason to lock both mutexes, acquire this one
205 * Any thread waiting on this condition variable should also check
206 * elt->cancelled, and act appropriately if awakened in a cancelled state.
210 volatile gboolean paused;
212 /* The device to write to, and the header to write to it */
213 Device *volatile device;
214 dumpfile_t *volatile part_header;
216 /* If true, when unpaused, the device should begin at the beginning of the
217 * cache; if false, it should proceed to the next part. */
218 volatile gboolean retry_part;
220 /* If true, the previous part was completed successfully; only used for
222 volatile gboolean last_part_successful;
224 /* part number in progress */
225 volatile guint64 partnum;
227 /* if true, the main thread should *not* call start_part */
228 volatile gboolean no_more_parts;
230 /* the first serial in this part, and the serial to stop at */
231 volatile guint64 part_first_serial, part_stop_serial;
233 /* file slices for the current part */
234 FileSlice *volatile part_slices;
236 /* read and write file descriptors for the disk cache file, in use by the
237 * disk_cache_thread. If these are -1, wait on state_cond until they are
238 * not; once the value is set, it will not change. */
239 volatile int disk_cache_read_fd;
240 volatile int disk_cache_write_fd;
244 * Note that these values aren't known until we begin writing to the
245 * device; if block_size is zero, threads should block on state_cond until
246 * it is nonzero, at which point all of the dependent fields will have
247 * their correct values. Note that, since this value never changes after
248 * it has been set, it is safe to read block_size without acquiring the
251 /* this device's need for streaming */
252 StreamingRequirement streaming;
254 /* block size expected by the target device */
257 /* Size of a slab - some multiple of the block size */
260 /* maximum number of slabs allowed, rounded up to the next whole slab. If
261 * using mem cache, this is the equivalent of part_size bytes; otherwise,
262 * it is equivalent to max_memory bytes. */
265 /* number of slabs in a part */
266 guint64 slabs_per_part;
267 } XferDestTaperSplitter;
269 static GType xfer_dest_taper_splitter_get_type(void);
270 #define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
271 #define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
272 #define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
273 #define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
274 #define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
275 #define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
278 XferDestTaperClass __parent__;
280 } XferDestTaperSplitterClass;
286 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
288 _xdt_dbg(const char *fmt, ...)
293 arglist_start(argp, fmt);
294 g_vsnprintf(msg, sizeof(msg), fmt, argp);
296 g_debug("XDT thd-%p: %s", g_thread_self(), msg);
303 /* called with the slab_mutex held, this gets a new slab to write into, with
304 * refcount 1. It will block if max_memory slabs are already in use, and mem
305 * caching is not in use, although allocation may be forced with the 'force'
308 * If the memory allocation cannot be satisfied due to system constraints,
309 * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
310 * return NULL. If the transfer is cancelled by some other means while this
311 * function is blocked awaiting a free slab, it will return NULL.
313 * @param self: the xfer element
314 * @param force: allocate a slab even if it would exceed max_memory
315 * @returns: a new slab, or NULL if the xfer is cancelled
319 XferDestTaperSplitter *self,
322 XferElement *elt = XFER_ELEMENT(self);
325 DBG(8, "alloc_slab(force=%d)", force);
327 /* throttle based on maximum number of extant slabs */
332 self->oldest_slab->refcount > 1 &&
333 (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
334 DBG(9, "waiting for available slab");
335 g_cond_wait(self->slab_free_cond, self->slab_mutex);
337 DBG(9, "done waiting");
343 /* if the oldest slab doesn't have anything else pointing to it, just use
345 if (self->oldest_slab && self->oldest_slab->refcount == 1) {
346 rv = self->oldest_slab;
347 self->oldest_slab = rv->next;
349 rv = g_new0(Slab, 1);
351 rv->base = g_try_malloc(self->slab_size);
354 xfer_cancel_with_error(XFER_ELEMENT(self),
355 _("Could not allocate %zu bytes of memory"), self->slab_size);
/* called with the slab_mutex held, this frees the given slab entirely. The
366 * reference count is not consulted.
368 * @param slab: slab to free
381 /* called with the slab_mutex held, this decrements the refcount of the
384 * @param self: xfer element
385 * @param slab: slab to free
389 XferDestTaperSplitter *self,
392 g_assert(slab->refcount > 1);
394 if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
395 g_cond_broadcast(self->slab_free_cond);
396 } else if (G_UNLIKELY(slab->refcount == 0)) {
401 /* called with the slab_mutex held, this sets *slabp to *slabp->next,
402 * adjusting refcounts appropriately, and returns the new value
404 * @param self: xfer element
405 * @param slabp: slab pointer to advance
406 * @returns: new value of *slabp
410 XferDestTaperSplitter *self,
411 Slab * volatile *slabp)
415 if (!slabp || !*slabp)
418 next = (*slabp)->next;
422 unref_slab(self, *slabp);
431 * The disk cache thread's job is simply to follow along the slab train at
432 * maximum speed, writing slabs to the disk cache file. */
436 XferDestTaperSplitter *self)
440 g_assert(self->disk_cache_read_fd == -1);
441 g_assert(self->disk_cache_write_fd == -1);
443 g_mutex_lock(self->state_mutex);
444 filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
445 self->disk_cache_dirname);
447 self->disk_cache_write_fd = g_mkstemp(filename);
448 if (self->disk_cache_write_fd < 0) {
449 g_mutex_unlock(self->state_mutex);
450 xfer_cancel_with_error(XFER_ELEMENT(self),
451 _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
457 /* open a separate copy of the file for reading */
458 self->disk_cache_read_fd = open(filename, O_RDONLY);
459 if (self->disk_cache_read_fd < 0) {
460 g_mutex_unlock(self->state_mutex);
461 xfer_cancel_with_error(XFER_ELEMENT(self),
462 _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
468 /* signal anyone waiting for this value */
469 g_cond_broadcast(self->state_cond);
470 g_mutex_unlock(self->state_mutex);
472 /* errors from unlink are not fatal */
473 if (unlink(filename) < 0) {
474 g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
485 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
486 XferElement *elt = XFER_ELEMENT(self);
488 DBG(1, "(this is the disk cache thread)");
490 /* open up the disk cache file first */
491 if (!open_disk_cache_fds(self))
494 while (!elt->cancelled) {
/* rewind to the beginning of the disk cache file */
500 if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
501 xfer_cancel_with_error(XFER_ELEMENT(self),
502 _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
507 /* we need to sit and wait for the next part to begin, first making sure
508 * we have a slab .. */
509 g_mutex_lock(self->slab_mutex);
510 while (!self->disk_cacher_slab && !elt->cancelled) {
511 DBG(9, "waiting for a disk slab");
512 g_cond_wait(self->slab_cond, self->slab_mutex);
514 DBG(9, "done waiting");
515 g_mutex_unlock(self->slab_mutex);
520 /* this slab is now fixed until this thread changes it */
521 g_assert(self->disk_cacher_slab != NULL);
523 /* and then making sure we're ready to write that slab. */
524 g_mutex_lock(self->state_mutex);
525 while ((self->paused ||
526 (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
527 && !elt->cancelled) {
528 DBG(9, "waiting for the disk slab to become current and un-paused");
529 g_cond_wait(self->state_cond, self->state_mutex);
531 DBG(9, "done waiting");
533 stop_serial = self->part_stop_serial;
534 g_mutex_unlock(self->state_mutex);
539 g_mutex_lock(self->slab_mutex);
540 slab = self->disk_cacher_slab;
542 while (!eop && !eof) {
543 /* if we're at the head of the slab train, wait for more data */
544 while (!self->disk_cacher_slab && !elt->cancelled) {
545 DBG(9, "waiting for the next disk slab");
546 g_cond_wait(self->slab_cond, self->slab_mutex);
548 DBG(9, "done waiting");
553 /* drop the lock long enough to write the slab; the refcount
554 * protects the slab during this time */
555 slab = self->disk_cacher_slab;
556 g_mutex_unlock(self->slab_mutex);
558 if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
559 xfer_cancel_with_error(XFER_ELEMENT(self),
560 _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
565 eof = slab->size < self->slab_size;
566 eop = (slab->serial + 1 == stop_serial);
568 g_mutex_lock(self->slab_mutex);
569 next_slab(self, &self->disk_cacher_slab);
571 g_mutex_unlock(self->slab_mutex);
574 /* this very thread should have just set this value to NULL, and since it's
575 * EOF, there should not be any 'next' slab */
576 g_assert(self->disk_cacher_slab == NULL);
587 * The device thread's job is to write slabs to self->device, applying whatever
588 * streaming algorithms are required. It does this by alternately getting the
589 * next slab from a "slab source" and writing that slab to the device. Most of
590 * the slab source functions assume that self->slab_mutex is held, but may
591 * release the mutex (either explicitly or via a g_cond_wait), so it is not
592 * valid to assume that any slab pointers remain unchanged after a slab_source
 * function invocation.
596 /* This struct tracks the current state of the slab source */
597 typedef struct slab_source_state {
598 /* temporary slab used for reading from disk */
601 /* current source slice */
604 /* open fd in current slice, or -1 */
607 /* next serial to read from disk */
610 /* bytes remaining in this slice */
611 gsize slice_remaining;
614 /* Called with the slab_mutex held, this function pre-buffers enough data into the slab
615 * train to meet the device's streaming needs. */
617 slab_source_prebuffer(
618 XferDestTaperSplitter *self)
620 XferElement *elt = XFER_ELEMENT(self);
621 guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
625 /* always prebuffer at least one slab, even if max_memory is 0 */
626 if (prebuffer_slabs == 0) prebuffer_slabs = 1;
628 /* pre-buffering is not necessary if we're reading from a disk cache */
629 if (self->retry_part && self->part_slices)
632 /* pre-buffering means waiting until we have at least prebuffer_slabs in the
633 * slab train ahead of the device_slab, or the newest slab is at EOF. */
634 while (!elt->cancelled) {
635 gboolean eof_or_eop = FALSE;
637 /* see if there's enough data yet */
638 for (i = 0, slab = self->device_slab;
639 i < prebuffer_slabs && slab != NULL;
640 i++, slab = slab->next) {
641 eof_or_eop = (slab->size < self->slab_size)
642 || (slab->serial + 1 == self->part_stop_serial);
644 if (i == prebuffer_slabs || eof_or_eop)
647 DBG(9, "prebuffering wait");
648 g_cond_wait(self->slab_cond, self->slab_mutex);
650 DBG(9, "done waiting");
652 if (elt->cancelled) {
653 self->last_part_successful = FALSE;
654 self->no_more_parts = TRUE;
661 /* Called without the slab_mutex held, this function sets up a new slab_source_state
 * object based on the configuration of the Xfer Element. */
663 static inline gboolean
665 XferDestTaperSplitter *self,
666 slab_source_state *state)
668 state->tmp_slab = NULL;
669 state->slice_fd = -1;
671 state->slice_remaining = 0;
672 state->next_serial = G_MAXUINT64;
674 /* if we're to retry the part, rewind to the beginning */
675 if (self->retry_part) {
676 if (self->use_mem_cache) {
677 /* rewind device_slab to point to the mem_cache_slab */
678 g_mutex_lock(self->slab_mutex);
679 if (self->device_slab)
680 unref_slab(self, self->device_slab);
681 self->device_slab = self->mem_cache_slab;
682 if(self->device_slab != NULL)
683 self->device_slab->refcount++;
684 g_mutex_unlock(self->slab_mutex);
686 g_assert(self->part_slices);
688 g_mutex_lock(self->slab_mutex);
690 /* we're going to read from the disk cache until we get to the oldest useful
691 * slab in memory, so it had best exist */
692 g_assert(self->oldest_slab != NULL);
694 /* point device_slab at the oldest slab we have */
695 self->oldest_slab->refcount++;
696 if (self->device_slab)
697 unref_slab(self, self->device_slab);
698 self->device_slab = self->oldest_slab;
700 /* and increment it until it is at least the slab we want to start from */
701 while (self->device_slab->serial < self->part_first_serial) {
702 next_slab(self, &self->device_slab);
705 /* get a new, temporary slab for use while reading */
706 state->tmp_slab = alloc_slab(self, TRUE);
708 g_mutex_unlock(self->slab_mutex);
710 if (!state->tmp_slab) {
711 /* if we couldn't allocate a slab, then we're cancelled, so we're done with
713 self->last_part_successful = FALSE;
714 self->no_more_parts = TRUE;
718 state->tmp_slab->size = self->slab_size;
719 state->slice = self->part_slices;
720 state->next_serial = self->part_first_serial;
724 /* if the streaming mode requires it, pre-buffer */
725 if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
726 self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
727 gboolean prebuffer_ok;
729 g_mutex_lock(self->slab_mutex);
730 prebuffer_ok = slab_source_prebuffer(self);
731 g_mutex_unlock(self->slab_mutex);
739 /* Called with the slab_mutex held, this does the work of slab_source_get when
740 * reading from the disk cache. Note that this explicitly releases the
741 * slab_mutex during execution - do not depend on any protected values across a
742 * call to this function. The mutex is held on return. */
744 slab_source_get_from_disk(
745 XferDestTaperSplitter *self,
746 slab_source_state *state,
749 XferElement *elt = XFER_ELEMENT(self);
750 gsize bytes_needed = self->slab_size;
751 gsize slab_offset = 0;
753 /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
754 g_mutex_unlock(self->slab_mutex);
756 g_assert(state->next_serial == serial);
758 while (bytes_needed > 0) {
759 gsize read_size, bytes_read;
761 if (state->slice_fd < 0) {
762 g_assert(state->slice);
763 if (state->slice->filename) {
764 /* regular cache_inform file - just open it */
765 state->slice_fd = open(state->slice->filename, O_RDONLY, 0);
766 if (state->slice_fd < 0) {
767 xfer_cancel_with_error(XFER_ELEMENT(self),
768 _("Could not open '%s' for reading: %s"),
769 state->slice->filename, strerror(errno));
773 /* wait for the disk_cache_thread to open the disk_cache_read_fd, and then copy it */
774 g_mutex_lock(self->state_mutex);
775 while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
776 DBG(9, "waiting for disk_cache_thread to start up");
777 g_cond_wait(self->state_cond, self->state_mutex);
779 DBG(9, "done waiting");
780 state->slice_fd = self->disk_cache_read_fd;
781 g_mutex_unlock(self->state_mutex);
784 if (lseek(state->slice_fd, state->slice->offset, SEEK_SET) == -1) {
785 xfer_cancel_with_error(XFER_ELEMENT(self),
786 _("Could not seek '%s' for reading: %s"),
787 state->slice->filename? state->slice->filename : "(cache file)",
792 state->slice_remaining = state->slice->length;
795 read_size = MIN(state->slice_remaining, bytes_needed);
796 bytes_read = full_read(state->slice_fd,
797 state->tmp_slab->base + slab_offset,
799 if (bytes_read < read_size) {
800 xfer_cancel_with_error(XFER_ELEMENT(self),
801 _("Error reading '%s': %s"),
802 state->slice->filename? state->slice->filename : "(cache file)",
803 errno? strerror(errno) : _("Unexpected EOF"));
807 state->slice_remaining -= bytes_read;
808 if (state->slice_remaining == 0) {
809 if (close(state->slice_fd) < 0) {
810 xfer_cancel_with_error(XFER_ELEMENT(self),
811 _("Could not close fd %d: %s"),
812 state->slice_fd, strerror(errno));
815 state->slice_fd = -1;
816 state->slice = state->slice->next;
819 bytes_needed -= bytes_read;
820 slab_offset += bytes_read;
823 state->tmp_slab->serial = state->next_serial++;
825 g_mutex_lock(self->slab_mutex);
826 return state->tmp_slab;
829 g_mutex_lock(self->slab_mutex);
831 self->last_part_successful = FALSE;
832 self->no_more_parts = TRUE;
836 /* Called with the slab_mutex held, this function gets the slab with the given
837 * serial number, waiting if necessary for that slab to be available. Note
838 * that the slab_mutex may be released during execution, although it is always
842 XferDestTaperSplitter *self,
843 slab_source_state *state,
846 XferElement *elt = (XferElement *)self;
848 /* device_slab is only NULL if we're following the slab train, so wait for
850 if (!self->device_slab) {
851 /* if the streaming mode requires it, pre-buffer */
852 if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
853 if (!slab_source_prebuffer(self))
856 /* fall through to make sure we have a device_slab;
857 * slab_source_prebuffer doesn't guarantee device_slab != NULL */
860 while (self->device_slab == NULL && !elt->cancelled) {
861 DBG(9, "waiting for the next slab");
862 g_cond_wait(self->slab_cond, self->slab_mutex);
864 DBG(9, "done waiting");
870 /* device slab is now set, and only this thread can change it */
871 g_assert(self->device_slab);
873 /* if the next item in the device slab is the one we want, then the job is
875 if (G_LIKELY(serial == self->device_slab->serial))
876 return self->device_slab;
878 /* otherwise, we're reading from disk */
879 g_assert(serial < self->device_slab->serial);
880 return slab_source_get_from_disk(self, state, serial);
883 self->last_part_successful = FALSE;
884 self->no_more_parts = TRUE;
888 /* Called without the slab_mutex held, this frees any resources assigned
889 * to the slab source state */
892 XferDestTaperSplitter *self,
893 slab_source_state *state)
895 if (state->slice_fd != -1)
896 close(state->slice_fd);
898 if (state->tmp_slab) {
899 g_mutex_lock(self->slab_mutex);
900 free_slab(state->tmp_slab);
901 g_mutex_unlock(self->slab_mutex);
905 /* Called without the slab_mutex, this writes the given slab to the device */
907 write_slab_to_device(
908 XferDestTaperSplitter *self,
911 XferElement *elt = XFER_ELEMENT(self);
912 gpointer buf = slab->base;
913 gsize remaining = slab->size;
915 while (remaining && !elt->cancelled) {
916 gsize write_size = MIN(self->block_size, remaining);
918 ok = device_write_block(self->device, write_size, buf);
920 self->bytes_written += slab->size - remaining;
922 /* TODO: handle an error without is_eom
923 * differently/fatally? or at least with a warning? */
924 self->last_part_successful = FALSE;
925 self->no_more_parts = FALSE;
930 self->slab_bytes_written += write_size;
931 remaining -= write_size;
934 if (elt->cancelled) {
935 self->last_part_successful = FALSE;
936 self->no_more_parts = TRUE;
940 self->bytes_written += slab->size;
941 self->slab_bytes_written = 0;
946 device_thread_write_part(
947 XferDestTaperSplitter *self)
949 GTimer *timer = g_timer_new();
951 slab_source_state src_state;
952 guint64 serial, stop_serial;
953 gboolean eof = FALSE;
956 self->last_part_successful = FALSE;
957 self->bytes_written = 0;
959 if (!device_start_file(self->device, self->part_header))
962 dumpfile_free(self->part_header);
963 self->part_header = NULL;
965 fileno = self->device->file;
966 g_assert(fileno > 0);
968 if (!slab_source_setup(self, &src_state))
971 g_timer_start(timer);
973 stop_serial = self->part_stop_serial;
974 g_mutex_lock(self->slab_mutex);
975 for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
976 Slab *slab = slab_source_get(self, &src_state, serial);
977 DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
978 g_mutex_unlock(self->slab_mutex);
982 eof = slab->size < self->slab_size;
984 if (!write_slab_to_device(self, slab))
987 g_mutex_lock(self->slab_mutex);
988 DBG(8, "wrote slab %p to device", slab);
990 /* if we're reading from the slab train, advance self->device_slab. */
991 if (slab == self->device_slab) {
992 next_slab(self, &self->device_slab);
995 g_mutex_unlock(self->slab_mutex);
997 /* if we write all of the blocks, but the finish_file fails, then likely
998 * there was some buffering going on in the device driver, and the blocks
999 * did not all make it to permanent storage -- so it's a failed part. */
1000 if (!device_finish_file(self->device))
1003 slab_source_free(self, &src_state);
1005 self->last_part_successful = TRUE;
1006 self->no_more_parts = eof;
1009 g_timer_stop(timer);
1011 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
1012 msg->size = self->bytes_written;
1013 msg->duration = g_timer_elapsed(timer, NULL);
1014 msg->partnum = self->partnum;
1015 msg->fileno = fileno;
1016 msg->successful = self->last_part_successful;
1017 msg->eom = !self->last_part_successful;
1018 msg->eof = self->no_more_parts;
1020 if (self->last_part_successful)
1023 g_timer_destroy(timer);
1028 /* Called with the status_mutex held, this frees any cached data for
1029 * a successful part */
1032 XferDestTaperSplitter *self)
1034 if (self->use_mem_cache && self->mem_cache_slab) {
1035 /* move up the mem_cache_slab to point to the first slab in
1036 * the next part (probably NULL at this point), so that the
1037 * reader can continue reading data into the new mem cache
1039 g_mutex_lock(self->slab_mutex);
1040 unref_slab(self, self->mem_cache_slab);
1041 self->mem_cache_slab = self->device_slab;
1042 if (self->mem_cache_slab)
1043 self->mem_cache_slab->refcount++;
1044 g_mutex_unlock(self->slab_mutex);
1047 /* the disk_cache_thread takes care of freeing its cache */
1048 else if (self->disk_cache_dirname)
1051 /* if we have part_slices, fast-forward them. Note that we should have a
1052 * full part's worth of slices by now. */
1053 else if (self->part_slices) {
1054 guint64 bytes_remaining = self->slabs_per_part * self->slab_size;
1055 FileSlice *slice = self->part_slices;
1057 /* consume slices until we've eaten the whole part */
1058 while (bytes_remaining > 0) {
1060 g_critical("Not all data in part was represented to cache_inform");
1062 if (slice->length <= bytes_remaining) {
1063 bytes_remaining -= slice->length;
1065 self->part_slices = slice->next;
1066 g_free(slice->filename);
1068 slice = self->part_slices;
1070 slice->length -= bytes_remaining;
1071 slice->offset += bytes_remaining;
1082 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
1083 XferElement *elt = XFER_ELEMENT(self);
1086 DBG(1, "(this is the device thread)");
1088 if (self->disk_cache_dirname) {
1089 GError *error = NULL;
1090 self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
1091 if (!self->disk_cache_thread) {
1092 g_critical(_("Error creating new thread: %s (%s)"),
1093 error->message, errno? strerror(errno) : _("no error code"));
1097 /* This is the outer loop, that loops once for each split part written to
1099 g_mutex_lock(self->state_mutex);
1101 /* wait until the main thread un-pauses us, and check that we have
1102 * the relevant device info available (block_size) */
1103 while (self->paused && !elt->cancelled) {
1104 DBG(9, "waiting to be unpaused");
1105 g_cond_wait(self->state_cond, self->state_mutex);
1107 DBG(9, "done waiting");
1112 g_mutex_unlock(self->state_mutex);
1113 self->slab_bytes_written = 0;
1114 DBG(2, "beginning to write part");
1115 msg = device_thread_write_part(self);
1116 DBG(2, "done writing part");
1117 g_mutex_lock(self->state_mutex);
1119 /* release any cache of a successful part, but don't bother at EOF */
1120 if (msg->successful && !msg->eof)
1121 release_part_cache(self);
1123 xfer_queue_message(elt->xfer, msg);
1125 /* if this is the last part, we're done with the part loop */
1126 if (self->no_more_parts)
1129 /* pause ourselves and await instructions from the main thread */
1130 self->paused = TRUE;
1133 g_mutex_unlock(self->state_mutex);
1135 /* make sure the other thread is done before we send XMSG_DONE */
1136 if (self->disk_cache_thread)
1137 g_thread_join(self->disk_cache_thread);
1139 /* tell the main thread we're done */
1140 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
1149 /* called with the slab_mutex held, this adds the reader_slab to the head of
1150 * the slab train and signals the condition variable. */
1152 add_reader_slab_to_train(
1153 XferDestTaperSplitter *self)
1155 Slab *slab = self->reader_slab;
1157 DBG(3, "adding slab of new data to the slab train");
1159 if (self->newest_slab) {
1160 self->newest_slab->next = slab;
1163 self->newest_slab->refcount--;
1166 self->newest_slab = slab; /* steal reader_slab's ref */
1167 self->reader_slab = NULL;
1169 /* steal reader_slab's reference for newest_slab */
1171 /* if any of the other pointers are waiting for this slab, update them */
1172 if (self->disk_cache_dirname && !self->disk_cacher_slab) {
1173 self->disk_cacher_slab = slab;
1176 if (self->use_mem_cache && !self->mem_cache_slab) {
1177 self->mem_cache_slab = slab;
1180 if (!self->device_slab) {
1181 self->device_slab = slab;
1184 if (!self->oldest_slab) {
1185 self->oldest_slab = slab;
1189 g_cond_broadcast(self->slab_cond);
1198 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
1201 DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
1203 /* do nothing if cancelled */
1204 if (G_UNLIKELY(elt->cancelled)) {
1205 goto free_and_finish;
1209 if (G_UNLIKELY(buf == NULL)) {
1210 /* send off the last, probably partial slab */
1211 g_mutex_lock(self->slab_mutex);
1213 /* create a new, empty slab if necessary */
1214 if (!self->reader_slab) {
1215 self->reader_slab = alloc_slab(self, FALSE);
1216 if (!self->reader_slab) {
1217 /* we've been cancelled while waiting for a slab */
1218 g_mutex_unlock(self->slab_mutex);
1220 /* wait for the xfer to cancel, so we don't get another buffer
1221 * pushed to us (and do so *without* the mutex held) */
1222 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1224 goto free_and_finish;
1226 self->reader_slab->serial = self->next_serial++;
1229 add_reader_slab_to_train(self);
1230 g_mutex_unlock(self->slab_mutex);
1232 goto free_and_finish;
1239 /* get a fresh slab, if needed */
1240 if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
1241 g_mutex_lock(self->slab_mutex);
1242 if (self->reader_slab)
1243 add_reader_slab_to_train(self);
1244 self->reader_slab = alloc_slab(self, FALSE);
1245 if (!self->reader_slab) {
1246 /* we've been cancelled while waiting for a slab */
1247 g_mutex_unlock(self->slab_mutex);
1249 /* wait for the xfer to cancel, so we don't get another buffer
1250 * pushed to us (and do so *without* the mutex held) */
1251 wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
1253 goto free_and_finish;
1255 self->reader_slab->serial = self->next_serial++;
1256 g_mutex_unlock(self->slab_mutex);
1262 copy_size = MIN(self->slab_size - self->reader_slab->size, size);
1263 memcpy(self->reader_slab->base+self->reader_slab->size, p, copy_size);
1265 self->reader_slab->size += copy_size;
1283 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
1284 GError *error = NULL;
1286 self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
1287 if (!self->device_thread) {
1288 g_critical(_("Error creating new thread: %s (%s)"),
1289 error->message, errno? strerror(errno) : _("no error code"));
1298 gboolean expect_eof)
1300 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
1303 /* chain up first */
1304 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
1306 /* then signal all of our condition variables, so that threads waiting on them
1307 * wake up and see elt->cancelled. */
1308 g_mutex_lock(self->state_mutex);
1309 g_cond_broadcast(self->state_cond);
1310 g_mutex_unlock(self->state_mutex);
1312 g_mutex_lock(self->slab_mutex);
1313 g_cond_broadcast(self->slab_cond);
1314 g_cond_broadcast(self->slab_free_cond);
1315 g_mutex_unlock(self->slab_mutex);
1322 XferDestTaper *xdtself,
1323 gboolean retry_part,
1326 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1328 g_assert(self->device != NULL);
1329 g_assert(!self->device->in_file);
1330 g_assert(header != NULL);
1332 DBG(1, "start_part(retry_part=%d)", retry_part);
1334 g_mutex_lock(self->state_mutex);
1335 g_assert(self->paused);
1336 g_assert(!self->no_more_parts);
1338 if (self->part_header)
1339 dumpfile_free(self->part_header);
1340 self->part_header = dumpfile_copy(header);
1343 if (!self->use_mem_cache && !self->part_slices) {
1344 g_mutex_unlock(self->state_mutex);
1345 xfer_cancel_with_error(XFER_ELEMENT(self),
1346 _("Failed part was not cached; cannot retry"));
1349 g_assert(!self->last_part_successful);
1350 self->retry_part = TRUE;
1352 g_assert(self->last_part_successful);
1353 self->retry_part = FALSE;
1354 self->part_first_serial = self->part_stop_serial;
1355 if (self->part_size != 0) {
1356 self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
1358 /* set part_stop_serial to an effectively infinite value */
1359 self->part_stop_serial = G_MAXUINT64;
1363 DBG(1, "unpausing");
1364 self->paused = FALSE;
1365 g_cond_broadcast(self->state_cond);
1367 g_mutex_unlock(self->state_mutex);
1372 XferDestTaper *xdtself,
1375 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1378 /* short-circuit if nothing is changing */
1379 if (self->device == device)
1382 g_mutex_lock(self->state_mutex);
1384 g_object_unref(self->device);
1385 self->device = device;
1386 g_object_ref(device);
1388 /* get this new device's streaming requirements */
1389 bzero(&val, sizeof(val));
1390 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1391 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1392 g_warning("Couldn't get streaming type for %s", self->device->device_name);
1393 self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1395 self->streaming = g_value_get_enum(&val);
1397 g_value_unset(&val);
1399 /* check that the blocksize hasn't changed */
1400 if (self->block_size != device->block_size) {
1401 g_mutex_unlock(self->state_mutex);
1402 xfer_cancel_with_error(XFER_ELEMENT(self),
1403 _("All devices used by the taper must have the same block size"));
1406 g_mutex_unlock(self->state_mutex);
1411 XferDestTaper *xdtself,
1412 const char *filename,
1416 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1417 FileSlice *slice, *iter;
1419 DBG(1, "cache_inform(\"%s\", %jd, %jd)", filename, (intmax_t)offset, (intmax_t)length);
1421 /* do we even need this info? */
1422 if (self->disk_cache_dirname || self->use_mem_cache || self->part_size == 0)
1425 /* handle the (admittedly unlikely) event that length is larger than gsize.
1426 * Hopefully if sizeof(off_t) = sizeof(gsize), this will get optimized out */
1427 while (sizeof(off_t) > sizeof(gsize) && length > (off_t)SIZE_MAX) {
1428 cache_inform_impl(xdtself, filename, offset, (off_t)SIZE_MAX);
1429 offset += (off_t)SIZE_MAX;
1430 length -= (off_t)SIZE_MAX;
1433 slice = g_new0(FileSlice, 1);
1434 slice->filename = g_strdup(filename);
1435 slice->offset = offset;
1436 slice->length = (gsize)length;
1438 g_mutex_lock(self->state_mutex);
1439 if (self->part_slices) {
1440 for (iter = self->part_slices; iter->next; iter = iter->next) {}
1443 self->part_slices = slice;
1445 g_mutex_unlock(self->state_mutex);
1449 get_part_bytes_written_impl(
1450 XferDestTaper *xdtself)
1452 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
1453 return self->bytes_written + self->slab_bytes_written;
1460 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
1461 elt->can_generate_eof = FALSE;
1463 self->state_mutex = g_mutex_new();
1464 self->state_cond = g_cond_new();
1465 self->slab_mutex = g_mutex_new();
1466 self->slab_cond = g_cond_new();
1467 self->slab_free_cond = g_cond_new();
1469 self->last_part_successful = TRUE;
1470 self->paused = TRUE;
1471 self->part_stop_serial = 0;
1472 self->disk_cache_read_fd = -1;
1473 self->disk_cache_write_fd = -1;
1480 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
1481 Slab *slab, *next_slab;
1482 FileSlice *slice, *next_slice;
1484 if (self->disk_cache_dirname)
1485 g_free(self->disk_cache_dirname);
1487 g_mutex_free(self->state_mutex);
1488 g_cond_free(self->state_cond);
1490 g_mutex_free(self->slab_mutex);
1491 g_cond_free(self->slab_cond);
1492 g_cond_free(self->slab_free_cond);
1494 /* free the slab train, without reference to the refcounts */
1495 for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
1496 next_slab = slab->next;
1499 self->disk_cacher_slab = NULL;
1500 self->mem_cache_slab = NULL;
1501 self->device_slab = NULL;
1502 self->oldest_slab = NULL;
1503 self->newest_slab = NULL;
1505 if (self->reader_slab) {
1506 free_slab(self->reader_slab);
1507 self->reader_slab = NULL;
1510 for (slice = self->part_slices; slice; slice = next_slice) {
1511 next_slice = slice->next;
1512 g_free(slice->filename);
1516 if (self->part_header)
1517 dumpfile_free(self->part_header);
1519 if (self->disk_cache_read_fd != -1)
1520 close(self->disk_cache_read_fd); /* ignore error */
1521 if (self->disk_cache_write_fd != -1)
1522 close(self->disk_cache_write_fd); /* ignore error */
1525 g_object_unref(self->device);
1528 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
1533 XferDestTaperSplitterClass * selfc)
1535 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
1536 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
1537 GObjectClass *goc = G_OBJECT_CLASS(selfc);
1538 static xfer_element_mech_pair_t mech_pairs[] = {
1539 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
1540 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
1543 klass->start = start_impl;
1544 klass->cancel = cancel_impl;
1545 klass->push_buffer = push_buffer_impl;
1546 xdt_klass->start_part = start_part_impl;
1547 xdt_klass->use_device = use_device_impl;
1548 xdt_klass->cache_inform = cache_inform_impl;
1549 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
1550 goc->finalize = finalize_impl;
1552 klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
1553 klass->mech_pairs = mech_pairs;
1555 parent_class = g_type_class_peek_parent(selfc);
1559 xfer_dest_taper_splitter_get_type (void)
1561 static GType type = 0;
1563 if G_UNLIKELY(type == 0) {
1564 static const GTypeInfo info = {
1565 sizeof (XferDestTaperSplitterClass),
1566 (GBaseInitFunc) NULL,
1567 (GBaseFinalizeFunc) NULL,
1568 (GClassInitFunc) class_init,
1569 (GClassFinalizeFunc) NULL,
1570 NULL /* class_data */,
1571 sizeof (XferDestTaperSplitter),
1572 0 /* n_preallocs */,
1573 (GInstanceInitFunc) instance_init,
1577 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
1588 xfer_dest_taper_splitter(
1589 Device *first_device,
1592 gboolean use_mem_cache,
1593 const char *disk_cache_dirname)
1595 XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1597 self->max_memory = max_memory;
1598 self->part_size = part_size;
1600 self->device = first_device;
1601 g_object_ref(self->device);
1603 /* pick only one caching mechanism, caller! */
1604 g_assert(!use_mem_cache || !disk_cache_dirname);
1606 /* and if part size is zero, then we don't do any caching */
1607 if (part_size == 0) {
1608 g_assert(!use_mem_cache && !disk_cache_dirname);
1611 self->use_mem_cache = use_mem_cache;
1612 if (disk_cache_dirname) {
1613 self->disk_cache_dirname = g_strdup(disk_cache_dirname);
1615 self->part_slices = g_new0(FileSlice, 1);
1616 self->part_slices->filename = NULL; /* indicates "use disk_cache_read_fd" */
1617 self->part_slices->offset = 0;
1618 self->part_slices->length = 0; /* will be filled in in start_part */
1621 /* calculate the device-dependent parameters */
1622 self->block_size = first_device->block_size;
1624 /* The slab size should be large enough to justify the overhead of all
1625 * of the mutexes, but it needs to be small enough to have a few slabs
1626 * available so that the threads are not constantly waiting on one
1627 * another. The choice is sixteen blocks, not more than a quarter of
1628 * the part size, and not more than 10MB. If we're not using the mem
1629 * cache, then avoid exceeding max_memory by keeping the slab size less
1630 * than a quarter of max_memory. */
1632 self->slab_size = self->block_size * 16;
1633 if (self->part_size)
1634 self->slab_size = MIN(self->slab_size, self->part_size / 4);
1635 self->slab_size = MIN(self->slab_size, 10*1024*1024);
1636 if (!self->use_mem_cache)
1637 self->slab_size = MIN(self->slab_size, self->max_memory / 4);
1639 /* round slab size up to the nearest multiple of the block size */
1641 ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
1643 /* round part size up to a multiple of the slab size */
1644 if (self->part_size != 0) {
1645 self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
1646 self->part_size = self->slabs_per_part * self->slab_size;
1648 self->slabs_per_part = 0;
1651 /* fill in the file slice's length, now that we know the real part size */
1652 if (self->disk_cache_dirname)
1653 self->part_slices->length = self->part_size;
1655 if (self->use_mem_cache) {
1656 self->max_slabs = self->slabs_per_part;
1658 self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
1661 /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
1662 * alloc_slab, so we check here that it's at least 2. */
1663 if (self->max_slabs < 2)
1664 self->max_slabs = 2;
1666 DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);
1668 return XFER_ELEMENT(self);