2 * Copyright (c) 2005 Zmanda, Inc. All Rights Reserved.
4 * This library is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License version 2.1 as
6 * published by the Free Software Foundation.
8 * This library is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
11 * License for more details.
13 * You should have received a copy of the GNU Lesser General Public License
14 * along with this library; if not, write to the Free Software Foundation,
15 * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
17 * Contact information: Zmanda Inc., 505 N Mathilda Ave, Suite 120
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
23 #include "semaphore.h"
26 /* Queueing framework here. */
/* NOTE(review): these appear to be the fields of the shared per-queue state
 * struct (referred to below as queue_data_t) handed to both worker threads;
 * the struct declaration itself is not visible in this view — confirm. */
/* Callback that fills buffers with data, plus its opaque user argument. */
29 ProducerFunctor producer;
30 gpointer producer_user_data;
/* Callback that drains buffers, plus its opaque user argument. */
31 ConsumerFunctor consumer;
32 gpointer consumer_user_data;
/* data_queue carries filled buffers producer -> consumer; free_queue sends
 * drained buffers back for reuse. */
33 GAsyncQueue *data_queue, *free_queue;
/* Counting semaphore tracking how much buffer memory may still be
 * allocated; also abused as a signaling channel during shutdown (see
 * semaphore_force_set calls below). */
34 semaphore_t *free_memory;
/* How strictly the consumer must be fed full blocks (see
 * StreamingRequirement values used in do_consumer_thread). */
35 StreamingRequirement streaming_mode;
/* Allocate a fresh queue_buffer_t.  Only the struct allocation is visible
 * here; initialization of data/offset/data_size/alloc_size presumably
 * follows on lines elided from this view — TODO confirm.  NOTE(review):
 * the malloc result does not appear to be checked in the visible lines. */
38 static queue_buffer_t *invent_buffer(void) {
40 rval = malloc(sizeof(*rval));
/* Release a buffer and (presumably) its data block; the body is elided from
 * this view.  Callers pass buffers obtained from invent_buffer(). */
50 void free_buffer(queue_buffer_t *buf) {
/* Concatenate buf2's data after buf1's data, consuming both input buffers
 * and returning the merged one (NULL inputs pass the other through — the
 * buf1 == NULL branch is elided from this view).  Three strategies, in
 * order of preference:
 *   1) copy buf1's data into the free space before buf2's offset,
 *   2) copy buf2's data into the free tail space of buf1,
 *   3) compact buf1 to offset 0, grow it, and append buf2's data.
 * NOTE(review): the realloc below overwrites buf1->data directly and its
 * result is not checked in the visible lines — on OOM this leaks and then
 * dereferences NULL.  The elided lines presumably free the buffer that was
 * merged away — TODO confirm. */
56 static queue_buffer_t * merge_buffers(queue_buffer_t *buf1,
57 queue_buffer_t *buf2) {
60 else if (buf2 == NULL)
63 if (buf2->offset >= buf1->data_size) {
64 /* We can fit buf1 at the beginning of buf2. */
65 memcpy(buf2->data + buf2->offset - buf1->data_size,
66 buf1->data + buf1->offset,
/* buf2 now starts buf1->data_size bytes earlier and holds both payloads. */
68 buf2->offset -= buf1->data_size;
69 buf2->data_size += buf1->data_size;
72 } else if (buf1->alloc_size - buf1->offset - buf1->data_size
74 /* We can fit buf2 at the end of buf1. */
75 memcpy(buf1->data + buf1->offset + buf1->data_size,
76 buf2->data + buf2->offset, buf2->data_size);
77 buf1->data_size += buf2->data_size;
81 /* We can grow buf1 and put everything there. */
82 if (buf1->offset != 0) {
/* Slide buf1's payload to the front so the realloc'd space is all tail
 * room; memmove because source and destination overlap. */
83 /* But first we have to fix up buf1. */
84 memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
87 buf1->alloc_size = buf1->data_size + buf2->data_size;
88 buf1->data = realloc(buf1->data, buf1->alloc_size);
89 memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
/* The grown buffer is exactly full: data_size == alloc_size. */
91 buf1->data_size = buf1->alloc_size;
97 /* Invalidate the first "bytes" bytes of the buffer, by adjusting the
98 offset and data size. */
/* No bounds check in the visible lines: callers are expected to pass
 * bytes <= buf->data_size (consumer return values). */
99 static void consume_buffer(queue_buffer_t* buf, int bytes) {
100 buf->offset += bytes;
101 buf->data_size -= bytes;
104 /* Looks at the buffer to see how much free space it has. If it has more than
105 * twice the data size of unused space at the end, or more than four times
106 * the data size of unused space at the beginning, then that space is
/* NOTE(review): the visible condition (data_size * 4 > offset) does not
 * obviously match the "more than four times" wording above — with elided
 * lines it is unclear which is authoritative; verify before changing. */
108 static void heatshrink_buffer(queue_buffer_t *buf) {
112 if (G_UNLIKELY(buf->data_size * 4 > buf->offset)) {
/* memmove, not memcpy: the regions may overlap. */
113 /* Consolidate with memmove. We will reclaim the space in the next
115 memmove(buf->data, buf->data + buf->offset, buf->data_size);
/* Shrink the allocation down to just what the payload (plus any remaining
 * leading offset) needs.  NOTE(review): realloc result unchecked, though a
 * shrinking realloc rarely fails. */
119 if (buf->alloc_size > buf->data_size*2 + buf->offset) {
120 buf->alloc_size = buf->data_size + buf->offset;
121 buf->data = realloc(buf->data, buf->alloc_size);
/* Producer worker thread: repeatedly acquire a buffer (recycled from
 * free_queue or freshly invented), fill it via data->producer, and push it
 * onto data_queue, throttled by the free_memory semaphore.  Returns TRUE
 * (as a pointer) on clean finish, FALSE on producer error.  Several loop /
 * brace lines are elided from this view. */
125 static gpointer do_producer_thread(gpointer datap) {
126 queue_data_t* data = datap;
/* Block until at least some buffer memory is available. */
132 semaphore_decrement(data->free_memory, 0);
133 buf = g_async_queue_try_pop(data->free_queue);
/* A buffer with data == NULL is the consumer's shutdown sentinel. */
134 if (buf != NULL && buf->data == NULL) {
135 /* Consumer is finished, then so are we. */
137 return GINT_TO_POINTER(TRUE);
/* No recycled buffer available: make a new empty one. */
141 buf = invent_buffer();
146 result = data->producer(data->producer_user_data, buf,
149 // Producers can allocate way too much memory.
150 heatshrink_buffer(buf);
152 if (buf->data_size > 0) {
/* Charge this buffer's allocation against the shared memory budget, then
 * hand it to the consumer. */
153 semaphore_force_adjust(data->free_memory, -buf->alloc_size);
155 g_async_queue_push(data->data_queue, buf);
/* Presumably the empty-buffer branch: an empty buffer is only legal when
 * the producer is done — TODO confirm against the elided lines. */
158 g_assert(result != PRODUCER_MORE);
164 if (result == PRODUCER_MORE) {
167 /* We are finished (and the first to do so). */
/* Push the empty-sentinel buffer to wake the consumer, and force the
 * semaphore very negative so no more producing can proceed. */
168 g_async_queue_push(data->data_queue, invent_buffer());
169 semaphore_force_set(data->free_memory, INT_MIN);
171 return GINT_TO_POINTER(result == PRODUCER_FINISHED);
/* Consumer worker thread: accumulate buffers from data_queue (merging them
 * until at least block_size bytes are available, per streaming_mode), feed
 * them to data->consumer, and recycle drained buffers via free_queue.
 * Returns TRUE (as a pointer) on clean finish.  Loop headers, the
 * 'finished' handling, and several braces are elided from this view. */
176 static gpointer do_consumer_thread(gpointer datap) {
177 queue_data_t* data = datap;
178 gboolean finished = FALSE;
179 queue_buffer_t *buf = NULL;
/* For streaming modes, wait until the producer has filled the entire
 * memory budget before consuming anything. */
181 if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
182 semaphore_wait_empty(data->free_memory);
189 return GINT_TO_POINTER(TRUE);
/* Keep pulling until we have a full block's worth of data. */
192 while (buf == NULL || buf->data_size < data->block_size) {
193 queue_buffer_t *next_buf;
194 if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
/* Desired streaming: poll, and when the queue is dry wait for the
 * producer to refill the whole buffer budget before retrying. */
196 next_buf = g_async_queue_try_pop(data->data_queue);
197 if (next_buf == NULL) {
198 semaphore_wait_empty(data->free_memory);
200 } while (next_buf == NULL);
/* Non-streaming path: plain blocking pop. */
202 next_buf = g_async_queue_pop(data->data_queue);
203 g_assert(next_buf != NULL);
/* data == NULL is the producer's end-of-stream sentinel. */
206 if (next_buf->data == NULL) {
207 /* Producer is finished, then so are we.*/
208 free_buffer(next_buf);
210 /* But we can't quit yet, we have a buffer to flush.*/
214 /* We are so outta here. */
215 return GINT_TO_POINTER(TRUE);
/* Return this buffer's memory to the budget before absorbing it. */
219 semaphore_increment(data->free_memory, next_buf->alloc_size);
221 buf = merge_buffers(buf, next_buf);
224 result = data->consumer(data->consumer_user_data, buf);
/* result is the byte count consumed; drop those bytes and recycle the
 * buffer once empty. */
227 consume_buffer(buf, result);
228 if (buf->data_size == 0) {
229 g_async_queue_push(data->free_queue, buf);
/* Falls through here on consumer error — TODO confirm elided branch. */
235 return GINT_TO_POINTER(FALSE);
240 /* Empties a buffer queue and frees all the buffers associated with it.
242 * If full_cleanup is TRUE, then we delete the queue itself.
243 * If full_cleanup is FALSE, then we leave the queue around, with a
244 * signal element in it. */
/* The drain loop header and the frees are elided from this view; the
 * visible calls pop under the queue lock until empty, then (when not doing
 * full cleanup) push a fresh empty buffer as the shutdown sentinel. */
245 static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
246 g_async_queue_lock(Q);
248 queue_buffer_t *buftmp;
249 buftmp = g_async_queue_try_pop_unlocked(Q);
/* Signal element: invent_buffer() yields a buffer whose data == NULL is
 * treated by the worker threads as "peer is finished". */
256 g_async_queue_push_unlocked(Q, invent_buffer());
258 g_async_queue_unlock(Q);
/* full_cleanup path: drop our reference so the queue is destroyed. */
261 g_async_queue_unref(Q);
264 /* This function sacrifices performance, but will still work just
265 fine, on systems where threads are not supported. */
/* Single-threaded fallback for do_consumer_producer_queue_full: alternate
 * between producing into next_buf and consuming from buf, entirely on the
 * caller's thread.  Returns the accumulated queue_result_flags (0 on
 * success).  The outer loop header and several braces are elided from this
 * view. */
266 static queue_result_flags
267 do_unthreaded_consumer_producer_queue(guint block_size,
268 ProducerFunctor producer,
269 gpointer producer_user_data,
270 ConsumerFunctor consumer,
271 gpointer consumer_user_data) {
272 queue_buffer_t *buf = NULL, *next_buf = NULL;
273 gboolean finished = FALSE;
274 queue_result_flags rval = 0;
276 /* The basic theory of operation here is to read until we have
277 enough data to write, then write until we don't.. */
/* Produce until a full block is accumulated (or the producer stops). */
281 while ((buf == NULL || buf->data_size < block_size) && !finished) {
282 if (next_buf == NULL)
283 next_buf = invent_buffer();
285 result = producer(producer_user_data, next_buf, block_size);
287 if (result != PRODUCER_MORE) {
/* Anything other than FINISHED at this point is a producer failure. */
289 if (result != PRODUCER_FINISHED) {
290 rval |= QUEUE_PRODUCER_ERROR;
/* Fold whatever was produced into the working buffer. */
294 buf = merge_buffers(buf, next_buf);
/* Consume full blocks; once 'finished', flush the final partial block
 * too. */
298 while (buf != NULL && buf->data_size > 0 &&
299 (buf->data_size >= block_size || finished)) {
300 result = consumer(consumer_user_data, buf);
303 consume_buffer(buf, result);
304 if (buf->data_size == 0) {
/* Consumer-failure branch (condition elided from this view). */
310 rval |= QUEUE_CONSUMER_ERROR;
/* Release any leftover produce-side buffer before returning. */
317 free_buffer(next_buf);
/* Convenience wrapper: run the queue with default block size (0 => the
 * implementation's default), the default memory budget, and no streaming
 * requirement.  Returns TRUE iff the full run reported QUEUE_SUCCESS. */
321 gboolean do_consumer_producer_queue(ProducerFunctor producer,
322 gpointer producer_user_data,
323 ConsumerFunctor consumer,
324 gpointer consumer_user_data) {
325 return QUEUE_SUCCESS ==
326 do_consumer_producer_queue_full(producer, producer_user_data,
327 consumer, consumer_user_data,
328 0, DEFAULT_MAX_BUFFER_MEMORY,
329 STREAMING_REQUIREMENT_NONE);
/* Full-featured entry point: spin up one producer thread and one consumer
 * thread sharing a queue_data_t, wait for both, and fold their results
 * into queue_result_flags.  Falls back to the unthreaded implementation
 * when GLib threads are unavailable.  The return-type line, some parameter
 * lines, and several braces are elided from this view. */
333 do_consumer_producer_queue_full(ProducerFunctor producer,
334 gpointer producer_user_data,
335 ConsumerFunctor consumer,
336 gpointer consumer_user_data,
339 StreamingRequirement streaming_mode) {
340 GThread * producer_thread;
341 GThread * consumer_thread;
342 queue_data_t queue_data;
343 gpointer producer_result;
344 gpointer consumer_result;
345 queue_result_flags rval;
/* block_size <= 0 selects the default disk block size. */
347 if (block_size <= 0) {
348 block_size = DISK_BLOCK_BYTES;
351 g_return_val_if_fail(producer != NULL, FALSE);
352 g_return_val_if_fail(consumer != NULL, FALSE);
354 if (!g_thread_supported()) {
355 return do_unthreaded_consumer_producer_queue(block_size, producer,
/* Populate the state shared by both worker threads. */
361 queue_data.block_size = block_size;
362 queue_data.producer = producer;
363 queue_data.producer_user_data = producer_user_data;
364 queue_data.consumer = consumer;
365 queue_data.consumer_user_data = consumer_user_data;
366 queue_data.streaming_mode = streaming_mode;
368 queue_data.data_queue = g_async_queue_new();
369 queue_data.free_queue = g_async_queue_new();
/* Clamp the memory budget into a range the int-based semaphore can
 * represent without overflow during adjustments. */
371 max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));
372 queue_data.free_memory = semaphore_new_with_value(max_memory);
374 producer_thread = g_thread_create(do_producer_thread, &queue_data,
376 NULL /* FIXME: Should handle
378 consumer_thread = g_thread_create(do_consumer_thread, &queue_data,
380 NULL /* FIXME: Should handle
383 /* The order of cleanup here is very important, to avoid deadlock. */
384 /* 1) Reap the consumer. */
385 consumer_result = g_thread_join(consumer_thread);
386 /* 2) Stop the producer. */
387 semaphore_force_set(queue_data.free_memory, -1);
388 /* 3) Cleanup the free queue; add a signal flag. */
389 cleanup_buffer_queue(queue_data.free_queue, FALSE);
390 /* 4) Restart the producer (so it can exit). */
391 semaphore_force_set(queue_data.free_memory, INT_MAX);
392 /* 5) Reap the producer. */
393 producer_result = g_thread_join(producer_thread);
/* Now both threads are gone; tear everything down. */
395 cleanup_buffer_queue(queue_data.free_queue, TRUE);
396 cleanup_buffer_queue(queue_data.data_queue, TRUE);
398 semaphore_free(queue_data.free_memory);
/* Each thread returns TRUE-as-pointer on success; map failures to flags.
 * NOTE(review): rval's initialization to 0 is on a line elided from this
 * view — confirm. */
401 if (!GPOINTER_TO_INT(producer_result)) {
402 rval |= QUEUE_PRODUCER_ERROR;
404 if (!GPOINTER_TO_INT(consumer_result)) {
405 rval |= QUEUE_CONSUMER_ERROR;
410 /* Commonly-useful producers and consumers below. */
/* ProducerFunctor reading whole blocks from an Amanda Device.  On a
 * too-small-buffer result (device_read_block returning 0, with read_size
 * updated to the required size) the buffer is regrown and, presumably, the
 * read retried via a loop whose header is elided from this view — TODO
 * confirm.  hint_size is ignored: the device dictates block size. */
412 producer_result_t device_read_producer(gpointer devicep,
413 queue_buffer_t *buffer,
414 int hint_size G_GNUC_UNUSED) {
417 device = (Device*) devicep;
418 g_assert(IS_DEVICE(device));
422 int result, read_size;
423 read_size = buffer->alloc_size;
424 result = device_read_block(device, buffer->data, &read_size);
/* Successful read: record how many bytes landed in the buffer. */
426 buffer->data_size = read_size;
427 return PRODUCER_MORE;
428 } else if (result == 0) {
/* Buffer too small: grow to the size the device reported.
 * NOTE(review): realloc result unchecked in the visible lines. */
429 buffer->data = realloc(buffer->data, read_size);
430 buffer->alloc_size = read_size;
431 } else if (device->is_eof) {
432 return PRODUCER_FINISHED;
/* Read error that is not EOF: report an empty buffer and fail. */
434 buffer->data_size = 0;
435 return PRODUCER_ERROR;
/* ConsumerFunctor writing buffer contents to an Amanda Device one block at
 * a time.  Returns the number of bytes consumed (per the queueing
 * contract); the success-return and error-return lines are elided from
 * this view.  NOTE(review): 'wrote_blocksize' is a function-level static,
 * so the debug line prints once per process, not per device, and is not
 * thread-safe — presumably acceptable for a one-shot debug message. */
440 int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) {
442 unsigned int write_size;
443 static gboolean wrote_blocksize = FALSE;
445 device = (Device*) devicep;
446 g_assert(IS_DEVICE(device));
/* Never write more than the device's maximum block size in one call. */
447 write_size = MIN(buffer->data_size,
448 device_write_max_size(device));
450 if (!wrote_blocksize) {
451 wrote_blocksize = TRUE;
452 dbprintf("USING BLOCKSIZE %d bytes\n", write_size);
455 if (device_write_block(device, write_size,
456 buffer->data + buffer->offset,
/* Third argument region elided; the visible min-size expression presumably
 * controls short-block padding — TODO confirm. */
458 device_write_min_size(device))) {
462 /* Nope, really an error. */
/* ProducerFunctor reading from a plain file descriptor (passed packed in
 * the gpointer).  Allocates the buffer lazily at hint_size on first use,
 * then read()s into it.  The retry loop around EINTR/EAGAIN and the
 * hint_size parameter declaration are elided from this view.
 * NOTE(review): malloc result unchecked in the visible lines. */
467 producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer,
471 fd = GPOINTER_TO_INT(fdp);
/* The framework only hands us drained buffers. */
473 g_assert(buffer->data_size == 0);
477 if (buffer->data == NULL) {
478 /* Set up the buffer. */
479 buffer->data = malloc(hint_size);
480 buffer->alloc_size = hint_size;
485 result = read(fd, buffer->data, buffer->alloc_size);
/* result > 0: got data. */
488 buffer->data_size = result;
489 return PRODUCER_MORE;
490 } else if (result == 0) {
/* read() returning 0 is EOF. */
492 return PRODUCER_FINISHED;
/* Transient errors (EINTR/EAGAIN/EWOULDBLOCK, per the visible fragment)
 * are presumably retried; anything else is fatal. */
498 || errno == EWOULDBLOCK
508 g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno));
509 return PRODUCER_ERROR;
514 int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) {
517 fd = GPOINTER_TO_INT(fdp);
520 g_return_val_if_fail(buffer->data_size > 0, 0);
524 write_size = write(fd, buffer->data + buffer->offset,
527 if (write_size > 0) {
534 || errno == EWOULDBLOCK
544 g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno));