Imported Upstream version 2.6.1
[debian/amanda] / device-src / queueing.c
diff --git a/device-src/queueing.c b/device-src/queueing.c
deleted file mode 100644 (file)
index 2ec0e8c..0000000
+++ /dev/null
@@ -1,548 +0,0 @@
-/*
- * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
- * 
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License version 2.1 as 
- * published by the Free Software Foundation.
- * 
- * This library is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
- * License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
- * 
- * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
- * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
- */
-
-#include "queueing.h"
-#include "device.h"
-#include "semaphore.h"
-#include "amanda.h"
-
-/* Queueing framework here. */
-typedef struct {
-    guint block_size;
-    ProducerFunctor producer;
-    gpointer producer_user_data;
-    ConsumerFunctor consumer;
-    gpointer consumer_user_data;
-    GAsyncQueue *data_queue, *free_queue;
-    semaphore_t *free_memory;
-    StreamingRequirement streaming_mode;
-} queue_data_t;
-
-static queue_buffer_t *invent_buffer(void) {
-    queue_buffer_t *rval;
-    rval = malloc(sizeof(*rval));
-
-    rval->data = NULL;
-    rval->alloc_size = 0;
-    rval->data_size = 0;
-    rval->offset = 0;
-
-    return rval;
-}
-
-void free_buffer(queue_buffer_t *buf) {
-    if (buf != NULL)
-        amfree(buf->data);
-    amfree(buf);
-}
-
-static queue_buffer_t * merge_buffers(queue_buffer_t *buf1,
-                                      queue_buffer_t *buf2) {
-    if (buf1 == NULL)
-        return buf2;
-    else if (buf2 == NULL)
-        return buf1;
-
-    if (buf2->offset >= buf1->data_size) {
-        /* We can fit buf1 at the beginning of buf2. */
-        memcpy(buf2->data + buf2->offset - buf1->data_size,
-               buf1->data + buf1->offset,
-               buf1->data_size);
-        buf2->offset -= buf1->data_size;
-        buf2->data_size += buf1->data_size;
-        free_buffer(buf1);
-        return buf2;
-    } else if (buf1->alloc_size - buf1->offset - buf1->data_size
-               >= buf2->data_size) {
-        /* We can fit buf2 at the end of buf1. */
-        memcpy(buf1->data + buf1->offset + buf1->data_size,
-               buf2->data + buf2->offset, buf2->data_size);
-        buf1->data_size += buf2->data_size;
-        free_buffer(buf2);
-        return buf1;
-    } else {
-        /* We can grow buf1 and put everything there. */
-        if (buf1->offset != 0) {
-            /* But first we have to fix up buf1. */
-            memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
-            buf1->offset = 0;
-        }
-        buf1->alloc_size = buf1->data_size + buf2->data_size;
-        buf1->data = realloc(buf1->data, buf1->alloc_size);
-        memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
-               buf2->data_size);
-        buf1->data_size = buf1->alloc_size;
-        free_buffer(buf2);
-        return buf1;
-    }
-}
-
-/* Invalidate the first "bytes" bytes of the buffer, by adjusting the
-   offset and data size. */
-static void consume_buffer(queue_buffer_t* buf, int bytes) {
-    buf->offset += bytes;
-    buf->data_size -= bytes;
-}
-
-/* Looks at the buffer to see how much free space it has. If it has more than
- * twice the data size of unused space at the end, or more than four times
- * the data size of unused space at the beginning, then that space is
- * reclaimed. */
-static void heatshrink_buffer(queue_buffer_t *buf) {
-    if (buf == NULL)
-        return;
-
-    if (G_UNLIKELY(buf->data_size * 4 > buf->offset)) {
-        /* Consolodate with memmove. We will reclaim the space in the next
-         * step. */
-        memmove(buf->data, buf->data + buf->offset, buf->data_size);
-        buf->offset = 0;
-    } 
-
-    if (buf->alloc_size > buf->data_size*2 + buf->offset) {
-        buf->alloc_size = buf->data_size + buf->offset;
-        buf->data = realloc(buf->data, buf->alloc_size);
-    }
-}
-
-static gpointer do_producer_thread(gpointer datap) {
-    queue_data_t* data = datap;
-
-    for (;;) {
-        queue_buffer_t *buf;
-        gboolean result;
-
-        semaphore_decrement(data->free_memory, 0);
-        buf = g_async_queue_try_pop(data->free_queue);
-        if (buf != NULL && buf->data == NULL) {
-            /* Consumer is finished, then so are we. */
-            amfree(buf);
-            return GINT_TO_POINTER(TRUE);
-        }
-
-        if (buf == NULL) {
-            buf = invent_buffer();
-        }
-        buf->offset = 0;
-        buf->data_size = 0;
-
-        result = data->producer(data->producer_user_data, buf,
-                                data->block_size);
-
-        // Producers can allocate way too much memory.
-        heatshrink_buffer(buf);
-
-        if (buf->data_size > 0) {
-            semaphore_force_adjust(data->free_memory, -buf->alloc_size);
-            
-            g_async_queue_push(data->data_queue, buf);
-            buf = NULL;
-        } else {
-            g_assert(result != PRODUCER_MORE);
-            free_buffer(buf);
-            buf = NULL;
-        }
-
-
-        if (result == PRODUCER_MORE) {
-            continue;
-        } else {
-            /* We are finished (and the first to do so). */
-            g_async_queue_push(data->data_queue, invent_buffer());
-            semaphore_force_set(data->free_memory, INT_MIN);
-
-            return GINT_TO_POINTER(result == PRODUCER_FINISHED);
-        }
-    }
-}
-
-static gpointer do_consumer_thread(gpointer datap) {
-    queue_data_t* data = datap;
-    gboolean finished = FALSE;
-    queue_buffer_t *buf = NULL;
-
-    if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
-        semaphore_wait_empty(data->free_memory);
-    }
-
-    for (;;) {
-        gboolean result;
-
-        if (finished) {
-            return GINT_TO_POINTER(TRUE);
-        }
-
-        while (buf == NULL || buf->data_size < data->block_size) {
-            queue_buffer_t *next_buf;
-            if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
-                do {
-                    next_buf = g_async_queue_try_pop(data->data_queue);
-                    if (next_buf == NULL) {
-                        semaphore_wait_empty(data->free_memory);
-                    }
-                } while (next_buf == NULL);
-            } else {
-                next_buf = g_async_queue_pop(data->data_queue);
-                g_assert(next_buf != NULL);
-            }
-
-            if (next_buf->data == NULL) {
-                /* Producer is finished, then so are we.*/
-                free_buffer(next_buf);
-                if (buf != NULL) {
-                    /* But we can't quit yet, we have a buffer to flush.*/
-                    finished = TRUE;
-                    break;
-                } else {
-                    /* We are so outta here. */
-                    return GINT_TO_POINTER(TRUE);
-                }            
-            }
-
-            semaphore_increment(data->free_memory, next_buf->alloc_size);
-            
-            buf = merge_buffers(buf, next_buf);
-        }
-
-        result = data->consumer(data->consumer_user_data, buf);
-
-        if (result > 0) {
-            consume_buffer(buf, result);
-            if (buf->data_size == 0) {
-                g_async_queue_push(data->free_queue, buf);
-                buf = NULL;
-            }
-            continue;
-        } else {
-            free_buffer(buf);
-            return GINT_TO_POINTER(FALSE);
-        }
-    }
-}
-
-/* Empties a buffer queue and frees all the buffers associated with it.
- *
- * If full_cleanup is TRUE, then we delete the queue itself.
- * If full_cleanup is FALSE, then we leave the queue around, with a
- *         signal element in it. */
-static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
-    g_async_queue_lock(Q);
-    for (;;) {
-        queue_buffer_t *buftmp;
-        buftmp = g_async_queue_try_pop_unlocked(Q);
-        if (buftmp == NULL)
-            break;
-
-        free_buffer(buftmp);
-    }
-    if (!full_cleanup)
-        g_async_queue_push_unlocked(Q, invent_buffer());
-
-    g_async_queue_unlock(Q);
-    
-    if (full_cleanup)
-        g_async_queue_unref(Q);
-}
-
-/* This function sacrifices performance, but will still work just
-   fine, on systems where threads are not supported. */
-static queue_result_flags
-do_unthreaded_consumer_producer_queue(guint block_size,
-                                      ProducerFunctor producer,
-                                      gpointer producer_user_data,
-                                      ConsumerFunctor consumer,
-                                      gpointer consumer_user_data) {
-    queue_buffer_t *buf = NULL, *next_buf = NULL;
-    gboolean finished = FALSE;
-    queue_result_flags rval = 0;
-
-    /* The basic theory of operation here is to read until we have
-       enough data to write, then write until we don't.. */
-    while (!finished) {
-        int result;
-        
-        while ((buf == NULL || buf->data_size < block_size) && !finished) {
-            if (next_buf == NULL)
-                next_buf = invent_buffer();
-
-            result = producer(producer_user_data, next_buf, block_size);
-
-            if (result != PRODUCER_MORE) {
-                finished = TRUE;
-                if (result != PRODUCER_FINISHED) {
-                    rval |= QUEUE_PRODUCER_ERROR;
-                }
-            }
-
-            buf = merge_buffers(buf, next_buf);
-            next_buf = NULL;
-        }
-
-        while (buf != NULL && buf->data_size > 0 &&
-               (buf->data_size >= block_size || finished)) {
-            result = consumer(consumer_user_data, buf);
-            
-            if (result > 0) {
-                consume_buffer(buf, result);
-                if (buf->data_size == 0) {
-                    next_buf = buf;
-                    buf = NULL;
-                }
-            } else {
-                finished = TRUE;
-                rval |= QUEUE_CONSUMER_ERROR;
-                break;
-            }
-        }
-    }
-
-    free_buffer(buf);
-    free_buffer(next_buf);
-    return rval;
-}
-
-gboolean do_consumer_producer_queue(ProducerFunctor producer,
-                                    gpointer producer_user_data,
-                                    ConsumerFunctor consumer,
-                                    gpointer consumer_user_data) {
-    return QUEUE_SUCCESS ==
-        do_consumer_producer_queue_full(producer, producer_user_data,
-                                        consumer, consumer_user_data,
-                                        0, DEFAULT_MAX_BUFFER_MEMORY,
-                                        STREAMING_REQUIREMENT_NONE);
-}
-
-queue_result_flags
-do_consumer_producer_queue_full(ProducerFunctor producer,
-                                gpointer producer_user_data,
-                                ConsumerFunctor consumer,
-                                gpointer consumer_user_data,
-                                int block_size,
-                                size_t max_memory,
-                                StreamingRequirement streaming_mode) {
-    GThread     * producer_thread;
-    GThread     * consumer_thread;
-    queue_data_t  queue_data;
-    gpointer      producer_result;
-    gpointer      consumer_result;
-    queue_result_flags rval;
-
-    if (block_size <= 0) {
-        block_size = DISK_BLOCK_BYTES;
-    }
-
-    g_return_val_if_fail(producer != NULL, FALSE);
-    g_return_val_if_fail(consumer != NULL, FALSE);
-
-    if (!g_thread_supported()) {
-        return do_unthreaded_consumer_producer_queue(block_size, producer,
-                                                     producer_user_data,
-                                                     consumer,
-                                                     consumer_user_data);
-    }
-
-    queue_data.block_size = block_size;
-    queue_data.producer = producer;
-    queue_data.producer_user_data = producer_user_data;
-    queue_data.consumer = consumer;
-    queue_data.consumer_user_data = consumer_user_data;
-    queue_data.streaming_mode = streaming_mode;
-
-    queue_data.data_queue = g_async_queue_new();
-    queue_data.free_queue = g_async_queue_new();
-
-    max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));
-    queue_data.free_memory = semaphore_new_with_value(max_memory);
-
-    producer_thread = g_thread_create(do_producer_thread, &queue_data,
-                                      TRUE,
-                                      NULL /* FIXME: Should handle
-                                              errors. */);
-    consumer_thread = g_thread_create(do_consumer_thread, &queue_data,
-                                      TRUE,
-                                      NULL /* FIXME: Should handle
-                                              errors. */);
-    
-    /* The order of cleanup here is very important, to avoid deadlock. */
-    /* 1) Reap the consumer. */
-    consumer_result = g_thread_join(consumer_thread);
-    /* 2) Stop the producer. */
-    semaphore_force_set(queue_data.free_memory, -1);
-    /* 3) Cleanup the free queue; add a signal flag. */
-    cleanup_buffer_queue(queue_data.free_queue, FALSE);
-    /* 4) Restart the producer (so it can exit). */
-    semaphore_force_set(queue_data.free_memory, INT_MAX);
-    /* 5) Reap the producer. */
-    producer_result = g_thread_join(producer_thread);
-
-    cleanup_buffer_queue(queue_data.free_queue, TRUE);
-    cleanup_buffer_queue(queue_data.data_queue, TRUE);
-
-    semaphore_free(queue_data.free_memory);
-    
-    rval = 0;
-    if (!GPOINTER_TO_INT(producer_result)) {
-        rval |= QUEUE_PRODUCER_ERROR;
-    }
-    if (!GPOINTER_TO_INT(consumer_result)) {
-        rval |= QUEUE_CONSUMER_ERROR;
-    }
-    return rval;
-}
-
-/* Commonly-useful producers and consumers below. */
-
-producer_result_t device_read_producer(gpointer devicep,
-                                       queue_buffer_t *buffer,
-                                       int hint_size G_GNUC_UNUSED) {
-    Device* device;
-
-    device = (Device*) devicep;
-    g_assert(IS_DEVICE(device));
-
-    buffer->offset = 0;
-    for (;;) {
-        int result, read_size;
-        read_size = buffer->alloc_size;
-        result = device_read_block(device, buffer->data, &read_size);
-        if (result > 0) {
-            buffer->data_size = read_size;
-            return PRODUCER_MORE;
-        } else if (result == 0) {
-            buffer->data = realloc(buffer->data, read_size);
-            buffer->alloc_size = read_size;
-        } else if (device->is_eof) {
-            return PRODUCER_FINISHED;
-        } else {
-            buffer->data_size = 0;
-            return PRODUCER_ERROR;
-        }
-    }
-}
-
-int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) {
-    Device* device;
-    unsigned int write_size;
-    static gboolean wrote_blocksize = FALSE;
-
-    device = (Device*) devicep;
-    g_assert(IS_DEVICE(device));
-    write_size = MIN(buffer->data_size,
-                     device_write_max_size(device));
-
-    if (!wrote_blocksize) {
-       wrote_blocksize = TRUE;
-       dbprintf("USING BLOCKSIZE %d bytes\n", write_size);
-    }
-
-    if (device_write_block(device, write_size,
-                           buffer->data + buffer->offset,
-                           buffer->data_size <
-                               device_write_min_size(device))) {
-        /* Success! */
-        return write_size;
-    } else {
-        /* Nope, really an error. */
-        return -1;
-    }
-}
-
-producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer,
-                                   int hint_size) {
-    int fd;
-
-    fd = GPOINTER_TO_INT(fdp);
-    g_assert(fd >= 0);
-    g_assert(buffer->data_size == 0);
-
-    buffer->offset = 0;
-
-    if (buffer->data == NULL) {
-        /* Set up the buffer. */
-        buffer->data = malloc(hint_size);
-        buffer->alloc_size = hint_size;
-    }
-
-    for (;;) {
-        int result;
-        result = read(fd, buffer->data, buffer->alloc_size);
-
-        if (result > 0) {
-            buffer->data_size = result;
-            return PRODUCER_MORE;
-        } else if (result == 0) {
-            /* End of file. */
-            return PRODUCER_FINISHED;
-        } else if (0
-#ifdef EAGAIN
-                || errno == EAGAIN
-#endif
-#ifdef EWOULDBLOCK
-                || errno == EWOULDBLOCK
-#endif
-#ifdef EINTR
-                || errno == EINTR
-#endif
-                ) {
-                /* Try again. */
-                continue;
-        } else {
-            /* Error occured. */
-            g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno));
-            return PRODUCER_ERROR;
-        }
-    }
-}
-
-int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) {
-    int fd;
-
-    fd = GPOINTER_TO_INT(fdp);
-    g_assert(fd >= 0);
-
-    g_return_val_if_fail(buffer->data_size > 0, 0);
-
-    for (;;) {
-        int write_size;
-        write_size = write(fd, buffer->data + buffer->offset,
-                           buffer->data_size);
-        
-        if (write_size > 0) {
-            return write_size;
-        } else if (0
-#ifdef EAGAIN
-                || errno == EAGAIN
-#endif
-#ifdef EWOULDBLOCK
-                || errno == EWOULDBLOCK
-#endif
-#ifdef EINTR
-                || errno == EINTR
-#endif
-                ) {
-                /* Try again. */
-                continue;
-        } else {
-            /* Error occured. */
-            g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno));
-            return -1;
-        }        
-    }
-}