/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
 */
#include "amanda.h"
#include "xfer-device.h"
/* A transfer destination that writes an entire dumpfile to one or more files
 * on one or more devices, caching each part so that it can be rewritten on a
 * subsequent volume in the event of an unexpected EOM. This is designed to
 * work in concert with Amanda::Taper::Scribe. */
/* TODO:
 *  - capture EOF early enough to avoid wasting a tape when the part size is
 *    an even multiple of the volume size - maybe the reader thread can just
 *    go back and tag the previous slab with EOF in that case?
 *  - use mmap to make the disk-cacher thread unnecessary, if supported, by
 *    simply mapping slabs into the disk cache file
 *  - can we find a way to fall back to mem_cache when the disk cache gets
 *    ENOSPC? Does it even make sense to try, since this would change the
 *    part size?
 *  - distinguish some permanent device errors and do not retry the part?
 *    (this will be a change of behavior)
 */
/*
 * Slabs are larger than blocks, and are the unit on which the element
 * operates. They are designed to be a few times larger than a block, to
 * achieve a corresponding reduction in the number of locks and unlocks used
 * per block, and a similar reduction in the amount of memory overhead
 * required.
 */
typedef struct Slab {
    struct Slab *next;
    /* counts incoming pointers: the preceding slab's 'next' pointer, and pointers
     * from any processes operating on the slab */
    gint refcount;
    /* number of this slab in the sequence, global to this element's lifetime.
     * Since this counts slabs, which are about 1M, this can address 16
     * yottabytes of data before wrapping. */
    guint64 serial;
    /* slab size; this is only less than the element's slab size if the
     * transfer is at EOF. */
    gsize size;
    /* base of the slab_size buffer */
    gpointer base;
} Slab;
static GObjectClass *parent_class = NULL;
typedef struct XferDestTaperCacher {
    XferDestTaper __parent__;

    /* These values are supplied to the constructor, and can be assumed
     * constant for the lifetime of the element. */

    /* maximum buffer space to use for streaming; this is unrelated to the
     * fallback_splitsize */
    gsize max_memory;

    /* split buffering info; if we're doing memory buffering, use_mem_cache is
     * true; if we're doing disk buffering, disk_cache_dirname is non-NULL and
     * contains the (allocated) name of the directory in which the cache file
     * is created. In any case, part_size gives the desired part size. If
     * part_size is zero, then no splitting takes place (so part_size is
     * effectively infinite). */
    gboolean use_mem_cache;
    char *disk_cache_dirname;
    guint64 part_size; /* (bytes) */
    /* The thread doing the actual writes to tape; this also handles buffering
     * for streaming */
    GThread *device_thread;

    /* The thread writing slabs to the disk cache, if any */
    GThread *disk_cache_thread;
    /* All in-memory data is contained in a linked list called the "slab
     * train". Various components are operating simultaneously at different
     * points in this train. Data from the upstream XferElement is appended to
     * the head of the train, and the device thread follows along behind,
     * writing data to the device. When caching parts in memory, the slab
     * train just grows to eventually contain the whole part. When using an
     * on-disk cache, the disk cache thread writes the tail of the train to
     * disk, freeing slabs to be re-used at the head of the train. Some
     * careful coordination of these components allows them to operate as
     * independently as possible within the limits of the user's configuration.
     *
     * Slabs are rarely, if ever, freed: the oldest_slab reference generally
     * ensures that all slabs have refcount > 0, and this pointer is only
     * advanced when re-using slabs that have been flushed to the disk cache or
     * when freeing slabs after completion of the transfer. */
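    /* An illustrative picture of the train (not from the original source);
     * the reader appends slabs at the head, and each consumer thread keeps
     * its own cursor into the train, advancing it with next_slab():
     *
     *   oldest_slab -> [s12] -> [s13] -> [s14] -> [s15] <- newest_slab
     *                             ^                ^
     *                (consumer cursors, e.g. device_slab, disk_cacher_slab)
     */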
    /* Pointers into the slab train are all protected by this mutex. Note that
     * the slabs themselves can be manipulated without this lock; it's only
     * when changing the pointers that the mutex must be held. Furthermore, a
     * foo_slab variable which is not NULL will not be changed except by its
     * controlling thread (disk_cacher_slab is controlled by disk_cache_thread,
     * and device_slab is controlled by device_thread). This means that a
     * controlling thread can drop the slab_mutex once it has ensured its slab
     * is non-NULL.
     *
     * Slab_cond is notified when a new slab is made available from the reader.
     * Slab_free_cond is notified when a slab becomes available for reuse.
     *
     * Any thread waiting on either condition variable should also check
     * elt->cancelled, and act appropriately if awakened in a cancelled state. */
    GMutex *slab_mutex; GCond *slab_cond; GCond *slab_free_cond;
    /* slabs in progress by each thread, or NULL if the thread is waiting on
     * slab_cond. These can only be changed by their respective threads, except
     * when they are NULL (in which case the reader will point them to a new
     * slab and signal the slab_cond). */
    Slab *volatile disk_cacher_slab;
    Slab *volatile mem_cache_slab;
    Slab *volatile device_slab;

    /* tail and head of the slab train */
    Slab *volatile oldest_slab;
    Slab *volatile newest_slab;
    /* thread-specific information
     *
     * These values are only used by one thread, and thus are not
     * subject to any locking or concurrency constraints. */

    /* slab in progress by the reader (not in the slab train) */
    Slab *reader_slab;

    /* the serial to be assigned to reader_slab */
    guint64 next_serial;

    /* bytes written to the device in this part */
    guint64 bytes_written;

    /* bytes written to the device in the current slab */
    guint64 slab_bytes_written;
    /* "state" includes all of the variables below (including device
     * parameters). Note that the device_thread reads state values when
     * paused is false without locking the mutex. No other thread should
     * change state when the element is not paused.
     *
     * If there is ever any reason to lock both mutexes, acquire this one
     * first.
     *
     * Any thread waiting on this condition variable should also check
     * elt->cancelled, and act appropriately if awakened in a cancelled state. */
    GMutex *state_mutex;
    GCond *state_cond;
    volatile gboolean paused;
    /* The device to write to, and the header to write to it */
    Device *volatile device;
    dumpfile_t *volatile part_header;

    /* If true, when unpaused, the device should begin at the beginning of the
     * cache; if false, it should proceed to the next part. */
    volatile gboolean retry_part;

    /* If true, the previous part was completed successfully; only used for
     * assertions */
    volatile gboolean last_part_successful;

    /* part number in progress */
    volatile guint64 partnum;

    /* if true, the main thread should *not* call start_part */
    volatile gboolean no_more_parts;

    /* the first serial in this part, and the serial to stop at */
    volatile guint64 part_first_serial, part_stop_serial;

    /* read and write file descriptors for the disk cache file, in use by the
     * disk_cache_thread. If these are -1, wait on state_cond until they are
     * not; once the value is set, it will not change. */
    volatile int disk_cache_read_fd;
    volatile int disk_cache_write_fd;
    /* Device parameters
     *
     * Note that these values aren't known until we begin writing to the
     * device; if block_size is zero, threads should block on state_cond until
     * it is nonzero, at which point all of the dependent fields will have
     * their correct values. Note that, since this value never changes after
     * it has been set, it is safe to read block_size without acquiring the
     * mutex. */

    /* this device's need for streaming */
    StreamingRequirement streaming;

    /* block size expected by the target device */
    gsize block_size;

    /* Size of a slab - some multiple of the block size */
    gsize slab_size;
    /* maximum number of slabs allowed, rounded up to the next whole slab. If
     * using the mem cache, this is the equivalent of part_size bytes;
     * otherwise, it is equivalent to max_memory bytes. */
    guint64 max_slabs;

    /* number of slabs in a part */
    guint64 slabs_per_part;
} XferDestTaperCacher;
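/* Illustrative sketch (not part of the original element): the canonical
 * cancellable-wait shape used with the mutex/condition pairs above. Every
 * wait in this file follows this pattern, re-checking elt->cancelled each
 * time it wakes. */
static G_GNUC_UNUSED void
cancellable_wait_example(XferDestTaperCacher *self, XferElement *elt)
{
    g_mutex_lock(self->slab_mutex);
    while (!self->device_slab && !elt->cancelled) {
        /* g_cond_wait releases slab_mutex while blocked and re-acquires it
         * before returning */
        g_cond_wait(self->slab_cond, self->slab_mutex);
    }
    g_mutex_unlock(self->slab_mutex);
}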
static GType xfer_dest_taper_cacher_get_type(void);
#define XFER_DEST_TAPER_CACHER_TYPE (xfer_dest_taper_cacher_get_type())
#define XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher)
#define XFER_DEST_TAPER_CACHER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacher const)
#define XFER_DEST_TAPER_CACHER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)
#define IS_XFER_DEST_TAPER_CACHER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_cacher_get_type())
#define XFER_DEST_TAPER_CACHER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_cacher_get_type(), XferDestTaperCacherClass)

typedef struct {
    XferDestTaperClass __parent__;
} XferDestTaperCacherClass;
#define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
static void
_xdt_dbg(const char *fmt, ...)
{
    va_list argp;
    char msg[1024];

    arglist_start(argp, fmt);
    g_vsnprintf(msg, sizeof(msg), fmt, argp);
    arglist_end(argp);
    g_debug("XDT: %s", msg);
}
/* Called with the slab_mutex held, this gets a new slab to write into, with
 * refcount 1. It will block if max_memory slabs are already in use and mem
 * caching is not in use, although allocation may be forced with the 'force'
 * parameter.
 *
 * If the memory allocation cannot be satisfied due to system constraints,
 * this function will send an XMSG_ERROR, wait for the transfer to cancel, and
 * return NULL. If the transfer is cancelled by some other means while this
 * function is blocked awaiting a free slab, it will return NULL.
 *
 * @param self: the xfer element
 * @param force: allocate a slab even if it would exceed max_memory
 * @returns: a new slab, or NULL if the xfer is cancelled
 */
static Slab *
alloc_slab(
    XferDestTaperCacher *self,
    gboolean force)
{
    XferElement *elt = XFER_ELEMENT(self);
    Slab *rv;

    DBG(8, "alloc_slab(force=%d)", force);
    if (!force) {
        /* throttle based on maximum number of extant slabs */
        while (G_UNLIKELY(
            !elt->cancelled &&
            self->oldest_slab && self->newest_slab &&
            self->oldest_slab->refcount > 1 &&
            (self->newest_slab->serial - self->oldest_slab->serial + 1) >= self->max_slabs)) {
            DBG(9, "waiting for available slab");
            g_cond_wait(self->slab_free_cond, self->slab_mutex);
            DBG(9, "done waiting");
        }

        if (elt->cancelled)
            return NULL;
    }

    /* if the oldest slab doesn't have anything else pointing to it, just use
     * that */
    if (self->oldest_slab && self->oldest_slab->refcount == 1) {
        rv = self->oldest_slab;
        self->oldest_slab = rv->next;
    } else {
        rv = g_new0(Slab, 1);
        rv->base = g_try_malloc(self->slab_size);
        if (!rv->base) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("Could not allocate %zu bytes of memory: %s"), self->slab_size, strerror(errno));
            g_free(rv);
            wait_until_xfer_cancelled(elt->xfer);
            return NULL;
        }
    }

    rv->next = NULL;
    rv->refcount = 1;
    rv->size = 0;
    return rv;
}
/* Called with the slab_mutex held, this frees the given slab entirely. The
 * reference count is not consulted.
 *
 * @param slab: slab to free
 */
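/* The implementation is not shown in this excerpt; a minimal sketch,
 * assuming the Slab layout above, would be: */
static void
free_slab(
    Slab *slab)
{
    if (slab) {
        if (slab->base)
            g_free(slab->base);
        g_free(slab);
    }
}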
/* Called with the slab_mutex held, this decrements the refcount of the
 * given slab.
 *
 * @param self: xfer element
 * @param slab: slab to free
 */
static void
unref_slab(
    XferDestTaperCacher *self,
    Slab *slab)
{
    g_assert(slab->refcount > 1);
    slab->refcount--;
    if (G_UNLIKELY(slab->refcount == 1 && slab == self->oldest_slab)) {
        g_cond_broadcast(self->slab_free_cond);
    } else if (G_UNLIKELY(slab->refcount == 0)) {
        free_slab(slab);
    }
}
/* Called with the slab_mutex held, this sets *slabp to *slabp->next,
 * adjusting refcounts appropriately, and returns the new value.
 *
 * @param self: xfer element
 * @param slabp: slab pointer to advance
 * @returns: new value of *slabp
 */
static Slab *
next_slab(
    XferDestTaperCacher *self,
    Slab * volatile *slabp)
{
    Slab *next;

    if (!slabp || !*slabp)
        return NULL;

    next = (*slabp)->next;
    if (next)
        next->refcount++;
    unref_slab(self, *slabp);
    *slabp = next;

    return next;
}
/* The disk cache thread's job is simply to follow along the slab train at
 * maximum speed, writing slabs to the disk cache file. */
static gboolean
open_disk_cache_fds(
    XferDestTaperCacher *self)
{
    char *filename;

    g_assert(self->disk_cache_read_fd == -1);
    g_assert(self->disk_cache_write_fd == -1);

    g_mutex_lock(self->state_mutex);
    filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
                               self->disk_cache_dirname);

    self->disk_cache_write_fd = g_mkstemp(filename);
    if (self->disk_cache_write_fd < 0) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("Error creating cache file in '%s': %s"), self->disk_cache_dirname,
            strerror(errno));
        g_free(filename);
        return FALSE;
    }

    /* open a separate copy of the file for reading */
    self->disk_cache_read_fd = open(filename, O_RDONLY);
    if (self->disk_cache_read_fd < 0) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("Error opening cache file in '%s': %s"), self->disk_cache_dirname,
            strerror(errno));
        g_free(filename);
        return FALSE;
    }

    /* signal anyone waiting for this value */
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    /* errors from unlink are not fatal */
    if (unlink(filename) < 0) {
        g_warning("While unlinking '%s': %s (ignored)", filename, strerror(errno));
    }

    g_free(filename);
    return TRUE;
}
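/* (A note on the pattern above: because both file descriptors are opened
 * before the unlink, the anonymous cache file remains fully usable, and the
 * kernel reclaims its space as soon as the fds are closed - even if the
 * process crashes.) */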
static gpointer
disk_cache_thread(
    gpointer data)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
    XferElement *elt = XFER_ELEMENT(self);

    DBG(1, "(this is the disk cache thread)");

    /* open up the disk cache file first */
    if (!open_disk_cache_fds(self))
        return NULL;

    while (!elt->cancelled) {
        gboolean eof = FALSE, eop = FALSE;
        guint64 stop_serial;
        Slab *slab;

        /* rewind to the beginning of the disk cache file */
        if (lseek(self->disk_cache_write_fd, 0, SEEK_SET) == -1) {
            xfer_cancel_with_error(XFER_ELEMENT(self),
                _("Error seeking disk cache file in '%s': %s"), self->disk_cache_dirname,
                strerror(errno));
            return NULL;
        }

        /* we need to sit and wait for the next part to begin, first making sure
         * we have a slab .. */
        g_mutex_lock(self->slab_mutex);
        while (!self->disk_cacher_slab && !elt->cancelled) {
            DBG(9, "waiting for a disk slab");
            g_cond_wait(self->slab_cond, self->slab_mutex);
        }
        DBG(9, "done waiting");
        g_mutex_unlock(self->slab_mutex);

        if (elt->cancelled)
            break;

        /* this slab is now fixed until this thread changes it */
        g_assert(self->disk_cacher_slab != NULL);

        /* and then making sure we're ready to write that slab. */
        g_mutex_lock(self->state_mutex);
        while ((self->paused ||
                (self->disk_cacher_slab && self->disk_cacher_slab->serial > self->part_first_serial))
               && !elt->cancelled) {
            DBG(9, "waiting for the disk slab to become current and un-paused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
        DBG(9, "done waiting");

        stop_serial = self->part_stop_serial;
        g_mutex_unlock(self->state_mutex);

        if (elt->cancelled)
            break;

        g_mutex_lock(self->slab_mutex);
        slab = self->disk_cacher_slab;

        while (!eop && !eof) {
            /* if we're at the head of the slab train, wait for more data */
            while (!self->disk_cacher_slab && !elt->cancelled) {
                DBG(9, "waiting for the next disk slab");
                g_cond_wait(self->slab_cond, self->slab_mutex);
            }
            DBG(9, "done waiting");

            if (elt->cancelled)
                break;

            /* drop the lock long enough to write the slab; the refcount
             * protects the slab during this time */
            slab = self->disk_cacher_slab;
            g_mutex_unlock(self->slab_mutex);

            if (full_write(self->disk_cache_write_fd, slab->base, slab->size) < slab->size) {
                xfer_cancel_with_error(XFER_ELEMENT(self),
                    _("Error writing to disk cache file in '%s': %s"), self->disk_cache_dirname,
                    strerror(errno));
                return NULL;
            }

            eof = slab->size < self->slab_size;
            eop = (slab->serial + 1 == stop_serial);

            g_mutex_lock(self->slab_mutex);
            next_slab(self, &self->disk_cacher_slab);
        }
        g_mutex_unlock(self->slab_mutex);

        if (eof) {
            /* this very thread should have just set this value to NULL, and since it's
             * EOF, there should not be any 'next' slab */
            g_assert(self->disk_cacher_slab == NULL);
            break;
        }
    }

    return NULL;
}
/* The device thread's job is to write slabs to self->device, applying whatever
 * streaming algorithms are required. It does this by alternately getting the
 * next slab from a "slab source" and writing that slab to the device. Most of
 * the slab source functions assume that self->slab_mutex is held, but may
 * release the mutex (either explicitly or via a g_cond_wait), so it is not
 * valid to assume that any slab pointers remain unchanged after a slab_source
 * function invocation. */
/* This struct tracks the current state of the slab source */
typedef struct slab_source_state {
    /* temporary slab used for reading from disk */
    Slab *tmp_slab;

    /* next serial to read from disk */
    guint64 next_serial;
} slab_source_state;
/* Called with the slab_mutex held, this function pre-buffers enough data into
 * the slab train to meet the device's streaming needs. */
static gboolean
slab_source_prebuffer(
    XferDestTaperCacher *self)
{
    XferElement *elt = XFER_ELEMENT(self);
    guint64 prebuffer_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
    guint64 i;
    Slab *slab;

    /* always prebuffer at least one slab, even if max_memory is 0 */
    if (prebuffer_slabs == 0)
        prebuffer_slabs = 1;

    /* pre-buffering is not necessary if we're retrying a part */
    if (self->retry_part)
        return TRUE;

    /* pre-buffering means waiting until we have at least prebuffer_slabs in the
     * slab train ahead of the device_slab, or the newest slab is at EOF. */
    while (!elt->cancelled) {
        gboolean eof_or_eop = FALSE;

        /* see if there's enough data yet */
        for (i = 0, slab = self->device_slab;
             i < prebuffer_slabs && slab != NULL;
             i++, slab = slab->next) {
            eof_or_eop = (slab->size < self->slab_size)
                || (slab->serial + 1 == self->part_stop_serial);
        }
        if (i == prebuffer_slabs || eof_or_eop)
            break;

        DBG(9, "prebuffering wait");
        g_cond_wait(self->slab_cond, self->slab_mutex);
        DBG(9, "done waiting");
    }

    if (elt->cancelled) {
        self->last_part_successful = FALSE;
        self->no_more_parts = TRUE;
        return FALSE;
    }

    return TRUE;
}
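/* Worked example (illustrative): with max_memory = 10 MiB and 512 KiB slabs,
 * prebuffer_slabs = ceil(10485760 / 524288) = 20, so the device thread waits
 * until 20 slabs sit ahead of device_slab, or EOF/end-of-part arrives. */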
/* Called without the slab_mutex held, this function sets up a new
 * slab_source_state object based on the configuration of the XferElement.
 *
 * @param self: the xfer element
 * @param state: slab source state to set up
 * @returns: TRUE on success, FALSE on failure
 */
static inline gboolean
slab_source_setup(
    XferDestTaperCacher *self,
    slab_source_state *state)
{
    XferElement *elt = XFER_ELEMENT(self);

    state->tmp_slab = NULL;
    state->next_serial = G_MAXUINT64;

    /* if we're to retry the part, rewind to the beginning */
    if (self->retry_part) {
        if (self->use_mem_cache) {
            /* rewind device_slab to point to the mem_cache_slab */
            g_mutex_lock(self->slab_mutex);
            if (self->device_slab)
                unref_slab(self, self->device_slab);
            self->device_slab = self->mem_cache_slab;
            if (self->device_slab != NULL)
                self->device_slab->refcount++;
            g_mutex_unlock(self->slab_mutex);
        } else {
            g_mutex_lock(self->slab_mutex);

            /* we're going to read from the disk cache until we get to the oldest useful
             * slab in memory, so it had best exist */
            g_assert(self->oldest_slab != NULL);

            /* point device_slab at the oldest slab we have */
            self->oldest_slab->refcount++;
            if (self->device_slab)
                unref_slab(self, self->device_slab);
            self->device_slab = self->oldest_slab;

            /* and increment it until it is at least the slab we want to start from */
            while (self->device_slab->serial < self->part_first_serial) {
                next_slab(self, &self->device_slab);
            }

            /* get a new, temporary slab for use while reading */
            state->tmp_slab = alloc_slab(self, TRUE);

            g_mutex_unlock(self->slab_mutex);

            if (!state->tmp_slab) {
                /* if we couldn't allocate a slab, then we're cancelled, so we're done with
                 * this part */
                self->last_part_successful = FALSE;
                self->no_more_parts = TRUE;
                return FALSE;
            }

            state->tmp_slab->size = self->slab_size;
            state->next_serial = self->part_first_serial;

            /* We're reading from the disk cache, so we need a file descriptor
             * to read from, so wait for disk_cache_thread to open the
             * disk_cache_read_fd */
            g_assert(self->disk_cache_dirname);
            g_mutex_lock(self->state_mutex);
            while (self->disk_cache_read_fd == -1 && !elt->cancelled) {
                DBG(9, "waiting for disk_cache_thread to set disk_cache_read_fd");
                g_cond_wait(self->state_cond, self->state_mutex);
            }
            DBG(9, "done waiting");
            g_mutex_unlock(self->state_mutex);

            if (elt->cancelled) {
                self->last_part_successful = FALSE;
                self->no_more_parts = TRUE;
                return FALSE;
            }

            /* rewind to the beginning */
            if (lseek(self->disk_cache_read_fd, 0, SEEK_SET) == -1) {
                xfer_cancel_with_error(XFER_ELEMENT(self),
                    _("Could not seek disk cache file for reading: %s"),
                    strerror(errno));
                self->last_part_successful = FALSE;
                self->no_more_parts = TRUE;
                return FALSE;
            }
        }
    }

    /* if the streaming mode requires it, pre-buffer */
    if (self->streaming == STREAMING_REQUIREMENT_DESIRED ||
        self->streaming == STREAMING_REQUIREMENT_REQUIRED) {
        gboolean prebuffer_ok;

        g_mutex_lock(self->slab_mutex);
        prebuffer_ok = slab_source_prebuffer(self);
        g_mutex_unlock(self->slab_mutex);
        if (!prebuffer_ok)
            return FALSE;
    }

    return TRUE;
}
/* Called with the slab_mutex held, this does the work of slab_source_get when
 * reading from the disk cache. Note that this explicitly releases the
 * slab_mutex during execution - do not depend on any protected values across a
 * call to this function. The mutex is held on return.
 *
 * @param self: the xfer element
 * @param state: slab source state
 * @param serial: the serial number to read
 * @returns: the slab read from disk, or NULL on error
 */
static Slab *
slab_source_get_from_disk(
    XferDestTaperCacher *self,
    slab_source_state *state,
    guint64 serial)
{
    XferDestTaper *xdt = XFER_DEST_TAPER(self);
    size_t bytes_read;

    g_assert(state->next_serial == serial);

    /* NOTE: slab_mutex is held, but we don't need it here, so release it for the moment */
    g_mutex_unlock(self->slab_mutex);

    bytes_read = full_read(self->disk_cache_read_fd,
                           state->tmp_slab->base,
                           self->slab_size);
    if ((gsize)bytes_read < self->slab_size) {
        xfer_cancel_with_error(XFER_ELEMENT(xdt),
            _("Error reading disk cache: %s"),
            errno? strerror(errno) : _("Unexpected EOF"));
        goto fatal_error;
    }

    state->tmp_slab->serial = state->next_serial++;
    g_mutex_lock(self->slab_mutex);
    return state->tmp_slab;

fatal_error:
    g_mutex_lock(self->slab_mutex);
    self->last_part_successful = FALSE;
    self->no_more_parts = TRUE;
    return NULL;
}
/* Called with the slab_mutex held, this function gets the slab with the given
 * serial number, waiting if necessary for that slab to be available. Note
 * that the slab_mutex may be released during execution, although it is always
 * held on return.
 *
 * @param self: the xfer element
 * @param state: slab source state
 * @param serial: the serial number to get
 * @returns: the requested slab, or NULL on error
 */
static Slab *
slab_source_get(
    XferDestTaperCacher *self,
    slab_source_state *state,
    guint64 serial)
{
    XferElement *elt = (XferElement *)self;

    /* device_slab is only NULL if we're following the slab train, so wait for
     * a new slab */
    if (!self->device_slab) {
        /* if the streaming mode requires it, pre-buffer */
        if (self->streaming == STREAMING_REQUIREMENT_DESIRED) {
            if (!slab_source_prebuffer(self))
                return NULL;

            /* fall through to make sure we have a device_slab;
             * slab_source_prebuffer doesn't guarantee device_slab != NULL */
        }

        while (self->device_slab == NULL && !elt->cancelled) {
            DBG(9, "waiting for the next slab");
            g_cond_wait(self->slab_cond, self->slab_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            goto fatal_error;
    }

    /* device slab is now set, and only this thread can change it */
    g_assert(self->device_slab);

    /* if the next item in the device slab is the one we want, then the job is
     * pretty easy */
    if (G_LIKELY(serial == self->device_slab->serial))
        return self->device_slab;

    /* otherwise, we're reading from disk */
    g_assert(serial < self->device_slab->serial);
    return slab_source_get_from_disk(self, state, serial);

fatal_error:
    self->last_part_successful = FALSE;
    self->no_more_parts = TRUE;
    return NULL;
}
/* Called without the slab_mutex held, this frees any resources assigned
 * to the slab source state.
 *
 * @param self: the xfer element
 * @param state: slab source state
 */
static void
slab_source_free(
    XferDestTaperCacher *self,
    slab_source_state *state)
{
    if (state->tmp_slab) {
        g_mutex_lock(self->slab_mutex);
        free_slab(state->tmp_slab);
        g_mutex_unlock(self->slab_mutex);
    }
}
/* Called without the slab_mutex, this writes the given slab to the device */
static gboolean
write_slab_to_device(
    XferDestTaperCacher *self,
    Slab *slab)
{
    XferElement *elt = XFER_ELEMENT(self);
    gpointer buf = slab->base;
    gsize remaining = slab->size;

    while (remaining && !elt->cancelled) {
        gsize write_size = MIN(self->block_size, remaining);
        gboolean ok;

        ok = device_write_block(self->device, write_size, buf);
        if (!ok) {
            self->bytes_written += slab->size - remaining;

            /* TODO: handle an error without is_eom
             * differently/fatally? or at least with a warning? */
            self->last_part_successful = FALSE;
            self->no_more_parts = FALSE;
            return FALSE;
        }

        buf = (gchar *)buf + write_size;
        self->slab_bytes_written += write_size;
        remaining -= write_size;
    }

    if (elt->cancelled) {
        self->last_part_successful = FALSE;
        self->no_more_parts = TRUE;
        return FALSE;
    }

    self->bytes_written += slab->size;
    self->slab_bytes_written = 0;
    return TRUE;
}
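/* For scale (illustrative): a full 512 KiB slab on a device with 32 KiB
 * blocks is issued as 16 device_write_block() calls. */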
static XMsg *
device_thread_write_part(
    XferDestTaperCacher *self)
{
    GTimer *timer = g_timer_new();
    XMsg *msg = NULL;
    slab_source_state src_state = {0, 0};
    guint64 serial, stop_serial;
    gboolean eof = FALSE;
    int fileno = 0;
    int failed = 0;
    int slab_source_set = 0;

    self->last_part_successful = FALSE;
    self->bytes_written = 0;

    if (!device_start_file(self->device, self->part_header)) {
        failed = 1;
        goto part_done;
    }

    dumpfile_free(self->part_header);
    self->part_header = NULL;

    fileno = self->device->file;
    g_assert(fileno > 0);

    if (!slab_source_setup(self, &src_state)) {
        failed = 1;
        goto part_done;
    }
    slab_source_set = 1;

    g_timer_start(timer);

    stop_serial = self->part_stop_serial;
    g_mutex_lock(self->slab_mutex);
    for (serial = self->part_first_serial; serial < stop_serial && !eof; serial++) {
        Slab *slab = slab_source_get(self, &src_state, serial);
        DBG(8, "writing slab %p (serial %ju) to device", slab, serial);
        g_mutex_unlock(self->slab_mutex);
        if (!slab) {
            /* the slab source has already set the state flags */
            failed = 1;
            goto part_done;
        }

        eof = slab->size < self->slab_size;

        if (!write_slab_to_device(self, slab)) {
            failed = 1;
            goto part_done;
        }

        g_mutex_lock(self->slab_mutex);
        DBG(8, "wrote slab %p to device", slab);

        /* if we're reading from the slab train, advance self->device_slab. */
        if (slab == self->device_slab) {
            next_slab(self, &self->device_slab);
        }
    }
    g_mutex_unlock(self->slab_mutex);

    /* if we write all of the blocks, but the finish_file fails, then likely
     * there was some buffering going on in the device driver, and the blocks
     * did not all make it to permanent storage -- so it's a failed part. */
    if (self->device->in_file && !device_finish_file(self->device))
        failed = 1;

part_done:
    if (slab_source_set) {
        slab_source_free(self, &src_state);
    }

    if (!failed) {
        self->last_part_successful = TRUE;
        self->no_more_parts = eof;
    }

    msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
    msg->size = self->bytes_written;
    msg->duration = g_timer_elapsed(timer, NULL);
    msg->partnum = self->partnum;
    msg->fileno = fileno;
    msg->successful = self->last_part_successful;
    msg->eom = !self->last_part_successful;
    msg->eof = self->no_more_parts;

    /* time runs backward on some test boxes, so make sure this is positive */
    if (msg->duration < 0) msg->duration = 0;

    if (self->last_part_successful)
        self->partnum++;

    g_timer_destroy(timer);

    return msg;
}
/* Called with the status_mutex held, this frees any cached data for
 * a successful part. */
static void
release_part_cache(
    XferDestTaperCacher *self)
{
    if (self->use_mem_cache && self->mem_cache_slab) {
        /* move up the mem_cache_slab to point to the first slab in
         * the next part (probably NULL at this point), so that the
         * reader can continue reading data into the new mem cache
         * immediately */
        g_mutex_lock(self->slab_mutex);
        unref_slab(self, self->mem_cache_slab);
        self->mem_cache_slab = self->device_slab;
        if (self->mem_cache_slab)
            self->mem_cache_slab->refcount++;
        g_mutex_unlock(self->slab_mutex);
    }

    /* the disk cache gets reused automatically (rewinding to offset 0), so
     * there's nothing else to do */
}
static gpointer
device_thread(
    gpointer data)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(data);
    XferElement *elt = XFER_ELEMENT(self);
    XMsg *msg;

    DBG(1, "(this is the device thread)");

    if (self->disk_cache_dirname) {
        GError *error = NULL;
        self->disk_cache_thread = g_thread_create(disk_cache_thread, (gpointer)self, TRUE, &error);
        if (!self->disk_cache_thread) {
            g_critical(_("Error creating new thread: %s (%s)"),
                error->message, errno? strerror(errno) : _("no error code"));
        }
    }

    /* This is the outer loop, that loops once for each split part written to
     * tape. */
    g_mutex_lock(self->state_mutex);
    while (1) {
        /* wait until the main thread un-pauses us, and check that we have
         * the relevant device info available (block_size) */
        while (self->paused && !elt->cancelled) {
            DBG(9, "waiting to be unpaused");
            g_cond_wait(self->state_cond, self->state_mutex);
        }
        DBG(9, "done waiting");

        if (elt->cancelled)
            break;

        g_mutex_unlock(self->state_mutex);
        self->slab_bytes_written = 0;
        DBG(2, "beginning to write part");
        msg = device_thread_write_part(self);
        DBG(2, "done writing part");
        g_mutex_lock(self->state_mutex);

        /* release any cache of a successful part, but don't bother at EOF */
        if (msg->successful && !msg->eof)
            release_part_cache(self);

        xfer_queue_message(elt->xfer, msg);

        /* if this is the last part, we're done with the part loop */
        if (self->no_more_parts)
            break;

        /* pause ourselves and await instructions from the main thread */
        self->paused = TRUE;
    }
    g_mutex_unlock(self->state_mutex);

    /* make sure the other thread is done before we send XMSG_DONE */
    if (self->disk_cache_thread)
        g_thread_join(self->disk_cache_thread);

    /* tell the main thread we're done */
    xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));

    return NULL;
}
/* Called with the slab_mutex held, this adds the reader_slab to the head of
 * the slab train and signals the condition variable. */
static void
add_reader_slab_to_train(
    XferDestTaperCacher *self)
{
    Slab *slab = self->reader_slab;

    DBG(3, "adding slab of new data to the slab train");

    if (self->newest_slab) {
        self->newest_slab->next = slab;
        slab->refcount++;

        self->newest_slab->refcount--;
    }

    /* steal reader_slab's reference for newest_slab */
    self->newest_slab = slab;
    self->reader_slab = NULL;

    /* if any of the other pointers are waiting for this slab, update them */
    if (self->disk_cache_dirname && !self->disk_cacher_slab) {
        self->disk_cacher_slab = slab;
        slab->refcount++;
    }
    if (self->use_mem_cache && !self->mem_cache_slab) {
        self->mem_cache_slab = slab;
        slab->refcount++;
    }
    if (!self->device_slab) {
        self->device_slab = slab;
        slab->refcount++;
    }
    if (!self->oldest_slab) {
        self->oldest_slab = slab;
        slab->refcount++;
    }

    g_cond_broadcast(self->slab_cond);
}
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t size)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
    gchar *p = buf;

    DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);

    /* do nothing if cancelled */
    if (G_UNLIKELY(elt->cancelled)) {
        goto free_and_finish;
    }

    /* handle EOF */
    if (G_UNLIKELY(buf == NULL)) {
        /* send off the last, probably partial slab */
        g_mutex_lock(self->slab_mutex);

        /* create a new, empty slab if necessary */
        if (!self->reader_slab) {
            self->reader_slab = alloc_slab(self, FALSE);
            if (!self->reader_slab) {
                /* we've been cancelled while waiting for a slab */
                g_mutex_unlock(self->slab_mutex);

                /* wait for the xfer to cancel, so we don't get another buffer
                 * pushed to us (and do so *without* the mutex held) */
                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);

                goto free_and_finish;
            }
            self->reader_slab->serial = self->next_serial++;
        }

        add_reader_slab_to_train(self);
        g_mutex_unlock(self->slab_mutex);

        goto free_and_finish;
    }

    while (size > 0) {
        gsize copy_size;

        /* get a fresh slab, if needed */
        if (G_UNLIKELY(!self->reader_slab) || self->reader_slab->size == self->slab_size) {
            g_mutex_lock(self->slab_mutex);
            if (self->reader_slab)
                add_reader_slab_to_train(self);
            self->reader_slab = alloc_slab(self, FALSE);
            if (!self->reader_slab) {
                /* we've been cancelled while waiting for a slab */
                g_mutex_unlock(self->slab_mutex);

                /* wait for the xfer to cancel, so we don't get another buffer
                 * pushed to us (and do so *without* the mutex held) */
                wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);

                goto free_and_finish;
            }
            self->reader_slab->serial = self->next_serial++;
            g_mutex_unlock(self->slab_mutex);
        }

        /* copy as much of the buffer as fits in the current reader slab */
        copy_size = MIN(self->slab_size - self->reader_slab->size, size);
        memcpy(self->reader_slab->base + self->reader_slab->size, p, copy_size);
        self->reader_slab->size += copy_size;
        p += copy_size;
        size -= copy_size;
    }

free_and_finish:
    if (buf)
        g_free(buf);
}
static gboolean
start_impl(
    XferElement *elt)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)elt;
    GError *error = NULL;

    self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
    if (!self->device_thread) {
        g_critical(_("Error creating new thread: %s (%s)"),
            error->message, errno? strerror(errno) : _("no error code"));
    }

    return TRUE;
}
static gboolean
cancel_impl(
    XferElement *elt,
    gboolean expect_eof)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
    gboolean rv;

    /* chain up first */
    rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);

    /* then signal all of our condition variables, so that threads waiting on them
     * wake up and see elt->cancelled. */
    g_mutex_lock(self->slab_mutex);
    g_cond_broadcast(self->slab_cond);
    g_cond_broadcast(self->slab_free_cond);
    g_mutex_unlock(self->slab_mutex);

    g_mutex_lock(self->state_mutex);
    g_cond_broadcast(self->state_cond);
    g_mutex_unlock(self->state_mutex);

    return rv;
}
static void
start_part_impl(
    XferDestTaper *xdt,
    gboolean retry_part,
    dumpfile_t *header)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);

    g_assert(self->device != NULL);
    g_assert(!self->device->in_file);
    g_assert(header != NULL);

    DBG(1, "start_part(retry_part=%d)", retry_part);

    g_mutex_lock(self->state_mutex);
    g_assert(self->paused);
    g_assert(!self->no_more_parts);

    if (self->part_header)
        dumpfile_free(self->part_header);
    self->part_header = dumpfile_copy(header);

    if (retry_part) {
        g_assert(!self->last_part_successful);
        self->retry_part = TRUE;
    } else {
        g_assert(self->last_part_successful);
        self->retry_part = FALSE;
        self->part_first_serial = self->part_stop_serial;
        if (self->part_size != 0) {
            self->part_stop_serial = self->part_first_serial + self->slabs_per_part;
        } else {
            /* set part_stop_serial to an effectively infinite value */
            self->part_stop_serial = G_MAXUINT64;
        }
    }

    DBG(1, "unpausing");
    self->paused = FALSE;
    g_cond_broadcast(self->state_cond);

    g_mutex_unlock(self->state_mutex);
}
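/* Worked example (illustrative): with slabs_per_part = 2048, part 1 covers
 * serials [0, 2048) and a successful part 2 then starts at serial 2048; a
 * retried part re-reads the same [part_first_serial, part_stop_serial)
 * range from the cache. */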
static void
use_device_impl(
    XferDestTaper *xdt,
    Device *device)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);
    GValue val;

    /* short-circuit if nothing is changing */
    if (self->device == device)
        return;

    g_mutex_lock(self->state_mutex);
    if (self->device)
        g_object_unref(self->device);
    self->device = device;
    g_object_ref(device);

    /* get this new device's streaming requirements */
    bzero(&val, sizeof(val));
    if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
        || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
        g_warning("Couldn't get streaming type for %s", self->device->device_name);
        self->streaming = STREAMING_REQUIREMENT_REQUIRED;
    } else {
        self->streaming = g_value_get_enum(&val);
    }
    g_value_unset(&val);

    /* check that the blocksize hasn't changed */
    if (self->block_size != device->block_size) {
        g_mutex_unlock(self->state_mutex);
        xfer_cancel_with_error(XFER_ELEMENT(self),
            _("All devices used by the taper must have the same block size"));
        return;
    }
    g_mutex_unlock(self->state_mutex);
}
static guint64
get_part_bytes_written_impl(
    XferDestTaper *xdt)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(xdt);

    /* NOTE: this access is unsafe and may return inconsistent results (e.g.,
     * a partial write to the 64-bit value on a 32-bit system). This is ok for
     * the moment, as it's only informational, but be warned. */
    if (self->device)
        return device_get_bytes_written(self->device);
    else
        return self->bytes_written + self->slab_bytes_written;
}
static void
instance_init(
    XferElement *elt)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(elt);
    elt->can_generate_eof = FALSE;

    self->state_mutex = g_mutex_new();
    self->state_cond = g_cond_new();
    self->slab_mutex = g_mutex_new();
    self->slab_cond = g_cond_new();
    self->slab_free_cond = g_cond_new();

    self->last_part_successful = TRUE;
    self->paused = TRUE;
    self->part_stop_serial = 0;
    self->disk_cache_read_fd = -1;
    self->disk_cache_write_fd = -1;
}
static void
finalize_impl(
    GObject *obj_self)
{
    XferDestTaperCacher *self = XFER_DEST_TAPER_CACHER(obj_self);
    Slab *slab, *next_slab;

    if (self->disk_cache_dirname)
        g_free(self->disk_cache_dirname);

    g_mutex_free(self->state_mutex);
    g_cond_free(self->state_cond);

    g_mutex_free(self->slab_mutex);
    g_cond_free(self->slab_cond);
    g_cond_free(self->slab_free_cond);

    /* free the slab train, without reference to the refcounts */
    for (slab = self->oldest_slab; slab != NULL; slab = next_slab) {
        next_slab = slab->next;
        free_slab(slab);
    }
    self->disk_cacher_slab = NULL;
    self->mem_cache_slab = NULL;
    self->device_slab = NULL;
    self->oldest_slab = NULL;
    self->newest_slab = NULL;

    if (self->reader_slab) {
        free_slab(self->reader_slab);
        self->reader_slab = NULL;
    }

    if (self->part_header)
        dumpfile_free(self->part_header);

    if (self->disk_cache_read_fd != -1)
        close(self->disk_cache_read_fd); /* ignore error */
    if (self->disk_cache_write_fd != -1)
        close(self->disk_cache_write_fd); /* ignore error */

    if (self->device)
        g_object_unref(self->device);

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}
static void
class_init(
    XferDestTaperCacherClass *selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);
    static xfer_element_mech_pair_t mech_pairs[] = {
        { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
        { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
    };

    klass->start = start_impl;
    klass->cancel = cancel_impl;
    klass->push_buffer = push_buffer_impl;
    xdt_klass->start_part = start_part_impl;
    xdt_klass->use_device = use_device_impl;
    xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
    goc->finalize = finalize_impl;

    klass->perl_class = "Amanda::Xfer::Dest::Taper::Cacher";
    klass->mech_pairs = mech_pairs;

    parent_class = g_type_class_peek_parent(selfc);
}
static GType
xfer_dest_taper_cacher_get_type(void)
{
    static GType type = 0;

    if G_UNLIKELY(type == 0) {
        static const GTypeInfo info = {
            sizeof (XferDestTaperCacherClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferDestTaperCacher),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL /* value_table */
        };

        type = g_type_register_static(XFER_DEST_TAPER_TYPE, "XferDestTaperCacher", &info, 0);
    }

    return type;
}
XferElement *
xfer_dest_taper_cacher(
    Device *first_device,
    size_t max_memory,
    guint64 part_size,
    gboolean use_mem_cache,
    const char *disk_cache_dirname)
{
    XferDestTaperCacher *self = (XferDestTaperCacher *)g_object_new(XFER_DEST_TAPER_CACHER_TYPE, NULL);

    self->max_memory = max_memory;
    self->part_size = part_size;
    self->partnum = 1;
    self->device = first_device;
    g_object_ref(self->device);

    /* pick only one caching mechanism, caller! */
    if (use_mem_cache)
        g_assert(!disk_cache_dirname);
    if (disk_cache_dirname)
        g_assert(!use_mem_cache);

    /* and if part size is zero, then we don't do any caching */
    g_assert(part_size != 0 || (!use_mem_cache && !disk_cache_dirname));

    self->use_mem_cache = use_mem_cache;
    if (disk_cache_dirname)
        self->disk_cache_dirname = g_strdup(disk_cache_dirname);
    /* calculate the device-dependent parameters */
    self->block_size = first_device->block_size;

    /* The slab size should be large enough to justify the overhead of all
     * of the mutexes, but it needs to be small enough to have a few slabs
     * available so that the threads are not constantly waiting on one
     * another. The choice is sixteen blocks, not more than a quarter of
     * the part size, and not more than 10MB. If we're not using the mem
     * cache, then avoid exceeding max_memory by keeping the slab size less
     * than a quarter of max_memory. */

    self->slab_size = self->block_size * 16;
    if (self->part_size)
        self->slab_size = MIN(self->slab_size, self->part_size / 4);
    self->slab_size = MIN(self->slab_size, 10*1024*1024);
    if (!use_mem_cache)
        self->slab_size = MIN(self->slab_size, self->max_memory / 4);

    /* round slab size up to the nearest multiple of the block size */
    self->slab_size =
        ((self->slab_size + self->block_size - 1) / self->block_size) * self->block_size;
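    /* Worked example (illustrative): with 32 KiB blocks, 16 blocks gives a
     * 512 KiB slab; for a 1 GiB part neither the part_size/4 cap nor the
     * 10 MB cap bites, and 512 KiB is already a block multiple, so slab_size
     * stays 512 KiB and slabs_per_part (below) becomes 2048. */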
    /* round part size up to a multiple of the slab size */
    if (self->part_size != 0) {
        self->slabs_per_part = (self->part_size + self->slab_size - 1) / self->slab_size;
        self->part_size = self->slabs_per_part * self->slab_size;
    } else {
        self->slabs_per_part = 0;
    }

    /* fill in the maximum number of slabs allowed */
    if (use_mem_cache) {
        self->max_slabs = self->slabs_per_part; /* increase max_slabs to serve as mem buf */
    } else {
        self->max_slabs = (self->max_memory + self->slab_size - 1) / self->slab_size;
    }

    /* Note that max_slabs == 1 will cause deadlocks, due to some assumptions in
     * alloc_slab, so we check here that it's at least 2. */
    if (self->max_slabs < 2)
        self->max_slabs = 2;

    DBG(1, "using slab_size %zu and max_slabs %ju", self->slab_size, (uintmax_t)self->max_slabs);

    return XFER_ELEMENT(self);
}
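/* Example use (illustrative; device setup and error handling omitted):
 *
 *   XferElement *xdt = xfer_dest_taper_cacher(
 *       device,                  // an already-configured Device
 *       64 * 1024 * 1024,        // max_memory: 64 MiB of streaming buffer
 *       G_GUINT64_CONSTANT(10) * 1024 * 1024 * 1024, // part_size: 10 GiB
 *       FALSE,                   // use_mem_cache
 *       "/var/tmp");             // disk_cache_dirname (disk caching)
 */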