2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
25 #include "xfer-device.h"
29 /* A transfer destination that writes an entire dumpfile to one or more files
30 * on one or more devices, without any caching. This destination supports both
31 * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
32 * splitting (in which rewound parts are read from holding disk). */
35 * File Slices - Cache Information
37 * The cache_inform implementation adds cache information to a linked list of
38 * these objects, in order. The objects are arranged in a linked list, and
39 * describe the files in which the part data is stored. Note that we assume
40 * that slices are added *before* they are needed: the xfer element will fail
41 * if it tries to rewind and does not find a suitable slice.
43 * The slices should be "fast forwarded" after every part, so that the first
44 * byte in part_slices is the first byte of the part; when a retry of a part is
45 * required, use the iterator methods to properly open the various files and do
/* One contiguous region of an on-disk cache file.  Slices form a
 * singly-linked list, in part order; see the "File Slices" comment above.
 * NOTE(review): field declarations (filename/offset/length) are not visible
 * in this view of the source — confirm against the full file. */
49 typedef struct FileSlice {
50 struct FileSlice *next;
52 /* fully-qualified filename to read from (or NULL to read from
53 * disk_cache_read_fd in XferDestTaperCacher) */
56 /* offset in file to start at */
59 /* length of data to read */
67 static GObjectClass *parent_class = NULL;
/* Instance structure for the splitter element.  Constructor-supplied
 * configuration is immutable; the ring-buffer fields are guarded by
 * ring_mutex and the part-state fields by state_mutex (lock ordering:
 * state_mutex before ring_mutex). */
69 typedef struct XferDestTaperSplitter {
70 XferDestTaper __parent__;
74 * These values are supplied to the constructor, and can be assumed
75 * constant for the lifetime of the element.
78 /* Maximum size of each part (bytes) */
81 /* the device's need for streaming (it's assumed that all subsequent devices
82 * have the same needs) */
83 StreamingRequirement streaming;
85 /* block size expected by the target device */
88 /* TRUE if this element is expecting slices via cache_inform */
89 gboolean expect_cache_inform;
91 /* The thread doing the actual writes to tape; this also handles buffering
93 GThread *device_thread;
97 * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
98 * blocksize), and serves as the interface between the device_thread and
99 * the thread calling push_buffer. Ring_length is the total length of the
100 * buffer in bytes, while ring_count is the number of data bytes currently
101 * in the buffer. The ring_add_cond is signalled when data is added to the
102 * buffer, while ring_free_cond is signalled when data is removed. Both
103 * are governed by ring_mutex, and both are signalled when the transfer is
108 GCond *ring_add_cond, *ring_free_cond;
110 gsize ring_length, ring_count;
111 gsize ring_head, ring_tail;
112 gboolean ring_head_at_eof;
116 * "state" includes all of the variables below (including device
117 * parameters). Note that the device_thread holdes this mutex for the
118 * entire duration of writing a part.
120 * state_mutex should always be locked before ring_mutex, if both are to be
121 * held simultaneously.
/* volatile: read cross-thread without the mutex in a few informational
 * spots (see get_part_bytes_written_impl); not a substitute for locking. */
125 volatile gboolean paused;
127 /* The device to write to, and the header to write to it */
128 Device *volatile device;
129 dumpfile_t *volatile part_header;
131 /* bytes to read from cached slices before reading from the ring buffer */
132 guint64 bytes_to_read_from_slices;
134 /* part number in progress */
135 volatile guint64 partnum;
137 /* status of the last part */
138 gboolean last_part_eof;
139 gboolean last_part_eom;
140 gboolean last_part_successful;
142 /* true if the element is done writing to devices */
143 gboolean no_more_parts;
145 /* total bytes written in the current part */
146 volatile guint64 part_bytes_written;
148 /* The list of active slices for the current part. The cache_inform method
149 * appends to this list. It is safe to read this linked list, beginning at
150 * the head, *if* you can guarantee that slices will not be fast-forwarded
151 * in the interim. The finalize method for this class will take care of
152 * freeing any leftover slices. Take the part_slices mutex while modifying
153 * the links in this list. */
154 FileSlice *part_slices;
155 GMutex *part_slices_mutex;
156 } XferDestTaperSplitter;
158 static GType xfer_dest_taper_splitter_get_type(void);
159 #define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
160 #define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
161 #define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
162 #define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
163 #define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
164 #define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
167 XferDestTaperClass __parent__;
169 } XferDestTaperSplitterClass;
/* Debug logging: DBG() gates on the taper debug level, then _xdt_dbg()
 * formats into a fixed-size buffer and forwards to g_debug with an
 * "XDT:" prefix.  (Truncation of over-long messages by g_vsnprintf is
 * acceptable for debug output.) */
175 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
177 _xdt_dbg(const char *fmt, ...)
182 arglist_start(argp, fmt);
183 g_vsnprintf(msg, sizeof(msg), fmt, argp);
185 g_debug("XDT: %s", msg);
188 /* "Fast forward" the slice list by the given length. This will free any
189 * slices that are no longer necessary, and adjust the offset and length of the
190 * first remaining slice. This assumes the state mutex is locked during its
193 * @param self: element
194 * @param length: number of bytes to fast forward
198 XferDestTaperSplitter *self,
/* Walk the slice list under part_slices_mutex: wholly-consumed slices are
 * unlinked and freed; the first partially-consumed slice has its
 * offset/length adjusted so its first byte is the next unread byte. */
203 /* consume slices until we've eaten the whole part */
204 g_mutex_lock(self->part_slices_mutex);
/* precondition: a slice must exist for every byte we fast-forward over */
206 g_assert(self->part_slices);
207 slice = self->part_slices;
209 if (slice->length <= length) {
210 length -= slice->length;
/* this slice is fully consumed: unlink and free it */
212 self->part_slices = slice->next;
214 g_free(slice->filename);
216 slice = self->part_slices;
/* partially-consumed slice: shrink it in place */
218 slice->length -= length;
219 slice->offset += length;
223 g_mutex_unlock(self->part_slices_mutex);
230 /* A struct for use in iterating over data in the slices */
/* Read cursor over the part_slices list: the current slice, the open fd
 * for it (if any), and how many bytes of that slice remain unread. */
231 typedef struct SliceIterator {
235 /* file descriptor of the current file, or -1 if it's not open yet */
238 /* bytes remaining in this slice */
239 guint64 slice_remaining;
242 /* Utility functions for SliceIterator */
244 /* Begin iterating over slices, starting at the first byte of the first slice.
245 * Initializes a pre-allocated SliceIterator. The caller must ensure that
246 * fast_forward_slices is not called while an iteration is in
251 XferDestTaperSplitter *self,
/* start with no slice data consumed and the cursor at the list head */
255 iter->slice_remaining = 0;
256 g_mutex_lock(self->part_slices_mutex);
257 iter->slice = self->part_slices;
258 /* it's safe to unlock this because, at worst, a new entry will
259 * be appended while the iterator is in progress */
260 g_mutex_unlock(self->part_slices_mutex);
264 /* Get a block of data from the iterator, returning a pointer to a buffer
265 * containing the data; the buffer remains the property of the iterator.
266 * Returns NULL on error, after calling xfer_cancel_with_error with an
267 * appropriate error message. This function does not block, so it does not
268 * check for cancellation.
272 XferDestTaperSplitter *self,
277 gsize buf_offset = 0;
278 XferElement *elt = XFER_ELEMENT(self);
280 g_assert(iter != NULL);
281 g_assert(buf != NULL);
/* fill BUF by reading from consecutive slices until bytes_needed is
 * satisfied; on any error, cancel the transfer and (presumably) return
 * NULL to the caller — TODO confirm the elided error-path returns */
283 while (bytes_needed > 0) {
/* lazily open the current slice's file on first use */
287 if (iter->cur_fd < 0) {
290 g_assert(iter->slice != NULL);
291 g_assert(iter->slice->filename != NULL);
293 iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
294 if (iter->cur_fd < 0) {
295 xfer_cancel_with_error(elt,
296 _("Could not open '%s' for reading: %s"),
297 iter->slice->filename, strerror(errno));
/* position at the slice's region within the (possibly shared) file */
301 iter->slice_remaining = iter->slice->length;
302 offset = iter->slice->offset;
304 if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
305 xfer_cancel_with_error(elt,
306 _("Could not seek '%s' for reading: %s"),
307 iter->slice->filename, strerror(errno));
/* read no more than remains in this slice or is still needed */
312 read_size = MIN(iter->slice_remaining, bytes_needed);
313 bytes_read = full_read(iter->cur_fd,
/* a short read counts as an error: either a real errno or premature EOF */
316 if (bytes_read < 0 || (gsize)bytes_read < read_size) {
317 xfer_cancel_with_error(elt,
318 _("Error reading '%s': %s"),
319 iter->slice->filename,
320 errno? strerror(errno) : _("Unexpected EOF"));
324 iter->slice_remaining -= bytes_read;
325 buf_offset += bytes_read;
326 bytes_needed -= bytes_read;
/* slice exhausted: close its fd and advance to the next slice */
328 if (iter->slice_remaining <= 0) {
329 if (close(iter->cur_fd) < 0) {
330 xfer_cancel_with_error(elt,
331 _("Could not close fd %d: %s"),
332 iter->cur_fd, strerror(errno));
337 iter->slice = iter->slice->next;
348 /* Free the iterator's resources */
353 if (iter->cur_fd >= 0)
361 /* Wait for at least one block, or EOF, to be available in the ring buffer.
362 * Called with the ring mutex held. */
364 device_thread_wait_for_block(
365 XferDestTaperSplitter *self)
367 XferElement *elt = XFER_ELEMENT(self);
/* by default we only need one device block's worth of data */
368 gsize bytes_needed = self->device->block_size;
371 /* for any kind of streaming, we need to fill the entire buffer before the
373 if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
374 bytes_needed = self->ring_length;
/* wait (on ring_add_cond) until enough data is buffered, EOF is seen,
 * or the transfer is cancelled */
381 if (self->ring_count >= bytes_needed)
384 if (self->ring_head_at_eof)
388 g_cond_wait(self->ring_add_cond, self->ring_mutex);
390 /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
391 * we need to wait for the entire buffer to fill */
392 if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
393 bytes_needed = self->ring_length;
/* return no more than is buffered and no more than the part has room for */
396 usable = MIN(self->ring_count, bytes_needed);
398 usable = MIN(usable, self->part_size - self->part_bytes_written);
403 /* Mark WRITTEN bytes as free in the ring buffer. Called with the ring mutex
406 device_thread_consume_block(
407 XferDestTaperSplitter *self,
/* advance the tail pointer, wrapping at ring_length, then wake any
 * producer blocked waiting for free space */
410 self->ring_count -= written;
411 self->ring_tail += written;
412 if (self->ring_tail >= self->ring_length)
413 self->ring_tail -= self->ring_length;
414 g_cond_broadcast(self->ring_free_cond);
417 /* Write an entire part. Called with the state_mutex held */
419 device_thread_write_part(
420 XferDestTaperSplitter *self)
422 GTimer *timer = g_timer_new();
423 XferElement *elt = XFER_ELEMENT(self);
/* pessimistic default: anything short of an explicit success is a failure */
425 enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
429 self->part_bytes_written = 0;
431 g_timer_start(timer);
433 /* write the header; if this fails or hits LEOM, we consider this a
434 * successful 0-byte part */
435 if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
436 part_status = PART_LEOM;
440 fileno = self->device->file;
441 g_assert(fileno > 0);
443 /* free the header, now that it's written */
444 dumpfile_free(self->part_header);
445 self->part_header = NULL;
447 /* First, read the requisite number of bytes from the part_slices, if the part was
/* retry path: bytes_to_read_from_slices is nonzero only when start_part
 * requested a retry of a previously-failed, cached part */
449 if (self->bytes_to_read_from_slices) {
451 gsize to_write = self->block_size;
452 gpointer buf = g_malloc(to_write);
453 gboolean successful = TRUE;
454 guint64 bytes_from_slices = self->bytes_to_read_from_slices;
456 DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
458 iterate_slices(self, &iter);
459 while (bytes_from_slices) {
/* one device block per iteration, sourced from the cached slices */
462 if (!iterator_get_block(self, &iter, buf, to_write)) {
463 part_status = PART_FAILED;
468 /* note that it's OK to reference these ring_* vars here, as they
469 * are static at this point */
470 ok = device_write_block(self->device, (guint)to_write, buf);
473 part_status = PART_FAILED;
478 self->part_bytes_written += to_write;
479 bytes_from_slices -= to_write;
/* stop at the part-size boundary (EOP) or when the device reports
 * logical end-of-medium (LEOM) */
481 if (self->part_size && self->part_bytes_written >= self->part_size) {
482 part_status = PART_EOP;
485 } else if (self->device->is_eom) {
486 part_status = PART_LEOM;
492 iterator_free(&iter);
495 /* if we didn't finish, get out of here now */
/* normal path: stream blocks from the ring buffer to the device */
500 g_mutex_lock(self->ring_mutex);
505 /* wait for at least one block, and (if necessary) prebuffer */
506 to_write = device_thread_wait_for_block(self);
507 to_write = MIN(to_write, self->device->block_size);
/* a zero return presumably means EOF was reached — TODO confirm
 * against the elided wait/return logic */
512 part_status = PART_EOF;
/* drop the ring mutex around the (possibly slow) device write */
516 g_mutex_unlock(self->ring_mutex);
517 DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
519 /* note that it's OK to reference these ring_* vars here, as they
520 * are static at this point */
521 ok = device_write_block(self->device, (guint)to_write,
522 self->ring_buffer + self->ring_tail);
523 g_mutex_lock(self->ring_mutex);
526 part_status = PART_FAILED;
530 self->part_bytes_written += to_write;
531 device_thread_consume_block(self, to_write);
533 if (self->part_size && self->part_bytes_written >= self->part_size) {
534 part_status = PART_EOP;
536 } else if (self->device->is_eom) {
537 part_status = PART_LEOM;
541 g_mutex_unlock(self->ring_mutex);
544 /* if we write all of the blocks, but the finish_file fails, then likely
545 * there was some buffering going on in the device driver, and the blocks
546 * did not all make it to permanent storage -- so it's a failed part. Note
547 * that we try to finish_file even if the part failed, just to be thorough. */
548 if (self->device->in_file) {
549 if (!device_finish_file(self->device))
550 if (!elt->cancelled) {
551 part_status = PART_FAILED;
/* build the XMSG_PART_DONE message and record last-part flags for
 * start_part's retry logic */
557 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
558 msg->size = self->part_bytes_written;
559 msg->duration = g_timer_elapsed(timer, NULL);
560 msg->partnum = self->partnum;
561 msg->fileno = fileno;
562 msg->successful = self->last_part_successful = part_status != PART_FAILED;
563 msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
564 msg->eof = self->last_part_eof = part_status == PART_EOF;
566 /* time runs backward on some test boxes, so make sure this is positive */
567 if (msg->duration < 0) msg->duration = 0;
569 if (msg->successful && msg->size > 0)
/* a failed part is only retryable when cache_inform slices are expected */
571 self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
573 g_timer_destroy(timer);
582 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
583 XferElement *elt = XFER_ELEMENT(self);
586 DBG(1, "(this is the device thread)");
588 /* This is the outer loop, that loops once for each split part written to
/* state_mutex is held across the whole part loop, including the part
 * write itself (see the struct comment) */
590 g_mutex_lock(self->state_mutex);
592 /* wait until the main thread un-pauses us, and check that we have
593 * the relevant device info available (block_size) */
594 while (self->paused && !elt->cancelled) {
595 DBG(9, "waiting to be unpaused");
596 g_cond_wait(self->state_cond, self->state_mutex);
598 DBG(9, "done waiting");
603 DBG(2, "beginning to write part");
604 msg = device_thread_write_part(self);
605 DBG(2, "done writing part");
607 if (!msg) /* cancelled */
610 /* release the slices for this part, if there were any slices */
611 if (msg->successful && self->expect_cache_inform) {
612 fast_forward_slices(self, msg->size);
615 xfer_queue_message(elt->xfer, msg);
617 /* pause ourselves and await instructions from the main thread */
620 /* if this is the last part, we're done with the part loop */
621 if (self->no_more_parts)
624 g_mutex_unlock(self->state_mutex);
626 /* tell the main thread we're done */
627 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
642 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
645 DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
647 /* do nothing if cancelled */
648 if (G_UNLIKELY(elt->cancelled)) {
649 goto free_and_finish;
/* a NULL buffer is the upstream element's EOF marker */
653 if (G_UNLIKELY(buf == NULL)) {
654 /* indicate EOF to the device thread */
655 g_mutex_lock(self->ring_mutex);
656 self->ring_head_at_eof = TRUE;
657 g_cond_broadcast(self->ring_add_cond);
658 g_mutex_unlock(self->ring_mutex);
659 goto free_and_finish;
662 /* push the block into the ring buffer, in pieces if necessary */
663 g_mutex_lock(self->ring_mutex);
667 /* wait for some space */
668 while (self->ring_count == self->ring_length && !elt->cancelled) {
669 DBG(9, "waiting for any space to buffer pushed data");
670 g_cond_wait(self->ring_free_cond, self->ring_mutex);
672 DBG(9, "done waiting");
675 goto unlock_and_free_and_finish;
677 /* only copy to the end of the buffer, if the available space wraps
678 * around to the beginning */
679 avail = MIN(size, self->ring_length - self->ring_count);
680 avail = MIN(avail, self->ring_length - self->ring_head);
682 /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
683 memmove(self->ring_buffer + self->ring_head, p, avail);
685 /* reset the ring variables to represent this state */
686 self->ring_count += avail;
687 self->ring_head += avail; /* will, at most, hit ring_length */
688 if (self->ring_head == self->ring_length)
690 p = (gpointer)((guchar *)p + avail);
693 /* and give the device thread a notice that data is ready */
694 g_cond_broadcast(self->ring_add_cond);
697 unlock_and_free_and_finish:
698 g_mutex_unlock(self->ring_mutex);
713 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
714 GError *error = NULL;
/* spawn the (non-joinable) device thread; it starts paused and waits on
 * state_cond until start_part unpauses it */
716 self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
717 if (!self->device_thread) {
718 g_critical(_("Error creating new thread: %s (%s)"),
719 error->message, errno? strerror(errno) : _("no error code"));
730 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
/* chain up first so elt->cancelled is set before we wake the waiters */
734 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
736 /* then signal all of our condition variables, so that threads waiting on them
737 * wake up and see elt->cancelled. */
738 g_mutex_lock(self->ring_mutex);
739 g_cond_broadcast(self->ring_add_cond);
740 g_cond_broadcast(self->ring_free_cond);
741 g_mutex_unlock(self->ring_mutex);
743 g_mutex_lock(self->state_mutex);
744 g_cond_broadcast(self->state_cond);
745 g_mutex_unlock(self->state_mutex);
756 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
/* preconditions: a device is attached, no file is open on it, and a
 * header was supplied */
758 g_assert(self->device != NULL);
759 g_assert(!self->device->in_file);
760 g_assert(header != NULL);
762 DBG(1, "start_part()");
764 /* we can only retry the part if we're getting slices via cache_inform's */
/* retry branch: presumably entered when the caller requests a retry —
 * TODO confirm against the elided condition */
766 if (self->last_part_successful) {
767 xfer_cancel_with_error(XFER_ELEMENT(self),
768 _("Previous part did not fail; cannot retry"))
772 if (!self->expect_cache_inform) {
773 xfer_cancel_with_error(XFER_ELEMENT(self),
774 _("No cache for previous failed part; cannot retry"));
/* re-read exactly what the failed part had written, from the slices */
778 self->bytes_to_read_from_slices = self->part_bytes_written;
780 /* don't read any bytes from the slices, since we're not retrying */
781 self->bytes_to_read_from_slices = 0;
784 g_mutex_lock(self->state_mutex);
785 g_assert(self->paused);
786 g_assert(!self->no_more_parts);
788 if (self->part_header)
789 dumpfile_free(self->part_header);
790 self->part_header = dumpfile_copy(header);
/* unpause the device thread so it writes this part */
793 self->paused = FALSE;
794 g_cond_broadcast(self->state_cond);
796 g_mutex_unlock(self->state_mutex);
801 XferDestTaper *xdtself,
804 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
805 StreamingRequirement newstreaming;
808 DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
810 /* short-circuit if nothing is changing */
811 if (self->device == device)
814 g_mutex_lock(self->state_mutex);
/* swap device references: drop ours, take one on the new device */
816 g_object_unref(self->device);
817 self->device = device;
818 g_object_ref(device);
820 /* get this new device's streaming requirements */
821 bzero(&val, sizeof(val));
822 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
823 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
824 g_warning("Couldn't get streaming type for %s", self->device->device_name);
/* a differing requirement is only warned about; the original setting
 * from the constructor stays in effect */
826 newstreaming = g_value_get_enum(&val);
827 if (newstreaming != self->streaming)
828 g_warning("New device has different streaming requirements from the original; "
829 "ignoring new requirement");
833 /* check that the blocksize hasn't changed */
834 if (self->block_size != device->block_size) {
835 g_mutex_unlock(self->state_mutex);
836 xfer_cancel_with_error(XFER_ELEMENT(self),
837 _("All devices used by the taper must have the same block size"));
840 g_mutex_unlock(self->state_mutex);
846 const char *filename,
850 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
851 FileSlice *slice = g_new(FileSlice, 1), *iter;
/* record the cached region; the slice owns its filename copy */
854 slice->filename = g_strdup(filename);
855 slice->offset = offset;
856 slice->length = length;
/* append to the tail of part_slices, under its mutex */
858 g_mutex_lock(self->part_slices_mutex);
859 if (self->part_slices) {
860 for (iter = self->part_slices; iter->next; iter = iter->next) {}
863 self->part_slices = slice;
865 g_mutex_unlock(self->part_slices_mutex);
869 get_part_bytes_written_impl(
870 XferDestTaper *xdtself)
872 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
874 /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
875 * partial write to the 64-bit value on a 32-bit system). This is ok for
876 * the moment, as it's only informational, but be warned. */
/* which counter is returned presumably depends on an elided condition
 * (e.g. whether the device tracks its own byte count) — TODO confirm */
878 return device_get_bytes_written(self->device);
880 return self->part_bytes_written;
888 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
/* this element consumes EOF rather than generating it downstream */
889 elt->can_generate_eof = FALSE;
/* create the synchronization primitives; freed in finalize_impl */
891 self->state_mutex = g_mutex_new();
892 self->state_cond = g_cond_new();
893 self->ring_mutex = g_mutex_new();
894 self->ring_add_cond = g_cond_new();
895 self->ring_free_cond = g_cond_new();
896 self->part_slices_mutex = g_mutex_new();
900 self->part_header = NULL;
902 self->part_bytes_written = 0;
903 self->part_slices = NULL;
910 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
911 FileSlice *slice, *next_slice;
/* release synchronization primitives created in instance_init */
913 g_mutex_free(self->state_mutex);
914 g_cond_free(self->state_cond);
916 g_mutex_free(self->ring_mutex);
917 g_cond_free(self->ring_add_cond);
918 g_cond_free(self->ring_free_cond);
920 g_mutex_free(self->part_slices_mutex);
/* free any slices left over from an unfinished or failed part */
922 for (slice = self->part_slices; slice; slice = next_slice) {
923 next_slice = slice->next;
925 g_free(slice->filename);
929 if (self->ring_buffer)
930 g_free(self->ring_buffer);
932 if (self->part_header)
933 dumpfile_free(self->part_header);
/* drop our reference on the device (taken in the constructor/use_device) */
936 g_object_unref(self->device);
/* chain up to the parent class's finalize */
939 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
944 XferDestTaperSplitterClass * selfc)
946 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
947 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
948 GObjectClass *goc = G_OBJECT_CLASS(selfc);
/* this element only accepts PUSH_BUFFER input and produces no output;
 * the zeroed entry terminates the table */
949 static xfer_element_mech_pair_t mech_pairs[] = {
950 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
951 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
/* install the virtual-method implementations defined above */
954 klass->start = start_impl;
955 klass->cancel = cancel_impl;
956 klass->push_buffer = push_buffer_impl;
957 xdt_klass->start_part = start_part_impl;
958 xdt_klass->use_device = use_device_impl;
959 xdt_klass->cache_inform = cache_inform_impl;
960 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
961 goc->finalize = finalize_impl;
963 klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
964 klass->mech_pairs = mech_pairs;
966 parent_class = g_type_class_peek_parent(selfc);
970 xfer_dest_taper_splitter_get_type (void)
972 static GType type = 0;
/* standard GObject lazy type registration: register on first call,
 * return the cached GType thereafter */
974 if G_UNLIKELY(type == 0) {
975 static const GTypeInfo info = {
976 sizeof (XferDestTaperSplitterClass),
977 (GBaseInitFunc) NULL,
978 (GBaseFinalizeFunc) NULL,
979 (GClassInitFunc) class_init,
980 (GClassFinalizeFunc) NULL,
981 NULL /* class_data */,
982 sizeof (XferDestTaperSplitter),
984 (GInstanceInitFunc) instance_init,
988 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
999 xfer_dest_taper_splitter(
1000 Device *first_device,
1003 gboolean expect_cache_inform)
1005 XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1008 /* max_memory and part_size get rounded up to the next multiple of
/* round up so the ring buffer and part boundary are whole device blocks */
1010 max_memory = ((max_memory + first_device->block_size - 1)
1011 / first_device->block_size) * first_device->block_size;
1013 part_size = ((part_size + first_device->block_size - 1)
1014 / first_device->block_size) * first_device->block_size;
1016 self->part_size = part_size;
/* take our own reference on the initial device */
1018 self->device = first_device;
1020 g_object_ref(self->device);
1021 self->block_size = first_device->block_size;
/* start paused; the device thread waits until start_part unpauses it */
1022 self->paused = TRUE;
1023 self->no_more_parts = FALSE;
1025 /* set up a ring buffer of size max_memory */
1026 self->ring_length = max_memory;
1027 self->ring_buffer = g_malloc(max_memory);
1028 self->ring_head = self->ring_tail = 0;
1029 self->ring_count = 0;
1030 self->ring_head_at_eof = 0;
1032 /* get this new device's streaming requirements */
1033 bzero(&val, sizeof(val));
1034 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1035 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1036 g_warning("Couldn't get streaming type for %s", self->device->device_name);
/* conservative fallback when the property is unavailable */
1037 self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1039 self->streaming = g_value_get_enum(&val);
1041 g_value_unset(&val);
1043 /* grab data from cache_inform, just in case we hit PEOM */
1044 self->expect_cache_inform = expect_cache_inform;
1046 return XFER_ELEMENT(self);