X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=common-src%2Fqueueing.c;fp=common-src%2Fqueueing.c;h=672d844c299b2795ff5bfba8e178b927cc2a1dff;hb=2627875b7d18858bc1f9f7652811e4d8c15a23eb;hp=0000000000000000000000000000000000000000;hpb=fb2bd066c2f8b34addafe48d62550e3033a59431;p=debian%2Famanda diff --git a/common-src/queueing.c b/common-src/queueing.c new file mode 100644 index 0000000..672d844 --- /dev/null +++ b/common-src/queueing.c @@ -0,0 +1,524 @@ +/* + * Copyright (c) 2005 Zmanda, Inc. All Rights Reserved. + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License version 2.1 as + * published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + * + * Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300 + * Sunnyvale, CA 94086, USA, or: http://www.zmanda.com + */ + +#include "queueing.h" +#include "semaphore.h" +#include "amanda.h" + +/* Queueing framework here. */ +typedef struct { + size_t block_size; + StreamingRequirement streaming_mode; + + ProducerFunctor producer; + gpointer producer_user_data; + + ConsumerFunctor consumer; + gpointer consumer_user_data; + + GAsyncQueue *data_queue, *free_queue; + semaphore_t *free_memory; +} queue_data_t; + +static queue_buffer_t *invent_buffer(void) { + queue_buffer_t *rval; + rval = g_new(queue_buffer_t, 1); + + rval->data = NULL; + rval->alloc_size = 0; + rval->data_size = 0; + rval->offset = 0; + + return rval; +} + +void free_buffer(queue_buffer_t *buf) { + if (buf != NULL) + amfree(buf->data); + amfree(buf); +} + +static queue_buffer_t * merge_buffers(queue_buffer_t *buf1, + queue_buffer_t *buf2) { + if (buf1 == NULL) + return buf2; + else if (buf2 == NULL) + return buf1; + + if (buf2->offset >= buf1->data_size) { + /* We can fit buf1 at the beginning of buf2. */ + memcpy(buf2->data + buf2->offset - buf1->data_size, + buf1->data + buf1->offset, + buf1->data_size); + buf2->offset -= buf1->data_size; + buf2->data_size += buf1->data_size; + free_buffer(buf1); + return buf2; + } else if (buf1->alloc_size - buf1->offset - buf1->data_size + >= buf2->data_size) { + /* We can fit buf2 at the end of buf1. */ + memcpy(buf1->data + buf1->offset + buf1->data_size, + buf2->data + buf2->offset, buf2->data_size); + buf1->data_size += buf2->data_size; + free_buffer(buf2); + return buf1; + } else { + /* We can grow buf1 and put everything there. */ + if (buf1->offset != 0) { + /* But first we have to fix up buf1. */ + memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size); + buf1->offset = 0; + } + buf1->alloc_size = buf1->data_size + buf2->data_size; + buf1->data = realloc(buf1->data, buf1->alloc_size); + memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset, + buf2->data_size); + buf1->data_size = buf1->alloc_size; + free_buffer(buf2); + return buf1; + } +} + +/* Invalidate the first "bytes" bytes of the buffer, by adjusting the + offset and data size. */ +static void consume_buffer(queue_buffer_t* buf, ssize_t bytes) { + g_assert(bytes >= 0 && bytes <= (ssize_t)buf->data_size); + buf->offset += bytes; + buf->data_size -= bytes; +} + +/* Looks at the buffer to see how much free space it has. If it has more than + * twice the data size of unused space at the end, or more than four times + * the data size of unused space at the beginning, then that space is + * reclaimed. */ +static void heatshrink_buffer(queue_buffer_t *buf) { + if (buf == NULL) + return; + + if (G_UNLIKELY(buf->offset > buf->data_size * 4)) { + /* Consolodate with memmove. We will reclaim the space in the next + * step. */ + memmove(buf->data, buf->data + buf->offset, buf->data_size); + buf->offset = 0; + } + + if (buf->alloc_size > buf->data_size*2 + buf->offset) { + buf->alloc_size = buf->data_size + buf->offset; + buf->data = realloc(buf->data, buf->alloc_size); + } +} + +static gpointer do_producer_thread(gpointer datap) { + queue_data_t* data = datap; + + for (;;) { + queue_buffer_t *buf; + gboolean result; + + semaphore_decrement(data->free_memory, 0); + buf = g_async_queue_try_pop(data->free_queue); + if (buf != NULL && buf->data == NULL) { + /* Consumer is finished, then so are we. */ + amfree(buf); + return GINT_TO_POINTER(TRUE); + } + + if (buf == NULL) { + buf = invent_buffer(); + } + buf->offset = 0; + buf->data_size = 0; + + result = data->producer(data->producer_user_data, buf, + data->block_size); + + // Producers can allocate way too much memory. + heatshrink_buffer(buf); + + if (buf->data_size > 0) { + semaphore_force_adjust(data->free_memory, -buf->alloc_size); + + g_async_queue_push(data->data_queue, buf); + buf = NULL; + } else { + g_assert(result != PRODUCER_MORE); + free_buffer(buf); + buf = NULL; + } + + + if (result == PRODUCER_MORE) { + continue; + } else { + /* We are finished (and the first to do so). */ + g_async_queue_push(data->data_queue, invent_buffer()); + semaphore_force_set(data->free_memory, INT_MIN); + + return GINT_TO_POINTER(result == PRODUCER_FINISHED); + } + } +} + +static gpointer do_consumer_thread(gpointer datap) { + queue_data_t* data = datap; + gboolean got_eof = FALSE; + queue_buffer_t *buf = NULL; + + if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) { + semaphore_wait_empty(data->free_memory); + } + + for (;;) { + gboolean result; + + /* Pull in and merge buffers until we have at least data->block_size + * bytes, or there are no more buffers */ + while (!got_eof && (buf == NULL || buf->data_size < data->block_size)) { + queue_buffer_t *next_buf; + if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) { + do { + next_buf = g_async_queue_try_pop(data->data_queue); + if (next_buf == NULL) { + semaphore_wait_empty(data->free_memory); + } + } while (next_buf == NULL); + } else { + next_buf = g_async_queue_pop(data->data_queue); + g_assert(next_buf != NULL); + } + + if (next_buf->data == NULL) { + /* A buffer with NULL data is an EOF from the producer */ + free_buffer(next_buf); + got_eof = TRUE; + break; + } + + semaphore_increment(data->free_memory, next_buf->alloc_size); + + buf = merge_buffers(buf, next_buf); + } + + /* If we're out of data, then we are done. */ + if (buf == NULL) + break; + + result = data->consumer(data->consumer_user_data, buf); + + if (result > 0) { + consume_buffer(buf, result); + if (buf->data_size == 0) { + g_async_queue_push(data->free_queue, buf); + buf = NULL; + } + continue; + } else { + free_buffer(buf); + return GINT_TO_POINTER(FALSE); + } + } + + /* We are so outta here. */ + return GINT_TO_POINTER(TRUE); +} + +/* Empties a buffer queue and frees all the buffers associated with it. + * + * If full_cleanup is TRUE, then we delete the queue itself. + * If full_cleanup is FALSE, then we leave the queue around, with a + * signal element in it. */ +static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) { + g_async_queue_lock(Q); + for (;;) { + queue_buffer_t *buftmp; + buftmp = g_async_queue_try_pop_unlocked(Q); + if (buftmp == NULL) + break; + + free_buffer(buftmp); + } + if (!full_cleanup) + g_async_queue_push_unlocked(Q, invent_buffer()); + + g_async_queue_unlock(Q); + + if (full_cleanup) + g_async_queue_unref(Q); +} + +/* This function sacrifices performance, but will still work just + fine, on systems where threads are not supported. */ +static queue_result_flags +do_unthreaded_consumer_producer_queue(size_t block_size, + ProducerFunctor producer, + gpointer producer_user_data, + ConsumerFunctor consumer, + gpointer consumer_user_data) { + queue_buffer_t *buf = NULL, *next_buf = NULL; + gboolean finished = FALSE; + queue_result_flags rval = 0; + + /* The basic theory of operation here is to read until we have + enough data to write, then write until we don't.. */ + while (!finished) { + producer_result_t result; + + while ((buf == NULL || buf->data_size < block_size) && !finished) { + if (next_buf == NULL) + next_buf = invent_buffer(); + + result = producer(producer_user_data, next_buf, block_size); + + if (result != PRODUCER_MORE) { + finished = TRUE; + if (result != PRODUCER_FINISHED) { + rval |= QUEUE_PRODUCER_ERROR; + } + } + + buf = merge_buffers(buf, next_buf); + next_buf = NULL; + } + + while (buf != NULL && buf->data_size > 0 && + (buf->data_size >= block_size || finished)) { + result = consumer(consumer_user_data, buf); + + if (result > 0) { + consume_buffer(buf, result); + if (buf->data_size == 0) { + next_buf = buf; + buf = NULL; + } + } else { + finished = TRUE; + rval |= QUEUE_CONSUMER_ERROR; + break; + } + } + } + + free_buffer(buf); + free_buffer(next_buf); + return rval; +} + +gboolean do_consumer_producer_queue(ProducerFunctor producer, + gpointer producer_user_data, + ConsumerFunctor consumer, + gpointer consumer_user_data) { + return QUEUE_SUCCESS == + do_consumer_producer_queue_full(producer, producer_user_data, + consumer, consumer_user_data, + 0, DEFAULT_MAX_BUFFER_MEMORY, + STREAMING_REQUIREMENT_NONE); +} + +queue_result_flags +do_consumer_producer_queue_full(ProducerFunctor producer, + gpointer producer_user_data, + ConsumerFunctor consumer, + gpointer consumer_user_data, + size_t block_size, + size_t max_memory, + StreamingRequirement streaming_mode) { + GThread * producer_thread; + GThread * consumer_thread; + queue_data_t queue_data; + gpointer producer_result; + gpointer consumer_result; + queue_result_flags rval; + + if (block_size <= 0) { + block_size = DISK_BLOCK_BYTES; + } + + g_return_val_if_fail(producer != NULL, FALSE); + g_return_val_if_fail(consumer != NULL, FALSE); + + if (!g_thread_supported()) { + return do_unthreaded_consumer_producer_queue(block_size, producer, + producer_user_data, + consumer, + consumer_user_data); + } + + queue_data.block_size = block_size; + queue_data.producer = producer; + queue_data.producer_user_data = producer_user_data; + queue_data.consumer = consumer; + queue_data.consumer_user_data = consumer_user_data; + queue_data.streaming_mode = streaming_mode; + + queue_data.data_queue = g_async_queue_new(); + queue_data.free_queue = g_async_queue_new(); + + max_memory = MAX(1,MIN(max_memory, INT_MAX / 2)); + queue_data.free_memory = semaphore_new_with_value(max_memory); + + producer_thread = g_thread_create(do_producer_thread, &queue_data, + TRUE, + NULL /* FIXME: Should handle + errors. */); + consumer_thread = g_thread_create(do_consumer_thread, &queue_data, + TRUE, + NULL /* FIXME: Should handle + errors. */); + + /* The order of cleanup here is very important, to avoid deadlock. */ + /* 1) Reap the consumer. */ + consumer_result = g_thread_join(consumer_thread); + /* 2) Stop the producer. */ + semaphore_force_set(queue_data.free_memory, -1); + /* 3) Cleanup the free queue; add a signal flag. */ + cleanup_buffer_queue(queue_data.free_queue, FALSE); + /* 4) Restart the producer (so it can exit). */ + semaphore_force_set(queue_data.free_memory, INT_MAX); + /* 5) Reap the producer. */ + producer_result = g_thread_join(producer_thread); + + cleanup_buffer_queue(queue_data.free_queue, TRUE); + cleanup_buffer_queue(queue_data.data_queue, TRUE); + + semaphore_free(queue_data.free_memory); + + rval = 0; + if (!GPOINTER_TO_INT(producer_result)) { + rval |= QUEUE_PRODUCER_ERROR; + } + if (!GPOINTER_TO_INT(consumer_result)) { + rval |= QUEUE_CONSUMER_ERROR; + } + return rval; +} + +/* Commonly-useful producers and consumers below. */ + +queue_fd_t * +queue_fd_new( + int fd, + char *errmsg) +{ + queue_fd_t *queue_fd; + + queue_fd = malloc(sizeof(queue_fd_t)); + queue_fd->fd = fd; + queue_fd->errmsg = errmsg; + + return queue_fd; +} + +int +queue_fd_fd( + queue_fd_t *queue_fd) +{ + return queue_fd->fd; +} + +char *queue_fd_errmsg( + queue_fd_t *queue_fd) +{ + return queue_fd->errmsg; +} + +producer_result_t fd_read_producer(gpointer f_queue_fd, queue_buffer_t *buffer, + size_t hint_size) { + int fd; + queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd; + fd = queue_fd->fd; + g_assert(fd >= 0); + g_assert(buffer->data_size == 0); + + buffer->offset = 0; + + if (buffer->data == NULL) { + /* Set up the buffer. */ + buffer->data = malloc(hint_size); + buffer->alloc_size = hint_size; + } + + for (;;) { + ssize_t result; + result = read(fd, buffer->data, buffer->alloc_size); + + if (result > 0) { + buffer->data_size = result; + return PRODUCER_MORE; + } else if (result == 0) { + /* End of file. */ + return PRODUCER_FINISHED; + } else if (0 +#ifdef EAGAIN + || errno == EAGAIN +#endif +#ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +#endif +#ifdef EINTR + || errno == EINTR +#endif + ) { + /* Try again. */ + continue; + } else { + /* Error occured. */ + queue_fd->errmsg = newvstrallocf(queue_fd->errmsg, + "Error reading fd %d: %s\n", fd, strerror(errno)); + return PRODUCER_ERROR; + } + } +} + +ssize_t fd_write_consumer(gpointer f_queue_fd, queue_buffer_t *buffer) { + int fd; + queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd; + fd = queue_fd->fd; + + g_assert(fd >= 0); + + g_return_val_if_fail(buffer->data_size > 0, 0); + + for (;;) { + ssize_t write_size; + write_size = write(fd, buffer->data + buffer->offset, + buffer->data_size); + + if (write_size > 0) { + return write_size; + } else if (0 +#ifdef EAGAIN + || errno == EAGAIN +#endif +#ifdef EWOULDBLOCK + || errno == EWOULDBLOCK +#endif +#ifdef EINTR + || errno == EINTR +#endif + ) { + /* Try again. */ + continue; + } else { + /* Error occured. */ + g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno)); + return -1; + } + } +}