2 * Copyright (c) 2008,2009 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
22 #include "semaphore.h"
25 /* Queueing framework here. */
/* NOTE(review): these fields are the interior of the queue's shared-state
 * struct (queue_data_t, judging by its use in the thread functions below);
 * the struct's opening declaration is above this chunk -- confirm. */
28 StreamingRequirement streaming_mode; /* how strictly the consumer must receive full blocks */
30 ProducerFunctor producer; /* callback that fills buffers with data */
31 gpointer producer_user_data; /* opaque pointer handed back to producer */
33 ConsumerFunctor consumer; /* callback that drains filled buffers */
34 gpointer consumer_user_data; /* opaque pointer handed back to consumer */
36 GAsyncQueue *data_queue, *free_queue; /* filled buffers -> consumer; emptied buffers -> producer */
37 semaphore_t *free_memory; /* remaining buffer-memory budget the producer may allocate against */
/* Allocate a brand-new queue_buffer_t.
 * NOTE(review): the lines between the allocation and the return are not
 * visible in this chunk; presumably they initialize the fields (data,
 * alloc_size, data_size, offset) -- confirm against the full file. */
40 static queue_buffer_t *invent_buffer(void) {
42 rval = g_new(queue_buffer_t, 1);
52 void free_buffer(queue_buffer_t *buf) {
/* Merge two buffers into one, preserving byte order (buf1's bytes followed
 * by buf2's).  One of the two buffers survives and is returned; per the
 * NULL checks below, if either argument is NULL the other is the result.
 * Three strategies are tried, cheapest first. */
58 static queue_buffer_t * merge_buffers(queue_buffer_t *buf1,
59 queue_buffer_t *buf2) {
62 else if (buf2 == NULL)
/* Strategy 1: buf1's data fits in the unused gap before buf2's data. */
65 if (buf2->offset >= buf1->data_size) {
66 /* We can fit buf1 at the beginning of buf2. */
67 memcpy(buf2->data + buf2->offset - buf1->data_size,
68 buf1->data + buf1->offset,
70 buf2->offset -= buf1->data_size;
71 buf2->data_size += buf1->data_size;
/* Strategy 2: buf2's data fits in the unused tail space of buf1. */
74 } else if (buf1->alloc_size - buf1->offset - buf1->data_size
76 /* We can fit buf2 at the end of buf1. */
77 memcpy(buf1->data + buf1->offset + buf1->data_size,
78 buf2->data + buf2->offset, buf2->data_size);
79 buf1->data_size += buf2->data_size;
/* Strategy 3: neither fits; reallocate buf1 big enough for both. */
83 /* We can grow buf1 and put everything there. */
84 if (buf1->offset != 0) {
85 /* But first we have to fix up buf1. */
/* Slide buf1's live data to the front so offset can become 0
 * (regions may overlap, hence memmove not memcpy). */
86 memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
89 buf1->alloc_size = buf1->data_size + buf2->data_size;
/* NOTE(review): `p = realloc(p, ...)` loses the original pointer if
 * realloc fails; safe only if allocation failure aborts the program. */
90 buf1->data = realloc(buf1->data, buf1->alloc_size);
91 memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
/* The merged buffer now exactly fills its allocation. */
93 buf1->data_size = buf1->alloc_size;
99 /* Invalidate the first "bytes" bytes of the buffer, by adjusting the
100 offset and data size. */
/* "bytes" must lie in [0, data_size]; enforced by the assertion. */
101 static void consume_buffer(queue_buffer_t* buf, ssize_t bytes) {
102 g_assert(bytes >= 0 && bytes <= (ssize_t)buf->data_size);
103 buf->offset += bytes;
104 buf->data_size -= bytes;
107 /* Looks at the buffer to see how much free space it has. If it has more than
108 * twice the data size of unused space at the end, or more than four times
109 * the data size of unused space at the beginning, then that space is
111 static void heatshrink_buffer(queue_buffer_t *buf) {
/* Large leading gap: slide live data to the front first. */
115 if (G_UNLIKELY(buf->offset > buf->data_size * 4)) {
116 /* Consolidate with memmove. We will reclaim the space in the next
/* Regions may overlap, hence memmove. */
118 memmove(buf->data, buf->data + buf->offset, buf->data_size);
/* Large trailing gap: shrink the allocation to just the live span. */
122 if (buf->alloc_size > buf->data_size*2 + buf->offset) {
123 buf->alloc_size = buf->data_size + buf->offset;
/* NOTE(review): `p = realloc(p, ...)` pattern; a shrinking realloc
 * rarely fails, but the original pointer is still lost if it does. */
124 buf->data = realloc(buf->data, buf->alloc_size);
/* Thread body for the producer side: repeatedly obtain a buffer (reusing one
 * from free_queue when available), let data->producer fill it, and push it
 * onto data_queue.  Returns TRUE (as a pointer) on clean completion.
 * The free_memory semaphore throttles total buffer memory in flight. */
128 static gpointer do_producer_thread(gpointer datap) {
129 queue_data_t* data = datap;
/* Decrement by 0: blocks until the memory budget is non-negative
 * without actually consuming any of it. */
135 semaphore_decrement(data->free_memory, 0);
136 buf = g_async_queue_try_pop(data->free_queue);
/* A buffer with NULL data on the free queue is the consumer's
 * shutdown signal (see cleanup_buffer_queue). */
137 if (buf != NULL && buf->data == NULL) {
138 /* Consumer is finished, then so are we. */
140 return GINT_TO_POINTER(TRUE);
/* No recyclable buffer available; make a fresh one. */
144 buf = invent_buffer();
149 result = data->producer(data->producer_user_data, buf,
152 // Producers can allocate way too much memory.
153 heatshrink_buffer(buf);
155 if (buf->data_size > 0) {
/* Charge this buffer's allocation against the memory budget. */
156 semaphore_force_adjust(data->free_memory, -buf->alloc_size);
158 g_async_queue_push(data->data_queue, buf);
/* NOTE(review): this assert and the `if` below look contradictory, but
 * the missing lines between them (162-166 in the full file) presumably
 * place them in different branches -- confirm against the full file. */
161 g_assert(result != PRODUCER_MORE);
167 if (result == PRODUCER_MORE) {
170 /* We are finished (and the first to do so). */
/* An empty invented buffer (NULL data) is the EOF marker to the
 * consumer; INT_MIN parks the memory semaphore so we never block. */
171 g_async_queue_push(data->data_queue, invent_buffer());
172 semaphore_force_set(data->free_memory, INT_MIN);
/* TRUE iff the producer ended without error. */
174 return GINT_TO_POINTER(result == PRODUCER_FINISHED);
/* Thread body for the consumer side: pop buffers from data_queue, merge them
 * until at least block_size bytes are on hand (or EOF), hand them to
 * data->consumer, and recycle drained buffers via free_queue.
 * Returns TRUE (as a pointer) on clean completion, FALSE on consumer error. */
179 static gpointer do_consumer_thread(gpointer datap) {
180 queue_data_t* data = datap;
181 gboolean got_eof = FALSE;
182 queue_buffer_t *buf = NULL;
/* In streaming modes, wait until the producer has used its full memory
 * budget before consuming anything, to build up a backlog first. */
184 if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
185 semaphore_wait_empty(data->free_memory);
191 /* Pull in and merge buffers until we have at least data->block_size
192 * bytes, or there are no more buffers */
193 while (!got_eof && (buf == NULL || buf->data_size < data->block_size)) {
194 queue_buffer_t *next_buf;
/* DESIRED mode: poll, and when the queue is momentarily empty, wait
 * for the producer to exhaust its budget rather than blocking on pop. */
195 if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
197 next_buf = g_async_queue_try_pop(data->data_queue);
198 if (next_buf == NULL) {
199 semaphore_wait_empty(data->free_memory);
201 } while (next_buf == NULL);
/* Other modes: plain blocking pop. */
203 next_buf = g_async_queue_pop(data->data_queue);
204 g_assert(next_buf != NULL);
207 if (next_buf->data == NULL) {
208 /* A buffer with NULL data is an EOF from the producer */
209 free_buffer(next_buf);
/* Return this buffer's allocation to the producer's memory budget. */
214 semaphore_increment(data->free_memory, next_buf->alloc_size);
216 buf = merge_buffers(buf, next_buf);
219 /* If we're out of data, then we are done. */
223 result = data->consumer(data->consumer_user_data, buf);
/* Drop the bytes the consumer reported as written. */
226 consume_buffer(buf, result);
227 if (buf->data_size == 0) {
/* Fully drained: recycle the buffer for the producer to reuse. */
228 g_async_queue_push(data->free_queue, buf);
/* Consumer reported an error. */
234 return GINT_TO_POINTER(FALSE);
238 /* We are so outta here. */
239 return GINT_TO_POINTER(TRUE);
242 /* Empties a buffer queue and frees all the buffers associated with it.
244 * If full_cleanup is TRUE, then we delete the queue itself.
245 * If full_cleanup is FALSE, then we leave the queue around, with a
246 * signal element in it. */
247 static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
248 g_async_queue_lock(Q);
250 queue_buffer_t *buftmp;
/* Drain every queued buffer (loop header not visible in this chunk). */
251 g_async_queue_try_pop_unlocked(Q);
/* The "signal element": a fresh empty buffer whose NULL data tells
 * the producer thread to shut down (see do_producer_thread). */
258 g_async_queue_push_unlocked(Q, invent_buffer());
260 g_async_queue_unlock(Q);
263 g_async_queue_unref(Q);
266 /* This function sacrifices performance, but will still work just
267 fine, on systems where threads are not supported. */
/* Single-threaded fallback: alternates between calling the producer until
 * block_size bytes are buffered (or it finishes/errors), then calling the
 * consumer until the buffered data drops below block_size.  Returns the
 * accumulated QUEUE_* error flags (0 on success). */
268 static queue_result_flags
269 do_unthreaded_consumer_producer_queue(size_t block_size,
270 ProducerFunctor producer,
271 gpointer producer_user_data,
272 ConsumerFunctor consumer,
273 gpointer consumer_user_data) {
274 queue_buffer_t *buf = NULL, *next_buf = NULL;
275 gboolean finished = FALSE;
276 queue_result_flags rval = 0;
278 /* The basic theory of operation here is to read until we have
279 enough data to write, then write until we don't.. */
281 producer_result_t result;
/* Read phase: accumulate at least block_size bytes into buf. */
283 while ((buf == NULL || buf->data_size < block_size) && !finished) {
284 if (next_buf == NULL)
285 next_buf = invent_buffer();
287 result = producer(producer_user_data, next_buf, block_size);
289 if (result != PRODUCER_MORE) {
291 if (result != PRODUCER_FINISHED) {
292 rval |= QUEUE_PRODUCER_ERROR;
/* Fold the newly-produced bytes into the working buffer. */
296 buf = merge_buffers(buf, next_buf);
/* Write phase: drain full blocks; once finished, drain the remainder
 * even if it is smaller than block_size. */
300 while (buf != NULL && buf->data_size > 0 &&
301 (buf->data_size >= block_size || finished)) {
302 result = consumer(consumer_user_data, buf);
305 consume_buffer(buf, result);
306 if (buf->data_size == 0) {
312 rval |= QUEUE_CONSUMER_ERROR;
319 free_buffer(next_buf);
/* Convenience wrapper around do_consumer_producer_queue_full() using the
 * defaults: auto block size (0), DEFAULT_MAX_BUFFER_MEMORY, and no
 * streaming requirement.  Returns TRUE on complete success. */
323 gboolean do_consumer_producer_queue(ProducerFunctor producer,
324 gpointer producer_user_data,
325 ConsumerFunctor consumer,
326 gpointer consumer_user_data) {
327 return QUEUE_SUCCESS ==
328 do_consumer_producer_queue_full(producer, producer_user_data,
329 consumer, consumer_user_data,
330 0, DEFAULT_MAX_BUFFER_MEMORY,
331 STREAMING_REQUIREMENT_NONE);
/* Run a producer and a consumer, each on its own thread, connected by the
 * buffer queues; falls back to the unthreaded path when GLib threads are
 * unavailable.  Returns a bitmask of QUEUE_PRODUCER_ERROR /
 * QUEUE_CONSUMER_ERROR (0 == QUEUE_SUCCESS).
 * NOTE(review): return type and some parameter lines (block_size,
 * max_memory) are above/between the visible lines of this chunk. */
335 do_consumer_producer_queue_full(ProducerFunctor producer,
336 gpointer producer_user_data,
337 ConsumerFunctor consumer,
338 gpointer consumer_user_data,
341 StreamingRequirement streaming_mode) {
342 GThread * producer_thread;
343 GThread * consumer_thread;
344 queue_data_t queue_data;
345 gpointer producer_result;
346 gpointer consumer_result;
347 queue_result_flags rval;
/* 0 (or negative) block size means "use the disk block size". */
349 if (block_size <= 0) {
350 block_size = DISK_BLOCK_BYTES;
353 g_return_val_if_fail(producer != NULL, FALSE);
354 g_return_val_if_fail(consumer != NULL, FALSE);
/* No thread support: run both sides in this thread instead. */
356 if (!g_thread_supported()) {
357 return do_unthreaded_consumer_producer_queue(block_size, producer,
363 queue_data.block_size = block_size;
364 queue_data.producer = producer;
365 queue_data.producer_user_data = producer_user_data;
366 queue_data.consumer = consumer;
367 queue_data.consumer_user_data = consumer_user_data;
368 queue_data.streaming_mode = streaming_mode;
370 queue_data.data_queue = g_async_queue_new();
371 queue_data.free_queue = g_async_queue_new();
/* Clamp the memory budget to [1, INT_MAX/2] so the semaphore's int
 * arithmetic cannot overflow. */
373 max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));
374 queue_data.free_memory = semaphore_new_with_value(max_memory);
376 producer_thread = g_thread_create(do_producer_thread, &queue_data,
378 NULL /* FIXME: Should handle
380 consumer_thread = g_thread_create(do_consumer_thread, &queue_data,
382 NULL /* FIXME: Should handle
385 /* The order of cleanup here is very important, to avoid deadlock. */
386 /* 1) Reap the consumer. */
387 consumer_result = g_thread_join(consumer_thread);
388 /* 2) Stop the producer. */
/* Negative budget: the producer's semaphore_decrement(.., 0) blocks. */
389 semaphore_force_set(queue_data.free_memory, -1);
390 /* 3) Cleanup the free queue; add a signal flag. */
391 cleanup_buffer_queue(queue_data.free_queue, FALSE);
392 /* 4) Restart the producer (so it can exit). */
393 semaphore_force_set(queue_data.free_memory, INT_MAX);
394 /* 5) Reap the producer. */
395 producer_result = g_thread_join(producer_thread);
/* Now both threads are gone; tear everything down. */
397 cleanup_buffer_queue(queue_data.free_queue, TRUE);
398 cleanup_buffer_queue(queue_data.data_queue, TRUE);
400 semaphore_free(queue_data.free_memory);
/* Each thread returned TRUE on success (see the thread bodies above). */
403 if (!GPOINTER_TO_INT(producer_result)) {
404 rval |= QUEUE_PRODUCER_ERROR;
406 if (!GPOINTER_TO_INT(consumer_result)) {
407 rval |= QUEUE_CONSUMER_ERROR;
412 /* Commonly-useful producers and consumers below. */
419 queue_fd_t *queue_fd;
421 queue_fd = malloc(sizeof(queue_fd_t));
423 queue_fd->errmsg = errmsg;
430 queue_fd_t *queue_fd)
435 char *queue_fd_errmsg(
436 queue_fd_t *queue_fd)
438 return queue_fd->errmsg;
/* ProducerFunctor that fills `buffer` by read()ing from the fd inside a
 * queue_fd_t.  Returns PRODUCER_MORE after a successful read,
 * PRODUCER_FINISHED on EOF, PRODUCER_ERROR on a hard read error (after
 * storing a message in queue_fd->errmsg). */
441 producer_result_t fd_read_producer(gpointer f_queue_fd, queue_buffer_t *buffer,
444 queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd;
/* The framework only hands us drained buffers. */
447 g_assert(buffer->data_size == 0);
451 if (buffer->data == NULL) {
452 /* Set up the buffer. */
/* First use of this buffer: allocate to the framework's hint size. */
453 buffer->data = malloc(hint_size);
454 buffer->alloc_size = hint_size;
459 result = read(fd, buffer->data, buffer->alloc_size);
462 buffer->data_size = result;
463 return PRODUCER_MORE;
464 } else if (result == 0) {
/* read() returning 0 means end-of-file. */
466 return PRODUCER_FINISHED;
/* Transient errors (e.g. EWOULDBLOCK) are retried -- retry logic is in
 * lines not visible in this chunk. */
472 || errno == EWOULDBLOCK
/* Hard error: record a message for the caller to retrieve. */
482 queue_fd->errmsg = newvstrallocf(queue_fd->errmsg,
483 "Error reading fd %d: %s\n", fd, strerror(errno));
484 return PRODUCER_ERROR;
/* ConsumerFunctor that write()s the buffer's live bytes to the fd inside a
 * queue_fd_t.  Returns the number of bytes consumed; on hard error it
 * records a message in queue_fd->errmsg (return path for the error case is
 * below this chunk -- confirm). */
489 ssize_t fd_write_consumer(gpointer f_queue_fd, queue_buffer_t *buffer) {
491 queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd;
/* The framework should never hand us an empty buffer. */
496 g_return_val_if_fail(buffer->data_size > 0, 0);
/* Write only the live span: [offset, offset + data_size). */
500 write_size = write(fd, buffer->data + buffer->offset,
503 if (write_size > 0) {
/* Transient errors (e.g. EWOULDBLOCK) are retried -- retry logic is in
 * lines not visible in this chunk. */
510 || errno == EWOULDBLOCK
/* Save errno before calling functions that may clobber it. */
520 int save_errno = errno;
521 amfree(queue_fd->errmsg);
522 queue_fd->errmsg = g_strdup_printf("Error writing fd %d: %s", fd,
523 strerror(save_errno));
524 dbprintf("%s\n", queue_fd->errmsg);