X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fqueueing.c;fp=device-src%2Fqueueing.c;h=0000000000000000000000000000000000000000;hb=2627875b7d18858bc1f9f7652811e4d8c15a23eb;hp=2ec0e8c96c399b199a37ab9ec5b71a98d2d36a3c;hpb=fb2bd066c2f8b34addafe48d62550e3033a59431;p=debian%2Famanda diff --git a/device-src/queueing.c b/device-src/queueing.c deleted file mode 100644 index 2ec0e8c..0000000 --- a/device-src/queueing.c +++ /dev/null @@ -1,548 +0,0 @@ -/* - * Copyright (c) 2005 Zmanda, Inc. All Rights Reserved. - * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License version 2.1 as - * published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - * License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - * - * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120 - * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com - */ - -#include "queueing.h" -#include "device.h" -#include "semaphore.h" -#include "amanda.h" - -/* Queueing framework here. */ -typedef struct { - guint block_size; - ProducerFunctor producer; - gpointer producer_user_data; - ConsumerFunctor consumer; - gpointer consumer_user_data; - GAsyncQueue *data_queue, *free_queue; - semaphore_t *free_memory; - StreamingRequirement streaming_mode; -} queue_data_t; - -static queue_buffer_t *invent_buffer(void) { - queue_buffer_t *rval; - rval = malloc(sizeof(*rval)); - - rval->data = NULL; - rval->alloc_size = 0; - rval->data_size = 0; - rval->offset = 0; - - return rval; -} - -void free_buffer(queue_buffer_t *buf) { - if (buf != NULL) - amfree(buf->data); - amfree(buf); -} - -static queue_buffer_t * merge_buffers(queue_buffer_t *buf1, - queue_buffer_t *buf2) { - if (buf1 == NULL) - return buf2; - else if (buf2 == NULL) - return buf1; - - if (buf2->offset >= buf1->data_size) { - /* We can fit buf1 at the beginning of buf2. */ - memcpy(buf2->data + buf2->offset - buf1->data_size, - buf1->data + buf1->offset, - buf1->data_size); - buf2->offset -= buf1->data_size; - buf2->data_size += buf1->data_size; - free_buffer(buf1); - return buf2; - } else if (buf1->alloc_size - buf1->offset - buf1->data_size - >= buf2->data_size) { - /* We can fit buf2 at the end of buf1. */ - memcpy(buf1->data + buf1->offset + buf1->data_size, - buf2->data + buf2->offset, buf2->data_size); - buf1->data_size += buf2->data_size; - free_buffer(buf2); - return buf1; - } else { - /* We can grow buf1 and put everything there. */ - if (buf1->offset != 0) { - /* But first we have to fix up buf1. */ - memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size); - buf1->offset = 0; - } - buf1->alloc_size = buf1->data_size + buf2->data_size; - buf1->data = realloc(buf1->data, buf1->alloc_size); - memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset, - buf2->data_size); - buf1->data_size = buf1->alloc_size; - free_buffer(buf2); - return buf1; - } -} - -/* Invalidate the first "bytes" bytes of the buffer, by adjusting the - offset and data size. */ -static void consume_buffer(queue_buffer_t* buf, int bytes) { - buf->offset += bytes; - buf->data_size -= bytes; -} - -/* Looks at the buffer to see how much free space it has. If it has more than - * twice the data size of unused space at the end, or more than four times - * the data size of unused space at the beginning, then that space is - * reclaimed. */ -static void heatshrink_buffer(queue_buffer_t *buf) { - if (buf == NULL) - return; - - if (G_UNLIKELY(buf->data_size * 4 > buf->offset)) { - /* Consolodate with memmove. We will reclaim the space in the next - * step. */ - memmove(buf->data, buf->data + buf->offset, buf->data_size); - buf->offset = 0; - } - - if (buf->alloc_size > buf->data_size*2 + buf->offset) { - buf->alloc_size = buf->data_size + buf->offset; - buf->data = realloc(buf->data, buf->alloc_size); - } -} - -static gpointer do_producer_thread(gpointer datap) { - queue_data_t* data = datap; - - for (;;) { - queue_buffer_t *buf; - gboolean result; - - semaphore_decrement(data->free_memory, 0); - buf = g_async_queue_try_pop(data->free_queue); - if (buf != NULL && buf->data == NULL) { - /* Consumer is finished, then so are we. */ - amfree(buf); - return GINT_TO_POINTER(TRUE); - } - - if (buf == NULL) { - buf = invent_buffer(); - } - buf->offset = 0; - buf->data_size = 0; - - result = data->producer(data->producer_user_data, buf, - data->block_size); - - // Producers can allocate way too much memory. - heatshrink_buffer(buf); - - if (buf->data_size > 0) { - semaphore_force_adjust(data->free_memory, -buf->alloc_size); - - g_async_queue_push(data->data_queue, buf); - buf = NULL; - } else { - g_assert(result != PRODUCER_MORE); - free_buffer(buf); - buf = NULL; - } - - - if (result == PRODUCER_MORE) { - continue; - } else { - /* We are finished (and the first to do so). */ - g_async_queue_push(data->data_queue, invent_buffer()); - semaphore_force_set(data->free_memory, INT_MIN); - - return GINT_TO_POINTER(result == PRODUCER_FINISHED); - } - } -} - -static gpointer do_consumer_thread(gpointer datap) { - queue_data_t* data = datap; - gboolean finished = FALSE; - queue_buffer_t *buf = NULL; - - if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) { - semaphore_wait_empty(data->free_memory); - } - - for (;;) { - gboolean result; - - if (finished) { - return GINT_TO_POINTER(TRUE); - } - - while (buf == NULL || buf->data_size < data->block_size) { - queue_buffer_t *next_buf; - if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) { - do { - next_buf = g_async_queue_try_pop(data->data_queue); - if (next_buf == NULL) { - semaphore_wait_empty(data->free_memory); - } - } while (next_buf == NULL); - } else { - next_buf = g_async_queue_pop(data->data_queue); - g_assert(next_buf != NULL); - } - - if (next_buf->data == NULL) { - /* Producer is finished, then so are we.*/ - free_buffer(next_buf); - if (buf != NULL) { - /* But we can't quit yet, we have a buffer to flush.*/ - finished = TRUE; - break; - } else { - /* We are so outta here. */ - return GINT_TO_POINTER(TRUE); - } - } - - semaphore_increment(data->free_memory, next_buf->alloc_size); - - buf = merge_buffers(buf, next_buf); - } - - result = data->consumer(data->consumer_user_data, buf); - - if (result > 0) { - consume_buffer(buf, result); - if (buf->data_size == 0) { - g_async_queue_push(data->free_queue, buf); - buf = NULL; - } - continue; - } else { - free_buffer(buf); - return GINT_TO_POINTER(FALSE); - } - } -} - -/* Empties a buffer queue and frees all the buffers associated with it. - * - * If full_cleanup is TRUE, then we delete the queue itself. - * If full_cleanup is FALSE, then we leave the queue around, with a - * signal element in it. */ -static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) { - g_async_queue_lock(Q); - for (;;) { - queue_buffer_t *buftmp; - buftmp = g_async_queue_try_pop_unlocked(Q); - if (buftmp == NULL) - break; - - free_buffer(buftmp); - } - if (!full_cleanup) - g_async_queue_push_unlocked(Q, invent_buffer()); - - g_async_queue_unlock(Q); - - if (full_cleanup) - g_async_queue_unref(Q); -} - -/* This function sacrifices performance, but will still work just - fine, on systems where threads are not supported. */ -static queue_result_flags -do_unthreaded_consumer_producer_queue(guint block_size, - ProducerFunctor producer, - gpointer producer_user_data, - ConsumerFunctor consumer, - gpointer consumer_user_data) { - queue_buffer_t *buf = NULL, *next_buf = NULL; - gboolean finished = FALSE; - queue_result_flags rval = 0; - - /* The basic theory of operation here is to read until we have - enough data to write, then write until we don't.. */ - while (!finished) { - int result; - - while ((buf == NULL || buf->data_size < block_size) && !finished) { - if (next_buf == NULL) - next_buf = invent_buffer(); - - result = producer(producer_user_data, next_buf, block_size); - - if (result != PRODUCER_MORE) { - finished = TRUE; - if (result != PRODUCER_FINISHED) { - rval |= QUEUE_PRODUCER_ERROR; - } - } - - buf = merge_buffers(buf, next_buf); - next_buf = NULL; - } - - while (buf != NULL && buf->data_size > 0 && - (buf->data_size >= block_size || finished)) { - result = consumer(consumer_user_data, buf); - - if (result > 0) { - consume_buffer(buf, result); - if (buf->data_size == 0) { - next_buf = buf; - buf = NULL; - } - } else { - finished = TRUE; - rval |= QUEUE_CONSUMER_ERROR; - break; - } - } - } - - free_buffer(buf); - free_buffer(next_buf); - return rval; -} - -gboolean do_consumer_producer_queue(ProducerFunctor producer, - gpointer producer_user_data, - ConsumerFunctor consumer, - gpointer consumer_user_data) { - return QUEUE_SUCCESS == - do_consumer_producer_queue_full(producer, producer_user_data, - consumer, consumer_user_data, - 0, DEFAULT_MAX_BUFFER_MEMORY, - STREAMING_REQUIREMENT_NONE); -} - -queue_result_flags -do_consumer_producer_queue_full(ProducerFunctor producer, - gpointer producer_user_data, - ConsumerFunctor consumer, - gpointer consumer_user_data, - int block_size, - size_t max_memory, - StreamingRequirement streaming_mode) { - GThread * producer_thread; - GThread * consumer_thread; - queue_data_t queue_data; - gpointer producer_result; - gpointer consumer_result; - queue_result_flags rval; - - if (block_size <= 0) { - block_size = DISK_BLOCK_BYTES; - } - - g_return_val_if_fail(producer != NULL, FALSE); - g_return_val_if_fail(consumer != NULL, FALSE); - - if (!g_thread_supported()) { - return do_unthreaded_consumer_producer_queue(block_size, producer, - producer_user_data, - consumer, - consumer_user_data); - } - - queue_data.block_size = block_size; - queue_data.producer = producer; - queue_data.producer_user_data = producer_user_data; - queue_data.consumer = consumer; - queue_data.consumer_user_data = consumer_user_data; - queue_data.streaming_mode = streaming_mode; - - queue_data.data_queue = g_async_queue_new(); - queue_data.free_queue = g_async_queue_new(); - - max_memory = MAX(1,MIN(max_memory, INT_MAX / 2)); - queue_data.free_memory = semaphore_new_with_value(max_memory); - - producer_thread = g_thread_create(do_producer_thread, &queue_data, - TRUE, - NULL /* FIXME: Should handle - errors. */); - consumer_thread = g_thread_create(do_consumer_thread, &queue_data, - TRUE, - NULL /* FIXME: Should handle - errors. */); - - /* The order of cleanup here is very important, to avoid deadlock. */ - /* 1) Reap the consumer. */ - consumer_result = g_thread_join(consumer_thread); - /* 2) Stop the producer. */ - semaphore_force_set(queue_data.free_memory, -1); - /* 3) Cleanup the free queue; add a signal flag. */ - cleanup_buffer_queue(queue_data.free_queue, FALSE); - /* 4) Restart the producer (so it can exit). */ - semaphore_force_set(queue_data.free_memory, INT_MAX); - /* 5) Reap the producer. */ - producer_result = g_thread_join(producer_thread); - - cleanup_buffer_queue(queue_data.free_queue, TRUE); - cleanup_buffer_queue(queue_data.data_queue, TRUE); - - semaphore_free(queue_data.free_memory); - - rval = 0; - if (!GPOINTER_TO_INT(producer_result)) { - rval |= QUEUE_PRODUCER_ERROR; - } - if (!GPOINTER_TO_INT(consumer_result)) { - rval |= QUEUE_CONSUMER_ERROR; - } - return rval; -} - -/* Commonly-useful producers and consumers below. */ - -producer_result_t device_read_producer(gpointer devicep, - queue_buffer_t *buffer, - int hint_size G_GNUC_UNUSED) { - Device* device; - - device = (Device*) devicep; - g_assert(IS_DEVICE(device)); - - buffer->offset = 0; - for (;;) { - int result, read_size; - read_size = buffer->alloc_size; - result = device_read_block(device, buffer->data, &read_size); - if (result > 0) { - buffer->data_size = read_size; - return PRODUCER_MORE; - } else if (result == 0) { - buffer->data = realloc(buffer->data, read_size); - buffer->alloc_size = read_size; - } else if (device->is_eof) { - return PRODUCER_FINISHED; - } else { - buffer->data_size = 0; - return PRODUCER_ERROR; - } - } -} - -int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) { - Device* device; - unsigned int write_size; - static gboolean wrote_blocksize = FALSE; - - device = (Device*) devicep; - g_assert(IS_DEVICE(device)); - write_size = MIN(buffer->data_size, - device_write_max_size(device)); - - if (!wrote_blocksize) { - wrote_blocksize = TRUE; - dbprintf("USING BLOCKSIZE %d bytes\n", write_size); - } - - if (device_write_block(device, write_size, - buffer->data + buffer->offset, - buffer->data_size < - device_write_min_size(device))) { - /* Success! */ - return write_size; - } else { - /* Nope, really an error. */ - return -1; - } -} - -producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer, - int hint_size) { - int fd; - - fd = GPOINTER_TO_INT(fdp); - g_assert(fd >= 0); - g_assert(buffer->data_size == 0); - - buffer->offset = 0; - - if (buffer->data == NULL) { - /* Set up the buffer. */ - buffer->data = malloc(hint_size); - buffer->alloc_size = hint_size; - } - - for (;;) { - int result; - result = read(fd, buffer->data, buffer->alloc_size); - - if (result > 0) { - buffer->data_size = result; - return PRODUCER_MORE; - } else if (result == 0) { - /* End of file. */ - return PRODUCER_FINISHED; - } else if (0 -#ifdef EAGAIN - || errno == EAGAIN -#endif -#ifdef EWOULDBLOCK - || errno == EWOULDBLOCK -#endif -#ifdef EINTR - || errno == EINTR -#endif - ) { - /* Try again. */ - continue; - } else { - /* Error occured. */ - g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno)); - return PRODUCER_ERROR; - } - } -} - -int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) { - int fd; - - fd = GPOINTER_TO_INT(fdp); - g_assert(fd >= 0); - - g_return_val_if_fail(buffer->data_size > 0, 0); - - for (;;) { - int write_size; - write_size = write(fd, buffer->data + buffer->offset, - buffer->data_size); - - if (write_size > 0) { - return write_size; - } else if (0 -#ifdef EAGAIN - || errno == EAGAIN -#endif -#ifdef EWOULDBLOCK - || errno == EWOULDBLOCK -#endif -#ifdef EINTR - || errno == EINTR -#endif - ) { - /* Try again. */ - continue; - } else { - /* Error occured. */ - g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno)); - return -1; - } - } -}