Imported Upstream version 3.1.0
[debian/amanda] / common-src / queueing.c
1 /*
2  * Copyright (c) 2008,2009 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 #include "queueing.h"
22 #include "semaphore.h"
23 #include "amanda.h"
24
25 /* Queueing framework here. */
26 typedef struct {
27     size_t block_size;
28     StreamingRequirement streaming_mode;
29
30     ProducerFunctor producer;
31     gpointer producer_user_data;
32
33     ConsumerFunctor consumer;
34     gpointer consumer_user_data;
35
36     GAsyncQueue *data_queue, *free_queue;
37     semaphore_t *free_memory;
38 } queue_data_t;
39
40 static queue_buffer_t *invent_buffer(void) {
41     queue_buffer_t *rval;
42     rval = g_new(queue_buffer_t, 1);
43
44     rval->data = NULL;
45     rval->alloc_size = 0;
46     rval->data_size = 0;
47     rval->offset = 0;
48
49     return rval;
50 }
51
52 void free_buffer(queue_buffer_t *buf) {
53     if (buf != NULL)
54         amfree(buf->data);
55     amfree(buf);
56 }
57
58 static queue_buffer_t * merge_buffers(queue_buffer_t *buf1,
59                                       queue_buffer_t *buf2) {
60     if (buf1 == NULL)
61         return buf2;
62     else if (buf2 == NULL)
63         return buf1;
64
65     if (buf2->offset >= buf1->data_size) {
66         /* We can fit buf1 at the beginning of buf2. */
67         memcpy(buf2->data + buf2->offset - buf1->data_size,
68                buf1->data + buf1->offset,
69                buf1->data_size);
70         buf2->offset -= buf1->data_size;
71         buf2->data_size += buf1->data_size;
72         free_buffer(buf1);
73         return buf2;
74     } else if (buf1->alloc_size - buf1->offset - buf1->data_size
75                >= buf2->data_size) {
76         /* We can fit buf2 at the end of buf1. */
77         memcpy(buf1->data + buf1->offset + buf1->data_size,
78                buf2->data + buf2->offset, buf2->data_size);
79         buf1->data_size += buf2->data_size;
80         free_buffer(buf2);
81         return buf1;
82     } else {
83         /* We can grow buf1 and put everything there. */
84         if (buf1->offset != 0) {
85             /* But first we have to fix up buf1. */
86             memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
87             buf1->offset = 0;
88         }
89         buf1->alloc_size = buf1->data_size + buf2->data_size;
90         buf1->data = realloc(buf1->data, buf1->alloc_size);
91         memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
92                buf2->data_size);
93         buf1->data_size = buf1->alloc_size;
94         free_buffer(buf2);
95         return buf1;
96     }
97 }
98
99 /* Invalidate the first "bytes" bytes of the buffer, by adjusting the
100    offset and data size. */
101 static void consume_buffer(queue_buffer_t* buf, ssize_t bytes) {
102     g_assert(bytes >= 0 && bytes <= (ssize_t)buf->data_size);
103     buf->offset += bytes;
104     buf->data_size -= bytes;
105 }
106
107 /* Looks at the buffer to see how much free space it has. If it has more than
108  * twice the data size of unused space at the end, or more than four times
109  * the data size of unused space at the beginning, then that space is
110  * reclaimed. */
111 static void heatshrink_buffer(queue_buffer_t *buf) {
112     if (buf == NULL)
113         return;
114
115     if (G_UNLIKELY(buf->offset > buf->data_size * 4)) {
116         /* Consolodate with memmove. We will reclaim the space in the next
117          * step. */
118         memmove(buf->data, buf->data + buf->offset, buf->data_size);
119         buf->offset = 0;
120     } 
121
122     if (buf->alloc_size > buf->data_size*2 + buf->offset) {
123         buf->alloc_size = buf->data_size + buf->offset;
124         buf->data = realloc(buf->data, buf->alloc_size);
125     }
126 }
127
128 static gpointer do_producer_thread(gpointer datap) {
129     queue_data_t* data = datap;
130
131     for (;;) {
132         queue_buffer_t *buf;
133         gboolean result;
134
135         semaphore_decrement(data->free_memory, 0);
136         buf = g_async_queue_try_pop(data->free_queue);
137         if (buf != NULL && buf->data == NULL) {
138             /* Consumer is finished, then so are we. */
139             amfree(buf);
140             return GINT_TO_POINTER(TRUE);
141         }
142
143         if (buf == NULL) {
144             buf = invent_buffer();
145         }
146         buf->offset = 0;
147         buf->data_size = 0;
148
149         result = data->producer(data->producer_user_data, buf,
150                                 data->block_size);
151
152         // Producers can allocate way too much memory.
153         heatshrink_buffer(buf);
154
155         if (buf->data_size > 0) {
156             semaphore_force_adjust(data->free_memory, -buf->alloc_size);
157             
158             g_async_queue_push(data->data_queue, buf);
159             buf = NULL;
160         } else {
161             g_assert(result != PRODUCER_MORE);
162             free_buffer(buf);
163             buf = NULL;
164         }
165
166
167         if (result == PRODUCER_MORE) {
168             continue;
169         } else {
170             /* We are finished (and the first to do so). */
171             g_async_queue_push(data->data_queue, invent_buffer());
172             semaphore_force_set(data->free_memory, INT_MIN);
173
174             return GINT_TO_POINTER(result == PRODUCER_FINISHED);
175         }
176     }
177 }
178
179 static gpointer do_consumer_thread(gpointer datap) {
180     queue_data_t* data = datap;
181     gboolean got_eof = FALSE;
182     queue_buffer_t *buf = NULL;
183
184     if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
185         semaphore_wait_empty(data->free_memory);
186     }
187
188     for (;;) {
189         gboolean result;
190
191         /* Pull in and merge buffers until we have at least data->block_size
192          * bytes, or there are no more buffers */
193         while (!got_eof && (buf == NULL || buf->data_size < data->block_size)) {
194             queue_buffer_t *next_buf;
195             if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
196                 do {
197                     next_buf = g_async_queue_try_pop(data->data_queue);
198                     if (next_buf == NULL) {
199                         semaphore_wait_empty(data->free_memory);
200                     }
201                 } while (next_buf == NULL);
202             } else {
203                 next_buf = g_async_queue_pop(data->data_queue);
204                 g_assert(next_buf != NULL);
205             }
206
207             if (next_buf->data == NULL) {
208                 /* A buffer with NULL data is an EOF from the producer */
209                 free_buffer(next_buf);
210                 got_eof = TRUE;
211                 break;
212             }
213
214             semaphore_increment(data->free_memory, next_buf->alloc_size);
215
216             buf = merge_buffers(buf, next_buf);
217         }
218
219         /* If we're out of data, then we are done. */
220         if (buf == NULL)
221             break;
222
223         result = data->consumer(data->consumer_user_data, buf);
224
225         if (result > 0) {
226             consume_buffer(buf, result);
227             if (buf->data_size == 0) {
228                 g_async_queue_push(data->free_queue, buf);
229                 buf = NULL;
230             }
231             continue;
232         } else {
233             free_buffer(buf);
234             return GINT_TO_POINTER(FALSE);
235         }
236     }
237
238     /* We are so outta here. */
239     return GINT_TO_POINTER(TRUE);
240 }
241
242 /* Empties a buffer queue and frees all the buffers associated with it.
243  *
244  * If full_cleanup is TRUE, then we delete the queue itself.
245  * If full_cleanup is FALSE, then we leave the queue around, with a
246  *         signal element in it. */
247 static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
248     g_async_queue_lock(Q);
249     for (;;) {
250         queue_buffer_t *buftmp;
251         buftmp = g_async_queue_try_pop_unlocked(Q);
252         if (buftmp == NULL)
253             break;
254
255         free_buffer(buftmp);
256     }
257     if (!full_cleanup)
258         g_async_queue_push_unlocked(Q, invent_buffer());
259
260     g_async_queue_unlock(Q);
261     
262     if (full_cleanup)
263         g_async_queue_unref(Q);
264 }
265
266 /* This function sacrifices performance, but will still work just
267    fine, on systems where threads are not supported. */
268 static queue_result_flags
269 do_unthreaded_consumer_producer_queue(size_t block_size,
270                                       ProducerFunctor producer,
271                                       gpointer producer_user_data,
272                                       ConsumerFunctor consumer,
273                                       gpointer consumer_user_data) {
274     queue_buffer_t *buf = NULL, *next_buf = NULL;
275     gboolean finished = FALSE;
276     queue_result_flags rval = 0;
277
278     /* The basic theory of operation here is to read until we have
279        enough data to write, then write until we don't.. */
280     while (!finished) {
281         producer_result_t result;
282         
283         while ((buf == NULL || buf->data_size < block_size) && !finished) {
284             if (next_buf == NULL)
285                 next_buf = invent_buffer();
286
287             result = producer(producer_user_data, next_buf, block_size);
288
289             if (result != PRODUCER_MORE) {
290                 finished = TRUE;
291                 if (result != PRODUCER_FINISHED) {
292                     rval |= QUEUE_PRODUCER_ERROR;
293                 }
294             }
295
296             buf = merge_buffers(buf, next_buf);
297             next_buf = NULL;
298         }
299
300         while (buf != NULL && buf->data_size > 0 &&
301                (buf->data_size >= block_size || finished)) {
302             result = consumer(consumer_user_data, buf);
303             
304             if (result > 0) {
305                 consume_buffer(buf, result);
306                 if (buf->data_size == 0) {
307                     next_buf = buf;
308                     buf = NULL;
309                 }
310             } else {
311                 finished = TRUE;
312                 rval |= QUEUE_CONSUMER_ERROR;
313                 break;
314             }
315         }
316     }
317
318     free_buffer(buf);
319     free_buffer(next_buf);
320     return rval;
321 }
322
323 gboolean do_consumer_producer_queue(ProducerFunctor producer,
324                                     gpointer producer_user_data,
325                                     ConsumerFunctor consumer,
326                                     gpointer consumer_user_data) {
327     return QUEUE_SUCCESS ==
328         do_consumer_producer_queue_full(producer, producer_user_data,
329                                         consumer, consumer_user_data,
330                                         0, DEFAULT_MAX_BUFFER_MEMORY,
331                                         STREAMING_REQUIREMENT_NONE);
332 }
333
334 queue_result_flags
335 do_consumer_producer_queue_full(ProducerFunctor producer,
336                                 gpointer producer_user_data,
337                                 ConsumerFunctor consumer,
338                                 gpointer consumer_user_data,
339                                 size_t block_size,
340                                 size_t max_memory,
341                                 StreamingRequirement streaming_mode) {
342     GThread     * producer_thread;
343     GThread     * consumer_thread;
344     queue_data_t  queue_data;
345     gpointer      producer_result;
346     gpointer      consumer_result;
347     queue_result_flags rval;
348
349     if (block_size <= 0) {
350         block_size = DISK_BLOCK_BYTES;
351     }
352
353     g_return_val_if_fail(producer != NULL, FALSE);
354     g_return_val_if_fail(consumer != NULL, FALSE);
355
356     if (!g_thread_supported()) {
357         return do_unthreaded_consumer_producer_queue(block_size, producer,
358                                                      producer_user_data,
359                                                      consumer,
360                                                      consumer_user_data);
361     }
362
363     queue_data.block_size = block_size;
364     queue_data.producer = producer;
365     queue_data.producer_user_data = producer_user_data;
366     queue_data.consumer = consumer;
367     queue_data.consumer_user_data = consumer_user_data;
368     queue_data.streaming_mode = streaming_mode;
369
370     queue_data.data_queue = g_async_queue_new();
371     queue_data.free_queue = g_async_queue_new();
372
373     max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));
374     queue_data.free_memory = semaphore_new_with_value(max_memory);
375
376     producer_thread = g_thread_create(do_producer_thread, &queue_data,
377                                       TRUE,
378                                       NULL /* FIXME: Should handle
379                                               errors. */);
380     consumer_thread = g_thread_create(do_consumer_thread, &queue_data,
381                                       TRUE,
382                                       NULL /* FIXME: Should handle
383                                               errors. */);
384     
385     /* The order of cleanup here is very important, to avoid deadlock. */
386     /* 1) Reap the consumer. */
387     consumer_result = g_thread_join(consumer_thread);
388     /* 2) Stop the producer. */
389     semaphore_force_set(queue_data.free_memory, -1);
390     /* 3) Cleanup the free queue; add a signal flag. */
391     cleanup_buffer_queue(queue_data.free_queue, FALSE);
392     /* 4) Restart the producer (so it can exit). */
393     semaphore_force_set(queue_data.free_memory, INT_MAX);
394     /* 5) Reap the producer. */
395     producer_result = g_thread_join(producer_thread);
396
397     cleanup_buffer_queue(queue_data.free_queue, TRUE);
398     cleanup_buffer_queue(queue_data.data_queue, TRUE);
399
400     semaphore_free(queue_data.free_memory);
401     
402     rval = 0;
403     if (!GPOINTER_TO_INT(producer_result)) {
404         rval |= QUEUE_PRODUCER_ERROR;
405     }
406     if (!GPOINTER_TO_INT(consumer_result)) {
407         rval |= QUEUE_CONSUMER_ERROR;
408     }
409     return rval;
410 }
411
412 /* Commonly-useful producers and consumers below. */
413
414 queue_fd_t *
415 queue_fd_new(
416     int fd,
417     char *errmsg)
418 {
419     queue_fd_t *queue_fd;
420
421     queue_fd = malloc(sizeof(queue_fd_t));
422     queue_fd->fd = fd;
423     queue_fd->errmsg = errmsg;
424
425     return queue_fd;
426 }
427
428 int
429 queue_fd_fd(
430     queue_fd_t *queue_fd)
431 {
432     return queue_fd->fd;
433 }
434
435 char *queue_fd_errmsg(
436     queue_fd_t *queue_fd)
437 {
438     return queue_fd->errmsg;
439 }
440
441 producer_result_t fd_read_producer(gpointer f_queue_fd, queue_buffer_t *buffer,
442                                    size_t hint_size) {
443     int fd;
444     queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd;
445     fd = queue_fd->fd;
446     g_assert(fd >= 0);
447     g_assert(buffer->data_size == 0);
448
449     buffer->offset = 0;
450
451     if (buffer->data == NULL) {
452         /* Set up the buffer. */
453         buffer->data = malloc(hint_size);
454         buffer->alloc_size = hint_size;
455     }
456
457     for (;;) {
458         ssize_t result;
459         result = read(fd, buffer->data, buffer->alloc_size);
460
461         if (result > 0) {
462             buffer->data_size = result;
463             return PRODUCER_MORE;
464         } else if (result == 0) {
465             /* End of file. */
466             return PRODUCER_FINISHED;
467         } else if (0
468 #ifdef EAGAIN
469                 || errno == EAGAIN
470 #endif
471 #ifdef EWOULDBLOCK
472                 || errno == EWOULDBLOCK
473 #endif
474 #ifdef EINTR
475                 || errno == EINTR
476 #endif
477                 ) {
478                 /* Try again. */
479                 continue;
480         } else {
481             /* Error occured. */
482             queue_fd->errmsg = newvstrallocf(queue_fd->errmsg,
483                 "Error reading fd %d: %s\n", fd, strerror(errno));
484             return PRODUCER_ERROR;
485         }
486     }
487 }
488
489 ssize_t fd_write_consumer(gpointer f_queue_fd, queue_buffer_t *buffer) {
490     int fd;
491     queue_fd_t *queue_fd = (queue_fd_t *)f_queue_fd;
492     fd = queue_fd->fd;
493
494     g_assert(fd >= 0);
495
496     g_return_val_if_fail(buffer->data_size > 0, 0);
497
498     for (;;) {
499         ssize_t write_size;
500         write_size = write(fd, buffer->data + buffer->offset,
501                            buffer->data_size);
502         
503         if (write_size > 0) {
504             return write_size;
505         } else if (0
506 #ifdef EAGAIN
507                 || errno == EAGAIN
508 #endif
509 #ifdef EWOULDBLOCK
510                 || errno == EWOULDBLOCK
511 #endif
512 #ifdef EINTR
513                 || errno == EINTR
514 #endif
515                 ) {
516                 /* Try again. */
517                 continue;
518         } else {
519             /* Error occured. */
520             int save_errno = errno;
521             amfree(queue_fd->errmsg);
522             queue_fd->errmsg = g_strdup_printf("Error writing fd %d: %s", fd,
523                                                strerror(save_errno));
524             dbprintf("%s\n", queue_fd->errmsg);
525             return -1;
526         }        
527     }
528 }