2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 as published
7 * by the Free Software Foundation.
9 * This program is distributed in the hope that it will be useful, but
10 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * You should have received a copy of the GNU General Public License along
15 * with this program; if not, write to the Free Software Foundation, Inc.,
16 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
18 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "xfer-device.h"
28 /* A transfer destination that writes an entire dumpfile to one or more files
29 * on one or more devices, without any caching. This destination supports both
30 * LEOM-based splitting (in which parts are never rewound) and cache_inform-based
31 * splitting (in which rewound parts are read from holding disk). */
34 * File Slices - Cache Information
36 * The cache_inform implementation adds cache information to a linked list of
37 * these objects, in order. The objects are arranged in a linked list, and
38 * describe the files in which the part data is stored. Note that we assume
39 * that slices are added *before* they are needed: the xfer element will fail
40 * if it tries to rewind and does not find a suitable slice.
42 * The slices should be "fast forwarded" after every part, so that the first
43 * byte in part_slices is the first byte of the part; when a retry of a part is
44 * required, use the iterator methods to properly open the various files and do
/* One entry in the cache_inform list: a contiguous byte range
 * (offset/length) within a single holding-disk file. Entries are
 * singly linked in the order the data was written. */
48 typedef struct FileSlice {
49 struct FileSlice *next;
51 /* fully-qualified filename to read from (or NULL to read from
52 * disk_cache_read_fd in XferDestTaperCacher) */
55 /* offset in file to start at */
58 /* length of data to read */
/* parent GObject class, captured in class_init for chaining finalize/cancel */
66 static GObjectClass *parent_class = NULL;
/* Instance structure for the splitter element. Fields fall into three
 * groups: constructor-supplied constants, the ring buffer shared between
 * the pushing thread and the device thread, and per-part state guarded by
 * state_mutex. */
68 typedef struct XferDestTaperSplitter {
69 XferDestTaper __parent__;
73 * These values are supplied to the constructor, and can be assumed
74 * constant for the lifetime of the element.
77 /* Maximum size of each part (bytes) */
80 /* the device's need for streaming (it's assumed that all subsequent devices
81 * have the same needs) */
82 StreamingRequirement streaming;
84 /* block size expected by the target device */
87 /* TRUE if this element is expecting slices via cache_inform */
88 gboolean expect_cache_inform;
90 /* The thread doing the actual writes to tape; this also handles buffering
92 GThread *device_thread;
96 * This buffer holds MAX_MEMORY bytes of data (rounded up to the next
97 * blocksize), and serves as the interface between the device_thread and
98 * the thread calling push_buffer. Ring_length is the total length of the
99 * buffer in bytes, while ring_count is the number of data bytes currently
100 * in the buffer. The ring_add_cond is signalled when data is added to the
101 * buffer, while ring_free_cond is signalled when data is removed. Both
102 * are governed by ring_mutex, and both are signalled when the transfer is
107 GCond *ring_add_cond, *ring_free_cond;
109 gsize ring_length, ring_count;
110 gsize ring_head, ring_tail;
111 gboolean ring_head_at_eof;
115 * "state" includes all of the variables below (including device
116 * parameters). Note that the device_thread holds this mutex for the
117 * entire duration of writing a part.
119 * state_mutex should always be locked before ring_mutex, if both are to be
120 * held simultaneously.
124 volatile gboolean paused;
126 /* The device to write to, and the header to write to it */
127 Device *volatile device;
128 dumpfile_t *volatile part_header;
130 /* bytes to read from cached slices before reading from the ring buffer */
131 guint64 bytes_to_read_from_slices;
133 /* part number in progress */
134 volatile guint64 partnum;
136 /* status of the last part */
137 gboolean last_part_eof;
138 gboolean last_part_eom;
139 gboolean last_part_successful;
141 /* true if the element is done writing to devices */
142 gboolean no_more_parts;
144 /* total bytes written in the current part */
145 volatile guint64 part_bytes_written;
147 /* The list of active slices for the current part. The cache_inform method
148 * appends to this list. It is safe to read this linked list, beginning at
149 * the head, *if* you can guarantee that slices will not be fast-forwarded
150 * in the interim. The finalize method for this class will take care of
151 * freeing any leftover slices. Take the part_slices mutex while modifying
152 * the links in this list. */
153 FileSlice *part_slices;
154 GMutex *part_slices_mutex;
155 } XferDestTaperSplitter;
/* Standard GObject type boilerplate: forward declaration of get_type plus
 * cast/type-check convenience macros, and the (empty) class structure. */
157 static GType xfer_dest_taper_splitter_get_type(void);
158 #define XFER_DEST_TAPER_SPLITTER_TYPE (xfer_dest_taper_splitter_get_type())
159 #define XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter)
160 #define XFER_DEST_TAPER_SPLITTER_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitter const)
161 #define XFER_DEST_TAPER_SPLITTER_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
162 #define IS_XFER_DEST_TAPER_SPLITTER(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_dest_taper_splitter_get_type ())
163 #define XFER_DEST_TAPER_SPLITTER_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_dest_taper_splitter_get_type(), XferDestTaperSplitterClass)
166 XferDestTaperClass __parent__;
168 } XferDestTaperSplitterClass;
/* Debug logging: DBG() filters by debug_taper level, then formats the
 * varargs into a fixed buffer and hands it to g_debug with an XDT prefix. */
174 #define DBG(LEVEL, ...) if (debug_taper >= LEVEL) { _xdt_dbg(__VA_ARGS__); }
176 _xdt_dbg(const char *fmt, ...)
181 arglist_start(argp, fmt);
182 g_vsnprintf(msg, sizeof(msg), fmt, argp);
184 g_debug("XDT: %s", msg);
187 /* "Fast forward" the slice list by the given length. This will free any
188 * slices that are no longer necessary, and adjust the offset and length of the
189 * first remaining slice. This assumes the state mutex is locked during its
192 * @param self: element
193 * @param length: number of bytes to fast forward
197 XferDestTaperSplitter *self,
202 /* consume slices until we've eaten the whole part */
203 g_mutex_lock(self->part_slices_mutex);
/* precondition: there must be enough slice data to cover `length` */
205 g_assert(self->part_slices);
206 slice = self->part_slices;
208 if (slice->length <= length) {
/* this slice is fully consumed: unlink it, free it, and move on */
209 length -= slice->length;
211 self->part_slices = slice->next;
213 g_free(slice->filename);
215 slice = self->part_slices;
/* partially-consumed slice: shrink it in place so its first byte is
 * the first unconsumed byte */
217 slice->length -= length;
218 slice->offset += length;
222 g_mutex_unlock(self->part_slices_mutex);
229 /* A struct for use in iterating over data in the slices */
230 typedef struct SliceIterator {
234 /* file descriptor of the current file, or -1 if it's not open yet */
237 /* bytes remaining in this slice */
238 guint64 slice_remaining;
241 /* Utility functions for SliceIterator */
243 /* Begin iterating over slices, starting at the first byte of the first slice.
244 * Initializes a pre-allocated SliceIterator. The caller must ensure that
245 * fast_forward_slices is not called while an iteration is in
250 XferDestTaperSplitter *self,
/* slice_remaining == 0 forces iterator_get_block to open the first file
 * lazily on its first call */
254 iter->slice_remaining = 0;
255 g_mutex_lock(self->part_slices_mutex);
256 iter->slice = self->part_slices;
257 /* it's safe to unlock this because, at worst, a new entry will
258 * be appended while the iterator is in progress */
259 g_mutex_unlock(self->part_slices_mutex);
263 /* Get a block of data from the iterator, returning a pointer to a buffer
264 * containing the data; the buffer remains the property of the iterator.
265 * Returns NULL on error, after calling xfer_cancel_with_error with an
266 * appropriate error message. This function does not block, so it does not
267 * check for cancellation.
271 XferDestTaperSplitter *self,
276 gsize buf_offset = 0;
277 XferElement *elt = XFER_ELEMENT(self);
279 g_assert(iter != NULL);
280 g_assert(buf != NULL);
/* fill the caller's buffer, crossing slice boundaries as needed */
282 while (bytes_needed > 0) {
/* lazily open the current slice's file the first time we need it */
286 if (iter->cur_fd < 0) {
289 g_assert(iter->slice != NULL);
290 g_assert(iter->slice->filename != NULL);
292 iter->cur_fd = open(iter->slice->filename, O_RDONLY, 0);
293 if (iter->cur_fd < 0) {
294 xfer_cancel_with_error(elt,
295 _("Could not open '%s' for reading: %s"),
296 iter->slice->filename, strerror(errno));
/* position at the slice's starting offset within the file */
300 iter->slice_remaining = iter->slice->length;
301 offset = iter->slice->offset;
303 if (lseek(iter->cur_fd, offset, SEEK_SET) == -1) {
304 xfer_cancel_with_error(elt,
305 _("Could not seek '%s' for reading: %s"),
306 iter->slice->filename, strerror(errno));
/* read no more than what remains in this slice */
311 read_size = MIN(iter->slice_remaining, bytes_needed);
312 bytes_read = full_read(iter->cur_fd,
/* a short read is an error here: the slice metadata promised the data */
315 if (bytes_read < 0 || (gsize)bytes_read < read_size) {
316 xfer_cancel_with_error(elt,
317 _("Error reading '%s': %s"),
318 iter->slice->filename,
319 errno? strerror(errno) : _("Unexpected EOF"));
323 iter->slice_remaining -= bytes_read;
324 buf_offset += bytes_read;
325 bytes_needed -= bytes_read;
/* slice exhausted: close its fd and advance to the next slice */
327 if (iter->slice_remaining <= 0) {
328 if (close(iter->cur_fd) < 0) {
329 xfer_cancel_with_error(elt,
330 _("Could not close fd %d: %s"),
331 iter->cur_fd, strerror(errno));
336 iter->slice = iter->slice->next;
347 /* Free the iterator's resources */
/* close any file left open by a partially-consumed slice */
352 if (iter->cur_fd >= 0)
360 /* Wait for at least one block, or EOF, to be available in the ring buffer.
361 * Called with the ring mutex held. */
363 device_thread_wait_for_block(
364 XferDestTaperSplitter *self)
366 XferElement *elt = XFER_ELEMENT(self);
367 gsize bytes_needed = self->device->block_size;
370 /* for any kind of streaming, we need to fill the entire buffer before the
372 if (self->part_bytes_written == 0 && self->streaming != STREAMING_REQUIREMENT_NONE)
373 bytes_needed = self->ring_length;
/* exit conditions: enough buffered data, or the pusher signalled EOF */
380 if (self->ring_count >= bytes_needed)
383 if (self->ring_head_at_eof)
/* otherwise sleep until push_buffer adds data (or cancel broadcasts) */
387 g_cond_wait(self->ring_add_cond, self->ring_mutex);
389 /* in STREAMING_REQUIREMENT_REQUIRED, once we decide to wait for more bytes,
390 * we need to wait for the entire buffer to fill */
391 if (self->streaming == STREAMING_REQUIREMENT_REQUIRED)
392 bytes_needed = self->ring_length;
/* clamp the result to what's actually buffered and to the bytes still
 * allowed in this part */
395 usable = MIN(self->ring_count, bytes_needed);
397 usable = MIN(usable, self->part_size - self->part_bytes_written);
402 /* Mark WRITTEN bytes as free in the ring buffer. Called with the ring mutex
405 device_thread_consume_block(
406 XferDestTaperSplitter *self,
409 self->ring_count -= written;
410 self->ring_tail += written;
/* wrap the tail index around the circular buffer */
411 if (self->ring_tail >= self->ring_length)
412 self->ring_tail -= self->ring_length;
/* wake the pushing thread, which may be waiting for free space */
413 g_cond_broadcast(self->ring_free_cond);
416 /* Write an entire part. Called with the state_mutex held */
418 device_thread_write_part(
419 XferDestTaperSplitter *self)
421 GTimer *timer = g_timer_new();
422 XferElement *elt = XFER_ELEMENT(self);
/* how this part ended; assume failure until proven otherwise */
424 enum { PART_EOF, PART_LEOM, PART_EOP, PART_FAILED } part_status = PART_FAILED;
428 self->part_bytes_written = 0;
430 g_timer_start(timer);
432 /* write the header; if this fails or hits LEOM, we consider this a
433 * successful 0-byte part */
434 if (!device_start_file(self->device, self->part_header) || self->device->is_eom) {
435 part_status = PART_LEOM;
439 fileno = self->device->file;
440 g_assert(fileno > 0);
442 /* free the header, now that it's written */
443 dumpfile_free(self->part_header);
444 self->part_header = NULL;
446 /* First, read the requisite number of bytes from the part_slices, if the part was
448 if (self->bytes_to_read_from_slices) {
450 gsize to_write = self->block_size;
451 gpointer buf = g_malloc(to_write);
452 gboolean successful = TRUE;
453 guint64 bytes_from_slices = self->bytes_to_read_from_slices;
455 DBG(5, "reading %ju bytes from slices", (uintmax_t)bytes_from_slices);
/* replay cached data (a retried part) from holding-disk slices */
457 iterate_slices(self, &iter);
458 while (bytes_from_slices) {
461 if (!iterator_get_block(self, &iter, buf, to_write)) {
462 part_status = PART_FAILED;
467 /* note that it's OK to reference these ring_* vars here, as they
468 * are static at this point */
469 ok = device_write_block(self->device, (guint)to_write, buf);
472 part_status = PART_FAILED;
477 self->part_bytes_written += to_write;
478 bytes_from_slices -= to_write;
/* part-size limit reached, or device reported logical EOM */
480 if (self->part_size && self->part_bytes_written >= self->part_size) {
481 part_status = PART_EOP;
484 } else if (self->device->is_eom) {
485 part_status = PART_LEOM;
491 iterator_free(&iter);
494 /* if we didn't finish, get out of here now */
/* now stream new data directly from the ring buffer */
499 g_mutex_lock(self->ring_mutex);
504 /* wait for at least one block, and (if necessary) prebuffer */
505 to_write = device_thread_wait_for_block(self);
506 to_write = MIN(to_write, self->device->block_size);
/* zero bytes available with EOF flagged: the part ends at input EOF */
511 part_status = PART_EOF;
/* drop the ring mutex while performing the (potentially slow) device
 * write; only the tail region we're writing is stable */
515 g_mutex_unlock(self->ring_mutex);
516 DBG(8, "writing %ju bytes to device", (uintmax_t)to_write);
518 /* note that it's OK to reference these ring_* vars here, as they
519 * are static at this point */
520 ok = device_write_block(self->device, (guint)to_write,
521 self->ring_buffer + self->ring_tail);
522 g_mutex_lock(self->ring_mutex);
525 part_status = PART_FAILED;
529 self->part_bytes_written += to_write;
530 device_thread_consume_block(self, to_write);
532 if (self->part_size && self->part_bytes_written >= self->part_size) {
533 part_status = PART_EOP;
535 } else if (self->device->is_eom) {
536 part_status = PART_LEOM;
540 g_mutex_unlock(self->ring_mutex);
543 /* if we write all of the blocks, but the finish_file fails, then likely
544 * there was some buffering going on in the device driver, and the blocks
545 * did not all make it to permanent storage -- so it's a failed part. Note
546 * that we try to finish_file even if the part failed, just to be thorough. */
547 if (self->device->in_file) {
548 if (!device_finish_file(self->device))
549 if (!elt->cancelled) {
550 part_status = PART_FAILED;
/* build the XMSG_PART_DONE message describing this part's outcome, and
 * record the outcome flags for start_part's retry logic */
556 msg = xmsg_new(XFER_ELEMENT(self), XMSG_PART_DONE, 0);
557 msg->size = self->part_bytes_written;
558 msg->duration = g_timer_elapsed(timer, NULL);
559 msg->partnum = self->partnum;
560 msg->fileno = fileno;
561 msg->successful = self->last_part_successful = part_status != PART_FAILED;
562 msg->eom = self->last_part_eom = part_status == PART_LEOM || self->device->is_eom;
563 msg->eof = self->last_part_eof = part_status == PART_EOF;
565 /* time runs backward on some test boxes, so make sure this is positive */
566 if (msg->duration < 0) msg->duration = 0;
/* we're done after EOF, or after an unrecoverable failure (no cache to
 * retry from) */
570 self->no_more_parts = msg->eof || (!msg->successful && !self->expect_cache_inform);
572 g_timer_destroy(timer);
/* Main loop of the device thread: write one part per iteration until EOF
 * or an unrecoverable failure, then send XMSG_DONE to the main thread. */
581 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(data);
582 XferElement *elt = XFER_ELEMENT(self);
585 DBG(1, "(this is the device thread)");
587 /* This is the outer loop, that loops once for each split part written to
589 g_mutex_lock(self->state_mutex);
591 /* wait until the main thread un-pauses us, and check that we have
592 * the relevant device info available (block_size) */
593 while (self->paused && !elt->cancelled) {
594 DBG(9, "waiting to be unpaused");
595 g_cond_wait(self->state_cond, self->state_mutex);
597 DBG(9, "done waiting");
602 DBG(2, "beginning to write part");
603 msg = device_thread_write_part(self);
604 DBG(2, "done writing part");
606 if (!msg) /* cancelled */
609 /* release the slices for this part, if there were any slices */
610 if (msg->successful && self->expect_cache_inform) {
611 fast_forward_slices(self, msg->size);
614 xfer_queue_message(elt->xfer, msg);
616 /* pause ourselves and await instructions from the main thread */
619 /* if this is the last part, we're done with the part loop */
620 if (self->no_more_parts)
623 g_mutex_unlock(self->state_mutex);
625 /* tell the main thread we're done */
626 xfer_queue_message(XFER_ELEMENT(self)->xfer, xmsg_new(XFER_ELEMENT(self), XMSG_DONE, 0));
/* XferElement push_buffer implementation: copy the incoming buffer into the
 * ring buffer (in contiguous pieces), or mark EOF when buf is NULL. Runs in
 * the upstream (pushing) thread; synchronizes with the device thread via
 * ring_mutex and the two ring condition variables. */
641 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
644 DBG(3, "push_buffer(%p, %ju)", buf, (uintmax_t)size);
646 /* do nothing if cancelled */
647 if (G_UNLIKELY(elt->cancelled)) {
648 goto free_and_finish;
652 if (G_UNLIKELY(buf == NULL)) {
653 /* indicate EOF to the device thread */
654 g_mutex_lock(self->ring_mutex);
655 self->ring_head_at_eof = TRUE;
656 g_cond_broadcast(self->ring_add_cond);
657 g_mutex_unlock(self->ring_mutex);
658 goto free_and_finish;
661 /* push the block into the ring buffer, in pieces if necessary */
662 g_mutex_lock(self->ring_mutex);
666 /* wait for some space */
667 while (self->ring_count == self->ring_length && !elt->cancelled) {
668 DBG(9, "waiting for any space to buffer pushed data");
669 g_cond_wait(self->ring_free_cond, self->ring_mutex);
671 DBG(9, "done waiting");
674 goto unlock_and_free_and_finish;
676 /* only copy to the end of the buffer, if the available space wraps
677 * around to the beginning */
678 avail = MIN(size, self->ring_length - self->ring_count);
679 avail = MIN(avail, self->ring_length - self->ring_head);
681 /* copy AVAIL bytes into the ring buf (knowing it's contiguous) */
682 memmove(self->ring_buffer + self->ring_head, p, avail);
684 /* reset the ring variables to represent this state */
685 self->ring_count += avail;
686 self->ring_head += avail; /* will, at most, hit ring_length */
687 if (self->ring_head == self->ring_length)
689 p = (gpointer)((guchar *)p + avail);
692 /* and give the device thread a notice that data is ready */
693 g_cond_broadcast(self->ring_add_cond);
696 unlock_and_free_and_finish:
697 g_mutex_unlock(self->ring_mutex);
/* XferElement start implementation: spawn the device thread. A thread
 * creation failure is fatal (g_critical). */
712 XferDestTaperSplitter *self = (XferDestTaperSplitter *)elt;
713 GError *error = NULL;
715 self->device_thread = g_thread_create(device_thread, (gpointer)self, FALSE, &error);
716 if (!self->device_thread) {
717 g_critical(_("Error creating new thread: %s (%s)"),
718 error->message, errno? strerror(errno) : _("no error code"));
/* XferElement cancel implementation: chain up to the parent's cancel, then
 * wake every waiter so all threads observe elt->cancelled promptly. */
729 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
733 rv = XFER_ELEMENT_CLASS(parent_class)->cancel(elt, expect_eof);
735 /* then signal all of our condition variables, so that threads waiting on them
736 * wake up and see elt->cancelled. */
737 g_mutex_lock(self->ring_mutex);
738 g_cond_broadcast(self->ring_add_cond);
739 g_cond_broadcast(self->ring_free_cond);
740 g_mutex_unlock(self->ring_mutex);
742 g_mutex_lock(self->state_mutex);
743 g_cond_broadcast(self->state_cond);
744 g_mutex_unlock(self->state_mutex);
/* XferDestTaper start_part implementation: install the part header and
 * un-pause the device thread to write the next part. If this is a retry of
 * a failed part, arrange to replay part_bytes_written bytes from the
 * cache_inform slices; retries are only possible with cache_inform. */
755 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
757 g_assert(self->device != NULL);
758 g_assert(!self->device->in_file);
759 g_assert(header != NULL);
761 DBG(1, "start_part()");
763 /* we can only retry the part if we're getting slices via cache_inform's */
765 if (self->last_part_successful) {
766 xfer_cancel_with_error(XFER_ELEMENT(self),
767 _("Previous part did not fail; cannot retry"));
771 if (!self->expect_cache_inform) {
772 xfer_cancel_with_error(XFER_ELEMENT(self),
773 _("No cache for previous failed part; cannot retry"));
/* retry: re-read everything the failed part had already written */
777 self->bytes_to_read_from_slices = self->part_bytes_written;
779 /* don't read any bytes from the slices, since we're not retrying */
780 self->bytes_to_read_from_slices = 0;
783 g_mutex_lock(self->state_mutex);
784 g_assert(self->paused);
785 g_assert(!self->no_more_parts);
/* replace any stale header with a private copy of the caller's header */
787 if (self->part_header)
788 dumpfile_free(self->part_header);
789 self->part_header = dumpfile_copy(header);
792 self->paused = FALSE;
793 g_cond_broadcast(self->state_cond);
795 g_mutex_unlock(self->state_mutex);
/* XferDestTaper use_device implementation: switch to a new output device.
 * The new device must have the same block size as the original; a differing
 * streaming requirement is warned about but otherwise ignored. */
800 XferDestTaper *xdtself,
803 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
804 StreamingRequirement newstreaming;
807 DBG(1, "use_device(%s)%s", device->device_name, (device == self->device)? " (no change)":"");
809 /* short-circuit if nothing is changing */
810 if (self->device == device)
813 g_mutex_lock(self->state_mutex);
/* swap device references: drop ours, take one on the new device */
815 g_object_unref(self->device);
816 self->device = device;
817 g_object_ref(device);
819 /* get this new device's streaming requirements */
820 bzero(&val, sizeof(val));
821 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
822 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
823 g_warning("Couldn't get streaming type for %s", self->device->device_name);
825 newstreaming = g_value_get_enum(&val);
826 if (newstreaming != self->streaming)
827 g_warning("New device has different streaming requirements from the original; "
828 "ignoring new requirement");
832 /* check that the blocksize hasn't changed */
833 if (self->block_size != device->block_size) {
834 g_mutex_unlock(self->state_mutex);
835 xfer_cancel_with_error(XFER_ELEMENT(self),
836 _("All devices used by the taper must have the same block size"));
839 g_mutex_unlock(self->state_mutex);
/* XferDestTaper cache_inform implementation: record that LENGTH bytes of
 * part data live at OFFSET in FILENAME, by appending a new FileSlice to
 * the tail of part_slices (under part_slices_mutex). */
845 const char *filename,
849 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdt);
850 FileSlice *slice = g_new(FileSlice, 1), *iter;
853 slice->filename = g_strdup(filename);
854 slice->offset = offset;
855 slice->length = length;
857 g_mutex_lock(self->part_slices_mutex);
858 if (self->part_slices) {
/* walk to the tail so slices stay in arrival order */
859 for (iter = self->part_slices; iter->next; iter = iter->next) {}
862 self->part_slices = slice;
864 g_mutex_unlock(self->part_slices_mutex);
/* XferDestTaper get_part_bytes_written implementation: report progress of
 * the part currently being written. */
868 get_part_bytes_written_impl(
869 XferDestTaper *xdtself)
871 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(xdtself);
873 /* NOTE: this access is unsafe and may return inconsistent results (e.g, a
874 * partial write to the 64-bit value on a 32-bit system). This is ok for
875 * the moment, as it's only informational, but be warned. */
877 return device_get_bytes_written(self->device);
879 return self->part_bytes_written;
/* GObject instance initializer: allocate synchronization primitives and
 * zero the per-part state. This element never generates EOF itself. */
887 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(elt);
888 elt->can_generate_eof = FALSE;
890 self->state_mutex = g_mutex_new();
891 self->state_cond = g_cond_new();
892 self->ring_mutex = g_mutex_new();
893 self->ring_add_cond = g_cond_new();
894 self->ring_free_cond = g_cond_new();
895 self->part_slices_mutex = g_mutex_new();
899 self->part_header = NULL;
901 self->part_bytes_written = 0;
902 self->part_slices = NULL;
/* GObject finalizer: release mutexes/conds, any leftover slices, the ring
 * buffer, a pending part header, and our device reference, then chain up. */
909 XferDestTaperSplitter *self = XFER_DEST_TAPER_SPLITTER(obj_self);
910 FileSlice *slice, *next_slice;
912 g_mutex_free(self->state_mutex);
913 g_cond_free(self->state_cond);
915 g_mutex_free(self->ring_mutex);
916 g_cond_free(self->ring_add_cond);
917 g_cond_free(self->ring_free_cond);
919 g_mutex_free(self->part_slices_mutex);
/* free any slices not consumed by fast_forward_slices */
921 for (slice = self->part_slices; slice; slice = next_slice) {
922 next_slice = slice->next;
924 g_free(slice->filename);
928 if (self->ring_buffer)
929 g_free(self->ring_buffer);
931 if (self->part_header)
932 dumpfile_free(self->part_header);
935 g_object_unref(self->device);
938 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
/* GObject class initializer: wire up the XferElement and XferDestTaper
 * virtual methods, declare that this element accepts PUSH_BUFFER input and
 * produces no downstream output, and capture the parent class. */
943 XferDestTaperSplitterClass * selfc)
945 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
946 XferDestTaperClass *xdt_klass = XFER_DEST_TAPER_CLASS(selfc);
947 GObjectClass *goc = G_OBJECT_CLASS(selfc);
948 static xfer_element_mech_pair_t mech_pairs[] = {
949 { XFER_MECH_PUSH_BUFFER, XFER_MECH_NONE, 1, 1},
950 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
953 klass->start = start_impl;
954 klass->cancel = cancel_impl;
955 klass->push_buffer = push_buffer_impl;
956 xdt_klass->start_part = start_part_impl;
957 xdt_klass->use_device = use_device_impl;
958 xdt_klass->cache_inform = cache_inform_impl;
959 xdt_klass->get_part_bytes_written = get_part_bytes_written_impl;
960 goc->finalize = finalize_impl;
962 klass->perl_class = "Amanda::Xfer::Dest::Taper::Splitter";
963 klass->mech_pairs = mech_pairs;
965 parent_class = g_type_class_peek_parent(selfc);
/* Lazily register and return the GType for XferDestTaperSplitter,
 * derived from XFER_DEST_TAPER_TYPE. */
969 xfer_dest_taper_splitter_get_type (void)
971 static GType type = 0;
973 if G_UNLIKELY(type == 0) {
974 static const GTypeInfo info = {
975 sizeof (XferDestTaperSplitterClass),
976 (GBaseInitFunc) NULL,
977 (GBaseFinalizeFunc) NULL,
978 (GClassInitFunc) class_init,
979 (GClassFinalizeFunc) NULL,
980 NULL /* class_data */,
981 sizeof (XferDestTaperSplitter),
983 (GInstanceInitFunc) instance_init,
987 type = g_type_register_static (XFER_DEST_TAPER_TYPE, "XferDestTaperSplitter", &info, 0);
/* Public constructor: build a splitter element writing to FIRST_DEVICE.
 * Sizes are rounded up to whole device blocks; the ring buffer is sized by
 * max_memory. The returned element starts paused, awaiting start_part. */
998 xfer_dest_taper_splitter(
999 Device *first_device,
1002 gboolean expect_cache_inform)
1004 XferDestTaperSplitter *self = (XferDestTaperSplitter *)g_object_new(XFER_DEST_TAPER_SPLITTER_TYPE, NULL);
1007 /* max_memory and part_size get rounded up to the next multiple of
1009 max_memory = ((max_memory + first_device->block_size - 1)
1010 / first_device->block_size) * first_device->block_size;
1012 part_size = ((part_size + first_device->block_size - 1)
1013 / first_device->block_size) * first_device->block_size;
1015 self->part_size = part_size;
1017 self->device = first_device;
1019 g_object_ref(self->device);
1020 self->block_size = first_device->block_size;
1021 self->paused = TRUE;
1022 self->no_more_parts = FALSE;
1024 /* set up a ring buffer of size max_memory */
1025 self->ring_length = max_memory;
1026 self->ring_buffer = g_malloc(max_memory);
1027 self->ring_head = self->ring_tail = 0;
1028 self->ring_count = 0;
1029 self->ring_head_at_eof = 0;
1031 /* get this new device's streaming requirements */
1032 bzero(&val, sizeof(val));
1033 if (!device_property_get(self->device, PROPERTY_STREAMING, &val)
1034 || !G_VALUE_HOLDS(&val, STREAMING_REQUIREMENT_TYPE)) {
1035 g_warning("Couldn't get streaming type for %s", self->device->device_name);
/* default to the strictest requirement when the property is unreadable */
1036 self->streaming = STREAMING_REQUIREMENT_REQUIRED;
1038 self->streaming = g_value_get_enum(&val);
1040 g_value_unset(&val);
1042 /* grab data from cache_inform, just in case we hit PEOM */
1043 self->expect_cache_inform = expect_cache_inform;
1045 return XFER_ELEMENT(self);