2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009, 2010 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "xfer-device.h"
28 /* A transfer destination that writes an entire dumpfile to one or more files
29 * on one or more devices, without any caching. This destination supports both
30 * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
31 * splitting (in which rewound parts are read from holding disk). */
34 * File Slices - Cache Information
36 * The cache_inform implementation adds cache information to a linked list of
37 * these objects, in order. The objects are arranged in a linked list, and
38 * describe the files in which the part data is stored. Note that we assume
39 * that slices are added *before* they are needed: the xfer element will fail
40 * if it tries to rewind and does not find a suitable slice.
42 * The slices should be "fast forwarded" after every part, so that the first
43 * byte in part_slices is the first byte of the part; when a retry of a part is
44 * required, use the iterator methods to properly open the various files and do
/* One slice of cached part data, forming a singly-linked list (see
 * part_slices in XferDestTaperSplitter below).  Slices are appended by
 * cache_inform and consumed head-first by fast_forward_slices.
 * NOTE(review): the actual declarations of the filename/offset/length
 * fields are not visible in this excerpt; only their comments survive. */
48 typedef struct FileSlice {
49 struct FileSlice *next;
51 /* fully-qualified filename to read from (or NULL to read from
52 * disk_cache_read_fd in XferDestTaperCacher) */
55 /* offset in file to start at */
58 /* length of data to read */
/* Saved parent class pointer, set in class_init and used by finalize_impl
 * to chain up to the parent's finalize. */
66 static GObjectClass *parent_class = NULL;
/* Instance structure for the splitter destination.
 * NOTE(review): several field declarations (e.g. part_size, block_size,
 * state_mutex/state_cond, ring_buffer, ring_mutex) are referenced by the
 * code below but their declaration lines are not visible in this excerpt. */
68 typedef struct XferDestTaperSplitter {
69 XferDestTaper __parent__;
73 * These values are supplied to the constructor, and can be assumed
74 * constant for the lifetime of the element.
77 /* Maximum size of each part (bytes) */
80 /* the device's need for streaming (it's assumed that all subsequent devices
81 * have the same needs) */
82 StreamingRequirement streaming;
84 /* block size expected by the target device */
87 /* TRUE if this element is expecting slices via cache_inform */
88 gboolean expect_cache_inform;
90 /* The thread doing the actual writes to tape; this also handles buffering
92 GThread *device_thread;
96 * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
97 * blocksize), and serves as the interface between the device_thread and
98 * the thread calling push_buffer. Ring_length is the total length of the
99 * buffer in bytes, while ring_count is the number of data bytes currently
100 * in the buffer. The ring_add_cond is signalled when data is added to the
101 * buffer, while ring_free_cond is signalled when data is removed. Both
102 * are governed by ring_mutex, and both are signalled when the transfer is
107 GCond *ring_add_cond, *ring_free_cond;
109 gsize ring_length, ring_count;
110 gsize ring_head, ring_tail;
111 gboolean ring_head_at_eof;
115 * "state" includes all of the variables below (including device
116 * parameters). Note that the device_thread holdes this mutex for the
117 * entire duration of writing a part.
119 * state_mutex should always be locked before ring_mutex, if both are to be
120 * held simultaneously.
124 volatile gboolean paused;
126 /* The device to write to, and the header to write to it */
127 Device *volatile device;
128 dumpfile_t *volatile part_header;
130 /* bytes to read from cached slices before reading from the ring buffer */
131 guint64 bytes_to_read_from_slices;
133 /* part number in progress */
134 volatile guint64 partnum;
136 /* status of the last part */
137 gboolean last_part_eof;
138 gboolean last_part_eom;
139 gboolean last_part_successful;
141 /* true if the element is done writing to devices */
142 gboolean no_more_parts;
144 /* total bytes written in the current part */
145 volatile guint64 part_bytes_written;
147 /* The list of active slices for the current part. The cache_inform method
148 * appends to this list. It is safe to read this linked list, beginning at
149 * the head, *if* you can guarantee that slices will not be fast-forwarded
150 * in the interim. The finalize method for this class will take care of
151 * freeing any leftover slices. Take the part_slices mutex while modifying
152 * the links in this list. */
153 FileSlice *part_slices;
154 GMutex *part_slices_mutex;
155 } XferDestTaperSplitter;
/* Standard GObject type boilerplate: forward-declaration of the get_type
 * function plus the conventional cast/type-check macros for this class. */
157 static GType xfer_dest_taper_splitter_get_type(void);
158 #define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
159 #define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
160 #define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
161 #define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
162 #define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
163 #define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
/* Class structure: no class-level members beyond the parent class.
 * NOTE(review): the opening "typedef struct {" line is not visible here. */
165 XferDestTaperClass __parent__;
168 } XferDestTaperSplitterClass;
/* Leveled debug logging: DBG(level, fmt, ...) formats the message and tags
 * it with the calling thread's identity, only when debug_taper >= level. */
174 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
/* Helper for DBG: vsnprintf the caller's format into a local buffer, then
 * emit it via g_debug prefixed with the current thread pointer.
 * NOTE(review): the return-type line, the va_list/buffer declarations and
 * the arglist_end call are not visible in this excerpt. */
176 _xdt_dbg(const char *fmt, ...)
181 arglist_start(argp, fmt);
182 g_vsnprintf(msg, sizeof(msg), fmt, argp);
184 g_debug("XDT thd-%p: %s", g_thread_self(), msg);
187 /* "Fast forward" the slice list by the given length.  This will free any
188  * slices that are no longer necessary, and adjust the offset and length of the
189  * first remaining slice.  This assumes the state mutex is locked during its
192  * @param self: element
193  * @param length: number of bytes to fast forward
197     XferDestTaperSplitter *self,
202 /* consume slices until we've eaten the whole part */
203 g_mutex_lock(self->part_slices_mutex);
/* NOTE(review): the enclosing loop header and its braces are not visible in
 * this excerpt; the two arms below either drop a fully-consumed head slice
 * or trim the partially-consumed head slice in place. */
205 g_assert(self->part_slices);
206 slice = self->part_slices;
/* head slice entirely consumed: unlink it, free its filename, advance */
208 if (slice->length <= length) {
209 length -= slice->length;
211 self->part_slices = slice->next;
213 g_free(slice->filename);
215 slice = self->part_slices;
/* head slice partially consumed: shrink it and move its file offset up */
217 slice->length -= length;
218 slice->offset += length;
222 g_mutex_unlock(self->part_slices_mutex);
229 /* A struct for use in iterating over data in the slices */
/* NOTE(review): the declarations of the slice pointer and cur_fd fields are
 * not visible in this excerpt; cur_fd is -1 until the slice's file is
 * opened by iterator_get_block. */
230 typedef struct SliceIterator {
234 /* file descriptor of the current file, or -1 if it's not open yet */
237 /* bytes remaining in this slice */
238 guint64 slice_remaining;
241 /* Utility functions for SliceIterator */
243 /* Begin iterating over slices, starting at the first byte of the first slice.
244  * Initializes a pre-allocated SliceIterator.  The caller must ensure that
245  * fast_forward_slices is not called while an iteration is in
250     XferDestTaperSplitter *self,
/* start with nothing buffered from the current slice; the iterator's fd is
 * opened lazily by iterator_get_block.
 * NOTE(review): the iter parameter line and cur_fd initialization are not
 * visible in this excerpt. */
254 iter->slice_remaining = 0;
/* take the head of the slice list under the mutex */
255 g_mutex_lock(self->part_slices_mutex);
256 iter->slice = self->part_slices;
257 /* it's safe to unlock this because, at worst, a new entry will
258 * be appended while the iterator is in progress */
259 g_mutex_unlock(self->part_slices_mutex);
263 /* Get a block of data from the iterator, returning a pointer to a buffer
264  * containing the data; the buffer remains the property of the iterator.
265  * Returns NULL on error, after calling xfer_cancel_with_error with an
266  * appropriate error message.  This function does not block, so it does not
267  * check for cancellation.
271     XferDestTaperSplitter *self,
/* NOTE(review): the iter/buf/bytes_needed parameter lines and the error
 * return statements after each xfer_cancel_with_error call are not visible
 * in this excerpt. */
276 gsize buf_offset = 0;
277 XferElement *elt = XFER_ELEMENT(self);
279 g_assert(iter != NULL);
280 g_assert(buf != NULL);
/* keep pulling from successive slices until the caller's request is met */
282 while (bytes_needed > 0) {
/* lazily open the current slice's file and seek to its starting offset */
286 if (iter->cur_fd < 0) {
289 g_assert(iter->slice != NULL);
290 g_assert(iter->slice->filename != NULL);
292 iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
293 if (iter->cur_fd < 0) {
294 xfer_cancel_with_error(elt,
295 _("Could not open '%s' for reading: %s"),
296 iter->slice->filename, strerror(errno));
300 iter->slice_remaining = iter->slice->length;
301 offset = iter->slice->offset;
303 if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
304 xfer_cancel_with_error(elt,
305 _("Could not seek '%s' for reading: %s"),
306 iter->slice->filename, strerror(errno));
/* read as much as this slice can satisfy, up to what's still needed */
311 read_size = MIN(iter->slice_remaining, bytes_needed);
312 bytes_read = full_read(iter->cur_fd,
/* full_read returning less than requested means error or unexpected EOF */
315 if (bytes_read < 0 || (gsize)bytes_read < read_size) {
316 xfer_cancel_with_error(elt,
317 _("Error reading '%s': %s"),
318 iter->slice->filename,
319 errno? strerror(errno) : _("Unexpected EOF"));
/* account for the data just consumed */
323 iter->slice_remaining -= bytes_read;
324 buf_offset += bytes_read;
325 bytes_needed -= bytes_read;
/* slice exhausted: close its file and advance to the next slice */
327 if (iter->slice_remaining <= 0) {
328 if (close(iter->cur_fd) < 0) {
329 xfer_cancel_with_error(elt,
330 _("Could not close fd %d: %s"),
331 iter->cur_fd, strerror(errno));
336 iter->slice = iter->slice->next;
347 /* Free the iterator's resources */
/* NOTE(review): the function signature and the close() call guarded by this
 * condition are not visible in this excerpt. */
352 if (iter->cur_fd >= 0)
360 /* Wait for at least one block, or EOF, to be available in the ring buffer.
361  * Called with the ring mutex held. */
363 device_thread_wait_for_block(
364     XferDestTaperSplitter *self)
366 XferElement *elt = XFER_ELEMENT(self);
367 gsize bytes_needed = self->device->block_size;
370 /* for any kind of streaming, we need to fill the entire buffer before the
/* prebuffer: before the first byte of a part, any streaming mode other than
 * NONE requires the whole ring to be full before writing begins */
372 if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
373 bytes_needed = self->ring_length;
/* NOTE(review): the surrounding wait-loop braces and cancellation checks are
 * not visible in this excerpt.  Exit conditions: enough buffered data, or
 * the producer has signalled EOF. */
380 if (self->ring_count >= bytes_needed)
383 if (self->ring_head_at_eof)
/* block until push_buffer signals that more data has arrived */
387 g_cond_wait(self->ring_add_cond, self->ring_mutex);
389 /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
390 * we need to wait for the entire buffer to fill */
391 if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
392 bytes_needed = self->ring_length;
/* return no more than is buffered, and never beyond the part boundary */
395 usable = MIN(self->ring_count, bytes_needed);
397 usable = MIN(usable, self->part_size - self->part_bytes_written);
402 /* Mark WRITTEN bytes as free in the ring buffer.  Called with the ring mutex
405 device_thread_consume_block(
406     XferDestTaperSplitter *self,
/* advance the tail past the consumed bytes, wrapping around the ring end.
 * NOTE(review): the "gsize written" parameter line is not visible here. */
409 self->ring_count -= written;
410 self->ring_tail += written;
411 if (self->ring_tail >= self->ring_length)
412 self->ring_tail -= self->ring_length;
/* wake any producer waiting in push_buffer for free space */
413 g_cond_broadcast(self->ring_free_cond);
416 /* Write an entire part.  Called with the state_mutex held */
/* Writes one part: header first, then (on a retry) cached bytes replayed
 * from the slice list, then fresh data from the ring buffer, ending at EOF,
 * the part-size limit (EOP), logical end-of-medium (LEOM), or failure.
 * Returns an XMSG_PART_DONE message describing the outcome, or NULL when
 * the transfer was cancelled mid-part.
 * NOTE(review): many control-flow lines (loop headers, else arms, gotos,
 * early returns, some declarations) are not visible in this excerpt. */
418 device_thread_write_part(
419     XferDestTaperSplitter *self)
421 GTimer *timer = g_timer_new();
422 XferElement *elt = XFER_ELEMENT(self);
424 enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
428 self->part_bytes_written = 0;
430 g_timer_start(timer);
432 /* write the header; if this fails or hits LEOM, we consider this a
433 * successful 0-byte part */
434 if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
435 part_status = PART_LEOM;
/* record the on-device file number for the PART_DONE message */
439 fileno = self->device->file;
440 g_assert(fileno > 0);
442 /* free the header, now that it's written */
443 dumpfile_free(self->part_header);
444 self->part_header = NULL;
446 /* First, read the requisite number of bytes from the part_slices, if the part was
448 if (self->bytes_to_read_from_slices) {
450 gsize to_write = self->block_size;
451 gpointer buf = g_malloc(to_write);
452 gboolean successful = TRUE;
453 guint64 bytes_from_slices = self->bytes_to_read_from_slices;
455 DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
/* replay the previously-written (cached) bytes block by block */
457 iterate_slices(self, &iter);
458 while (bytes_from_slices) {
461 if (!iterator_get_block(self, &iter, buf, to_write)) {
462 part_status = PART_FAILED;
467 /* note that it's OK to reference these ring_* vars here, as they
468 * are static at this point */
469 ok = device_write_block(self->device, (guint)to_write, buf);
472 part_status = PART_FAILED;
477 self->part_bytes_written += to_write;
478 bytes_from_slices -= to_write;
/* stop at the part-size boundary or on logical end-of-medium */
480 if (self->part_size && self->part_bytes_written >= self->part_size) {
481 part_status = PART_EOP;
484 } else if (self->device->is_eom) {
485 part_status = PART_LEOM;
491 iterator_free(&iter);
494 /* if we didn't finish, get out of here now */
/* main write loop: pull blocks from the ring buffer and write them out */
499 g_mutex_lock(self->ring_mutex);
504 /* wait for at least one block, and (if necessary) prebuffer */
505 to_write = device_thread_wait_for_block(self);
506 to_write = MIN(to_write, self->device->block_size);
/* nothing usable means the producer hit EOF */
511 part_status = PART_EOF;
/* drop the ring mutex during the (potentially slow) device write */
515 g_mutex_unlock(self->ring_mutex);
516 DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
518 /* note that it's OK to reference these ring_* vars here, as they
519 * are static at this point */
520 ok = device_write_block(self->device, (guint)to_write,
521 self->ring_buffer + self->ring_tail);
522 g_mutex_lock(self->ring_mutex);
525 part_status = PART_FAILED;
529 self->part_bytes_written += to_write;
530 device_thread_consume_block(self, to_write);
532 if (self->part_size && self->part_bytes_written >= self->part_size) {
533 part_status = PART_EOP;
535 } else if (self->device->is_eom) {
536 part_status = PART_LEOM;
540 g_mutex_unlock(self->ring_mutex);
543 /* if we write all of the blocks, but the finish_file fails, then likely
544 * there was some buffering going on in the device driver, and the blocks
545 * did not all make it to permanent storage -- so it's a failed part.  Note
546 * that we try to finish_file even if the part failed, just to be thorough. */
547 if (self->device->in_file) {
548 if (!device_finish_file(self->device))
549 if (!elt->cancelled) {
550 part_status = PART_FAILED;
/* cancelled mid-part: clean up the timer and bail (caller sees NULL) */
554 if (elt->cancelled) {
555 g_timer_destroy(timer);
/* build the PART_DONE message, recording per-part status on self as well */
561 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
562 msg->size = self->part_bytes_written;
563 msg->duration = g_timer_elapsed(timer, NULL);
564 msg->partnum = self->partnum;
565 msg->fileno = fileno;
566 msg->successful = self->last_part_successful = part_status != PART_FAILED;
567 msg->eom = self->last_part_eom = (part_status == PART_LEOM || !msg->successful);
568 msg->eof = self->last_part_eof = part_status == PART_EOF;
570 /* time runs backward on some test boxes, so make sure this is positive */
571 if (msg->duration < 0) msg->duration = 0;
/* a failed part can only be retried when cache_inform slices are expected */
575 self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
577 g_timer_destroy(timer);
/* Main body of the device thread: loops once per part, waiting (paused)
 * for the main thread's start_part, writing the part, and queueing the
 * resulting PART_DONE message, until no_more_parts or cancellation.
 * NOTE(review): the function signature line and several loop/brace lines
 * are not visible in this excerpt. */
586 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
587 XferElement *elt = XFER_ELEMENT(self);
590 DBG(1, "(this is the device thread)");
592 /* This is the outer loop, that loops once for each split part written to
594 g_mutex_lock(self->state_mutex);
596 /* wait until the main thread un-pauses us, and check that we have
597 * the relevant device info available (block_size) */
598 while (self->paused && !elt->cancelled) {
599 DBG(9, "waiting to be unpaused");
600 g_cond_wait(self->state_cond, self->state_mutex);
602 DBG(9, "done waiting");
607 DBG(2, "beginning to write part");
608 msg = device_thread_write_part(self);
609 DBG(2, "done writing part");
611 if (!msg) /* cancelled */
614 /* release the slices for this part, if there were any slices */
615 if (msg->successful && self->expect_cache_inform) {
616 fast_forward_slices(self, msg->size);
619 xfer_queue_message(elt->xfer, msg);
621 /* pause ourselves and await instructions from the main thread */
624 /* if this is the last part, we're done with the part loop */
625 if (self->no_more_parts)
628 g_mutex_unlock(self->state_mutex);
630 /* tell the main thread we're done */
631 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* Producer side of the ring buffer: copies the pushed block into the ring
 * (in contiguous pieces, waiting for free space as needed), or marks EOF
 * when buf is NULL.  Frees the caller's buffer on exit.
 * NOTE(review): the function signature, the enclosing copy loop, and the
 * final free/finish lines are not visible in this excerpt. */
646 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
649 DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
651 /* do nothing if cancelled */
652 if (G_UNLIKELY(elt->cancelled)) {
653 goto free_and_finish;
/* NULL buffer is the EOF sentinel: flag it and wake the device thread */
657 if (G_UNLIKELY(buf == NULL)) {
658 /* indicate EOF to the device thread */
659 g_mutex_lock(self->ring_mutex);
660 self->ring_head_at_eof = TRUE;
661 g_cond_broadcast(self->ring_add_cond);
662 g_mutex_unlock(self->ring_mutex);
663 goto free_and_finish;
666 /* push the block into the ring buffer, in pieces if necessary */
667 g_mutex_lock(self->ring_mutex);
671 /* wait for some space */
672 while (self->ring_count == self->ring_length && !elt->cancelled) {
673 DBG(9, "waiting for any space to buffer pushed data");
674 g_cond_wait(self->ring_free_cond, self->ring_mutex);
676 DBG(9, "done waiting");
679 goto unlock_and_free_and_finish;
681 /* only copy to the end of the buffer, if the available space wraps
682 * around to the beginning */
683 avail = MIN(size, self->ring_length - self->ring_count);
684 avail = MIN(avail, self->ring_length - self->ring_head);
686 /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
687 memmove(self->ring_buffer + self->ring_head, p, avail);
689 /* reset the ring variables to represent this state */
690 self->ring_count += avail;
691 self->ring_head += avail; /* will, at most, hit ring_length */
692 if (self->ring_head == self->ring_length)
694 p = (gpointer)((guchar *)p + avail);
697 /* and give the device thread a notice that data is ready */
698 g_cond_broadcast(self->ring_add_cond);
701 unlock_and_free_and_finish:
702 g_mutex_unlock(self->ring_mutex);
/* Element start: spawn the (non-joinable) device thread that performs all
 * device writes.  NOTE(review): the function signature and return statement
 * are not visible in this excerpt. */
717 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
718 GError *error = NULL;
720 self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
721 if (!self->device_thread) {
722 g_critical(_("Error creating new thread: %s (%s)"),
723 error->message, errno? strerror(errno) : _("no error code"));
/* Element cancel: chain up to the parent's cancel, then broadcast every
 * condition variable so blocked threads re-check elt->cancelled.
 * NOTE(review): the function signature and return are not visible here. */
734 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
738 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
740 /* then signal all of our condition variables, so that threads waiting on them
741 * wake up and see elt->cancelled. */
742 g_mutex_lock(self->ring_mutex);
743 g_cond_broadcast(self->ring_add_cond);
744 g_cond_broadcast(self->ring_free_cond);
745 g_mutex_unlock(self->ring_mutex);
747 g_mutex_lock(self->state_mutex);
748 g_cond_broadcast(self->state_cond);
749 g_mutex_unlock(self->state_mutex);
/* Begin writing a part: validates that a retry is legal (previous part
 * failed AND slices are available via cache_inform), sets how many bytes
 * to replay from the slice cache, installs a copy of the header, and
 * un-pauses the device thread.
 * NOTE(review): the function signature, the retry/no-retry branch structure
 * and partnum handling lines are not fully visible in this excerpt. */
760 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
762 g_assert(self->device != NULL);
763 g_assert(!self->device->in_file);
764 g_assert(header != NULL);
766 DBG(1, "start_part()");
768 /* we can only retry the part if we're getting slices via cache_inform's */
770 if (self->last_part_successful) {
771 xfer_cancel_with_error(XFER_ELEMENT(self),
772 _("Previous part did not fail; cannot retry"));
776 if (!self->expect_cache_inform) {
777 xfer_cancel_with_error(XFER_ELEMENT(self),
778 _("No cache for previous failed part; cannot retry"));
/* retry path: replay everything the failed part had already written */
782 self->bytes_to_read_from_slices = self->part_bytes_written;
784 /* don't read any bytes from the slices, since we're not retrying */
785 self->bytes_to_read_from_slices = 0;
788 g_mutex_lock(self->state_mutex);
789 g_assert(self->paused);
790 g_assert(!self->no_more_parts);
/* replace any stale header with a private copy of the caller's header */
792 if (self->part_header)
793 dumpfile_free(self->part_header);
794 self->part_header = dumpfile_copy(header);
/* wake the device thread, which waits on state_cond while paused */
797 self->paused = FALSE;
798 g_cond_broadcast(self->state_cond);
800 g_mutex_unlock(self->state_mutex);
/* Switch to a new output device between parts.  Verifies the new device's
 * streaming requirement and block size match the original; a block-size
 * mismatch cancels the transfer.
 * NOTE(review): several early-return and brace lines are not visible in
 * this excerpt, and the GValue declaration is off-screen. */
805     XferDestTaper *xdtself,
808 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
809 StreamingRequirement newstreaming;
812 DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
814 /* short-circuit if nothing is changing */
815 if (self->device == device)
/* swap device references under the state mutex */
818 g_mutex_lock(self->state_mutex);
820 g_object_unref(self->device);
821 self->device = device;
822 g_object_ref(device);
824 /* get this new device's streaming requirements */
825 bzero(&val, sizeof(val));
826 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
827 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
828 g_warning("Couldn't get streaming type for %s", self->device->device_name);
/* a differing streaming requirement is tolerated, but warned about */
830 newstreaming = g_value_get_enum(&val);
831 if (newstreaming != self->streaming)
832 g_warning("New device has different streaming requirements from the original; "
833 "ignoring new requirement");
837 /* check that the blocksize hasn't changed */
838 if (self->block_size != device->block_size) {
839 g_mutex_unlock(self->state_mutex);
840 xfer_cancel_with_error(XFER_ELEMENT(self),
841 _("All devices used by the taper must have the same block size"));
844 g_mutex_unlock(self->state_mutex);
/* Record a new cached slice (filename/offset/length) by appending it to
 * the end of part_slices, under the part_slices mutex.
 * NOTE(review): the function's first lines and the list-append assignment
 * after the for-loop are not visible in this excerpt. */
850     const char *filename,
854 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
855 FileSlice *slice = g_new(FileSlice, 1), *iter;
858 slice->filename = g_strdup(filename);
859 slice->offset = offset;
860 slice->length = length;
862 g_mutex_lock(self->part_slices_mutex);
/* walk to the tail if the list is non-empty; otherwise it becomes the head */
863 if (self->part_slices) {
864 for (iter = self->part_slices; iter->next; iter = iter->next) {}
867 self->part_slices = slice;
869 g_mutex_unlock(self->part_slices_mutex);
/* Report the number of bytes written in the current part (informational
 * only; see the note below about the unsynchronized read).
 * NOTE(review): the return-type line is not visible in this excerpt. */
873 get_part_bytes_written_impl(
874     XferDestTaper *xdtself)
876 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
878 /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
879 * partial write to the 64-bit value on a 32-bit system).  This is ok for
880 * the moment, as it's only informational, but be warned. */
881 return self->part_bytes_written;
/* GObject instance init: allocate the mutexes/conds and zero the per-part
 * state.  can_generate_eof is FALSE because this element is a pure sink.
 * NOTE(review): the function signature and some initializations (e.g.
 * partnum) are not visible in this excerpt. */
888 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
889 elt->can_generate_eof = FALSE;
891 self->state_mutex = g_mutex_new();
892 self->state_cond = g_cond_new();
893 self->ring_mutex = g_mutex_new();
894 self->ring_add_cond = g_cond_new();
895 self->ring_free_cond = g_cond_new();
896 self->part_slices_mutex = g_mutex_new();
900 self->part_header = NULL;
902 self->part_bytes_written = 0;
903 self->part_slices = NULL;
/* GObject finalize: release all synchronization primitives, any leftover
 * slices (including their filenames), the ring buffer, the pending header,
 * and the device reference, then chain up to the parent finalize.
 * NOTE(review): the function signature and the g_free(slice) call inside
 * the slice loop are not visible in this excerpt. */
910 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
911 FileSlice *slice, *next_slice;
913 g_mutex_free(self->state_mutex);
914 g_cond_free(self->state_cond);
916 g_mutex_free(self->ring_mutex);
917 g_cond_free(self->ring_add_cond);
918 g_cond_free(self->ring_free_cond);
920 g_mutex_free(self->part_slices_mutex);
/* free any slices left over from an incomplete or failed transfer */
922 for (slice = self->part_slices; slice; slice = next_slice) {
923 next_slice = slice->next;
925 g_free(slice->filename);
929 if (self->ring_buffer)
930 g_free(self->ring_buffer);
932 if (self->part_header)
933 dumpfile_free(self->part_header);
936 g_object_unref(self->device);
939 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
/* GObject class init: wire up the XferElement/XferDestTaper vtable entries,
 * declare the supported mechanism pairs (push-buffer sink only), and stash
 * the parent class for finalize chaining.
 * NOTE(review): the function's storage class/return line and the mech_pairs
 * terminator line are not visible in this excerpt. */
944     XferDestTaperSplitterClass * selfc)
946 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
947 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
948 GObjectClass *goc = G_OBJECT_CLASS(selfc);
949 static xfer_element_mech_pair_t mech_pairs[] = {
950 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
951 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
954 klass->start = start_impl;
955 klass->cancel = cancel_impl;
956 klass->push_buffer = push_buffer_impl;
957 xdt_klass->start_part = start_part_impl;
958 xdt_klass->use_device = use_device_impl;
959 xdt_klass->cache_inform = cache_inform_impl;
960 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
961 goc->finalize = finalize_impl;
963 klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
964 klass->mech_pairs = mech_pairs;
966 parent_class = g_type_class_peek_parent(selfc);
/* Lazy, one-time registration of the XferDestTaperSplitter GType as a
 * subtype of XFER_DEST_TAPER_TYPE.
 * NOTE(review): the return-type line, a couple of GTypeInfo fields and the
 * final "return type;" are not visible in this excerpt. */
970 xfer_dest_taper_splitter_get_type (void)
972 static GType type = 0;
974 if G_UNLIKELY(type == 0) {
975 static const GTypeInfo info = {
976 sizeof (XferDestTaperSplitterClass),
977 (GBaseInitFunc) NULL,
978 (GBaseFinalizeFunc) NULL,
979 (GClassInitFunc) class_init,
980 (GClassFinalizeFunc) NULL,
981 NULL /* class_data */,
982 sizeof (XferDestTaperSplitter),
984 (GInstanceInitFunc) instance_init,
988 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
/* Public constructor: builds a splitter element for first_device, rounding
 * max_memory and part_size up to whole device blocks, allocating the ring
 * buffer, and capturing the device's streaming requirement (defaulting to
 * REQUIRED when the property cannot be read).
 * NOTE(review): the return-type line, the max_memory/part_size parameter
 * lines, the GValue declaration and some assertions are not visible in
 * this excerpt. */
999 xfer_dest_taper_splitter(
1000     Device *first_device,
1003     gboolean expect_cache_inform)
1005 XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1008 /* max_memory and part_size get rounded up to the next multiple of
1010 max_memory = ((max_memory + first_device->block_size - 1)
1011 / first_device->block_size) * first_device->block_size;
1013 part_size = ((part_size + first_device->block_size - 1)
1014 / first_device->block_size) * first_device->block_size;
1016 self->part_size = part_size;
1018 self->device = first_device;
1020 g_object_ref(self->device);
1021 self->block_size = first_device->block_size;
1022 self->paused = TRUE;
1023 self->no_more_parts = FALSE;
1025 /* set up a ring buffer of size max_memory */
1026 self->ring_length = max_memory;
1027 self->ring_buffer = g_malloc(max_memory);
1028 self->ring_head = self->ring_tail = 0;
1029 self->ring_count = 0;
1030 self->ring_head_at_eof = 0;
1032 /* get this new device's streaming requirements */
1033 bzero(&val, sizeof(val));
1034 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1035 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1036 g_warning("Couldn't get streaming type for %s", self->device->device_name);
/* safest fallback: assume the strictest streaming requirement */
1037 self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1039 self->streaming = g_value_get_enum(&val);
1041 g_value_unset(&val);
1043 /* grab data from cache_inform, just in case we hit PEOM */
1044 self->expect_cache_inform = expect_cache_inform;
1046 return XFER_ELEMENT(self);