Imported Upstream version 2.6.0p2
[debian/amanda] / device-src / queueing.c
1 /*
2  * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as 
6  * published by the Free Software Foundation.
7  * 
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  * 
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  * 
17  * Contact information: Zmanda Inc., 505 N Mathilda Ave, Suite 120
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 #include "queueing.h"
22 #include "device.h"
23 #include "semaphore.h"
24 #include "amanda.h"
25
26 /* Queueing framework here. */
27 typedef struct {
28     guint block_size;
29     ProducerFunctor producer;
30     gpointer producer_user_data;
31     ConsumerFunctor consumer;
32     gpointer consumer_user_data;
33     GAsyncQueue *data_queue, *free_queue;
34     semaphore_t *free_memory;
35     StreamingRequirement streaming_mode;
36 } queue_data_t;
37
38 static queue_buffer_t *invent_buffer(void) {
39     queue_buffer_t *rval;
40     rval = malloc(sizeof(*rval));
41
42     rval->data = NULL;
43     rval->alloc_size = 0;
44     rval->data_size = 0;
45     rval->offset = 0;
46
47     return rval;
48 }
49
50 void free_buffer(queue_buffer_t *buf) {
51     if (buf != NULL)
52         amfree(buf->data);
53     amfree(buf);
54 }
55
/* Combine the contents of two buffers into one, freeing whichever
 * buffer is not returned.  Either argument may be NULL, in which case
 * the other is returned unchanged.  Three strategies are tried in
 * order of increasing cost: copy buf1 into buf2's leading free space,
 * copy buf2 into buf1's trailing free space, or grow buf1 to hold
 * everything. */
static queue_buffer_t * merge_buffers(queue_buffer_t *buf1,
                                      queue_buffer_t *buf2) {
    if (buf1 == NULL)
        return buf2;
    else if (buf2 == NULL)
        return buf1;

    if (buf2->offset >= buf1->data_size) {
        /* We can fit buf1 at the beginning of buf2. */
        memcpy(buf2->data + buf2->offset - buf1->data_size,
               buf1->data + buf1->offset,
               buf1->data_size);
        buf2->offset -= buf1->data_size;
        buf2->data_size += buf1->data_size;
        free_buffer(buf1);
        return buf2;
    } else if (buf1->alloc_size - buf1->offset - buf1->data_size
               >= buf2->data_size) {
        /* We can fit buf2 at the end of buf1. */
        memcpy(buf1->data + buf1->offset + buf1->data_size,
               buf2->data + buf2->offset, buf2->data_size);
        buf1->data_size += buf2->data_size;
        free_buffer(buf2);
        return buf1;
    } else {
        /* We can grow buf1 and put everything there. */
        if (buf1->offset != 0) {
            /* But first we have to fix up buf1: slide its data to the
             * front so the region after realloc is contiguous. */
            memmove(buf1->data, buf1->data + buf1->offset, buf1->data_size);
            buf1->offset = 0;
        }
        buf1->alloc_size = buf1->data_size + buf2->data_size;
        /* NOTE(review): realloc result is unchecked; on OOM the memcpy
         * below would dereference NULL.  This matches the allocation
         * style used elsewhere in this file. */
        buf1->data = realloc(buf1->data, buf1->alloc_size);
        memcpy(buf1->data + buf1->data_size, buf2->data + buf2->offset,
               buf2->data_size);
        buf1->data_size = buf1->alloc_size;
        free_buffer(buf2);
        return buf1;
    }
}
96
97 /* Invalidate the first "bytes" bytes of the buffer, by adjusting the
98    offset and data size. */
99 static void consume_buffer(queue_buffer_t* buf, int bytes) {
100     buf->offset += bytes;
101     buf->data_size -= bytes;
102 }
103
104 /* Looks at the buffer to see how much free space it has. If it has more than
105  * twice the data size of unused space at the end, or more than four times
106  * the data size of unused space at the beginning, then that space is
107  * reclaimed. */
108 static void heatshrink_buffer(queue_buffer_t *buf) {
109     if (buf == NULL)
110         return;
111
112     if (G_UNLIKELY(buf->data_size * 4 > buf->offset)) {
113         /* Consolodate with memmove. We will reclaim the space in the next
114          * step. */
115         memmove(buf->data, buf->data + buf->offset, buf->data_size);
116         buf->offset = 0;
117     } 
118
119     if (buf->alloc_size > buf->data_size*2 + buf->offset) {
120         buf->alloc_size = buf->data_size + buf->offset;
121         buf->data = realloc(buf->data, buf->alloc_size);
122     }
123 }
124
/* Thread body for the producing half of the queue.
 *
 * Repeatedly asks data->producer to fill a buffer, then hands each
 * filled buffer to the consumer via data->data_queue.  Returns
 * GINT_TO_POINTER(TRUE) on clean completion (consumer signalled done,
 * or the producer reported PRODUCER_FINISHED), FALSE on producer
 * error. */
static gpointer do_producer_thread(gpointer datap) {
    queue_data_t* data = datap;

    for (;;) {
        queue_buffer_t *buf;
        gboolean result;

        /* Block until some of the memory budget is available; the
         * consumer credits this semaphore as it drains buffers. */
        semaphore_decrement(data->free_memory, 0);
        buf = g_async_queue_try_pop(data->free_queue);
        if (buf != NULL && buf->data == NULL) {
            /* A dataless buffer on the free queue is the signal that
             * the consumer is finished; then so are we. */
            amfree(buf);
            return GINT_TO_POINTER(TRUE);
        }

        if (buf == NULL) {
            /* Nothing to recycle; make a new buffer. */
            buf = invent_buffer();
        }
        buf->offset = 0;
        buf->data_size = 0;

        result = data->producer(data->producer_user_data, buf,
                                data->block_size);

        /* Producers can allocate way too much memory; trim it back. */
        heatshrink_buffer(buf);

        if (buf->data_size > 0) {
            /* Debit the memory budget for the buffer now in flight. */
            semaphore_force_adjust(data->free_memory, -buf->alloc_size);
            
            g_async_queue_push(data->data_queue, buf);
            buf = NULL;
        } else {
            /* An empty buffer is only legal if the producer is done. */
            g_assert(result != PRODUCER_MORE);
            free_buffer(buf);
            buf = NULL;
        }


        if (result == PRODUCER_MORE) {
            continue;
        } else {
            /* We are finished (and the first to do so).  Push a
             * dataless buffer as an EOF marker for the consumer, and
             * force the memory semaphore very negative so nothing else
             * can proceed past it. */
            g_async_queue_push(data->data_queue, invent_buffer());
            semaphore_force_set(data->free_memory, INT_MIN);

            return GINT_TO_POINTER(result == PRODUCER_FINISHED);
        }
    }
}
175
/* Thread body for the consuming half of the queue.
 *
 * Accumulates buffers popped from data->data_queue until at least
 * data->block_size bytes are on hand (merging partial buffers), then
 * feeds the data to data->consumer.  Returns GINT_TO_POINTER(TRUE) on
 * clean completion, FALSE if the consumer reports an error. */
static gpointer do_consumer_thread(gpointer datap) {
    queue_data_t* data = datap;
    gboolean finished = FALSE;
    queue_buffer_t *buf = NULL;

    if (data->streaming_mode != STREAMING_REQUIREMENT_NONE) {
        /* Let the producer fill the entire memory budget before we
         * consume anything. */
        semaphore_wait_empty(data->free_memory);
    }

    for (;;) {
        /* NOTE(review): 'result' holds the consumer's byte count, not
         * a boolean; gboolean is a gint in GLib, so this works, but an
         * int would be clearer. */
        gboolean result;

        if (finished) {
            return GINT_TO_POINTER(TRUE);
        }

        /* Collect data until we have at least one full block, or the
         * producer signals EOF. */
        while (buf == NULL || buf->data_size < data->block_size) {
            queue_buffer_t *next_buf;
            if (data->streaming_mode == STREAMING_REQUIREMENT_DESIRED) {
                /* Poll the data queue; when it is empty, wait for the
                 * producer to refill the whole memory budget rather
                 * than blocking on the queue itself. */
                do {
                    next_buf = g_async_queue_try_pop(data->data_queue);
                    if (next_buf == NULL) {
                        semaphore_wait_empty(data->free_memory);
                    }
                } while (next_buf == NULL);
            } else {
                next_buf = g_async_queue_pop(data->data_queue);
                g_assert(next_buf != NULL);
            }

            if (next_buf->data == NULL) {
                /* A dataless buffer is the producer's EOF marker:
                 * producer is finished, then so are we.*/
                free_buffer(next_buf);
                if (buf != NULL) {
                    /* But we can't quit yet, we have a buffer to flush.*/
                    finished = TRUE;
                    break;
                } else {
                    /* We are so outta here. */
                    return GINT_TO_POINTER(TRUE);
                }            
            }

            /* Credit the memory budget for the buffer we now own. */
            semaphore_increment(data->free_memory, next_buf->alloc_size);
            
            buf = merge_buffers(buf, next_buf);
        }

        result = data->consumer(data->consumer_user_data, buf);

        if (result > 0) {
            /* The consumer handled 'result' bytes; drop them, and
             * recycle the buffer to the producer once it is empty. */
            consume_buffer(buf, result);
            if (buf->data_size == 0) {
                g_async_queue_push(data->free_queue, buf);
                buf = NULL;
            }
            continue;
        } else {
            /* Consumer error; abandon the remaining data. */
            free_buffer(buf);
            return GINT_TO_POINTER(FALSE);
        }
    }
}
239
240 /* Empties a buffer queue and frees all the buffers associated with it.
241  *
242  * If full_cleanup is TRUE, then we delete the queue itself.
243  * If full_cleanup is FALSE, then we leave the queue around, with a
244  *         signal element in it. */
245 static void cleanup_buffer_queue(GAsyncQueue *Q, gboolean full_cleanup) {
246     g_async_queue_lock(Q);
247     for (;;) {
248         queue_buffer_t *buftmp;
249         buftmp = g_async_queue_try_pop_unlocked(Q);
250         if (buftmp == NULL)
251             break;
252
253         free_buffer(buftmp);
254     }
255     if (!full_cleanup)
256         g_async_queue_push_unlocked(Q, invent_buffer());
257
258     g_async_queue_unlock(Q);
259     
260     if (full_cleanup)
261         g_async_queue_unref(Q);
262 }
263
/* Single-threaded fallback for do_consumer_producer_queue_full().
 * This function sacrifices performance, but will still work just
 * fine, on systems where threads are not supported.
 *
 * Returns a bitwise OR of QUEUE_PRODUCER_ERROR / QUEUE_CONSUMER_ERROR
 * flags; 0 means success on both sides. */
static queue_result_flags
do_unthreaded_consumer_producer_queue(guint block_size,
                                      ProducerFunctor producer,
                                      gpointer producer_user_data,
                                      ConsumerFunctor consumer,
                                      gpointer consumer_user_data) {
    queue_buffer_t *buf = NULL, *next_buf = NULL;
    gboolean finished = FALSE;
    queue_result_flags rval = 0;

    /* The basic theory of operation here is to read until we have
       enough data to write, then write until we don't.. */
    while (!finished) {
        int result;
        
        /* Produce until at least one full block is buffered, or the
         * producer finishes or fails. */
        while ((buf == NULL || buf->data_size < block_size) && !finished) {
            if (next_buf == NULL)
                next_buf = invent_buffer();

            result = producer(producer_user_data, next_buf, block_size);

            if (result != PRODUCER_MORE) {
                finished = TRUE;
                if (result != PRODUCER_FINISHED) {
                    rval |= QUEUE_PRODUCER_ERROR;
                }
            }

            buf = merge_buffers(buf, next_buf);
            next_buf = NULL;
        }

        /* Consume full blocks; on the final pass ('finished'), flush
         * any remaining partial block as well. */
        while (buf != NULL && buf->data_size > 0 &&
               (buf->data_size >= block_size || finished)) {
            result = consumer(consumer_user_data, buf);
            
            if (result > 0) {
                consume_buffer(buf, result);
                if (buf->data_size == 0) {
                    /* Recycle the drained buffer for the next read. */
                    next_buf = buf;
                    buf = NULL;
                }
            } else {
                finished = TRUE;
                rval |= QUEUE_CONSUMER_ERROR;
                break;
            }
        }
    }

    free_buffer(buf);
    free_buffer(next_buf);
    return rval;
}
320
321 gboolean do_consumer_producer_queue(ProducerFunctor producer,
322                                     gpointer producer_user_data,
323                                     ConsumerFunctor consumer,
324                                     gpointer consumer_user_data) {
325     return QUEUE_SUCCESS ==
326         do_consumer_producer_queue_full(producer, producer_user_data,
327                                         consumer, consumer_user_data,
328                                         0, DEFAULT_MAX_BUFFER_MEMORY,
329                                         STREAMING_REQUIREMENT_NONE);
330 }
331
/* Run a producer and a consumer against a bounded-memory buffer queue,
 * one on each thread (or single-threaded if GLib threads are absent).
 *
 * producer/consumer: callbacks; must be non-NULL.
 * block_size: preferred consumer granularity; <= 0 selects
 *             DISK_BLOCK_BYTES.
 * max_memory: cap on buffered data, clamped to [1, INT_MAX/2].
 * streaming_mode: see StreamingRequirement.
 *
 * Returns a bitwise OR of QUEUE_PRODUCER_ERROR and/or
 * QUEUE_CONSUMER_ERROR; 0 (QUEUE_SUCCESS) if both sides succeeded.
 *
 * NOTE(review): g_return_val_if_fail(..., FALSE) returns 0, which
 * equals QUEUE_SUCCESS — a NULL functor is thus reported as success.
 * Confirm whether callers depend on that before changing it. */
queue_result_flags
do_consumer_producer_queue_full(ProducerFunctor producer,
                                gpointer producer_user_data,
                                ConsumerFunctor consumer,
                                gpointer consumer_user_data,
                                int block_size,
                                size_t max_memory,
                                StreamingRequirement streaming_mode) {
    GThread     * producer_thread;
    GThread     * consumer_thread;
    queue_data_t  queue_data;
    gpointer      producer_result;
    gpointer      consumer_result;
    queue_result_flags rval;

    if (block_size <= 0) {
        block_size = DISK_BLOCK_BYTES;
    }

    g_return_val_if_fail(producer != NULL, FALSE);
    g_return_val_if_fail(consumer != NULL, FALSE);

    if (!g_thread_supported()) {
        /* No thread support: fall back to the serial implementation. */
        return do_unthreaded_consumer_producer_queue(block_size, producer,
                                                     producer_user_data,
                                                     consumer,
                                                     consumer_user_data);
    }

    queue_data.block_size = block_size;
    queue_data.producer = producer;
    queue_data.producer_user_data = producer_user_data;
    queue_data.consumer = consumer;
    queue_data.consumer_user_data = consumer_user_data;
    queue_data.streaming_mode = streaming_mode;

    queue_data.data_queue = g_async_queue_new();
    queue_data.free_queue = g_async_queue_new();

    /* Clamp the budget so semaphore arithmetic on ints cannot
     * overflow. */
    max_memory = MAX(1,MIN(max_memory, INT_MAX / 2));
    queue_data.free_memory = semaphore_new_with_value(max_memory);

    producer_thread = g_thread_create(do_producer_thread, &queue_data,
                                      TRUE,
                                      NULL /* FIXME: Should handle
                                              errors. */);
    consumer_thread = g_thread_create(do_consumer_thread, &queue_data,
                                      TRUE,
                                      NULL /* FIXME: Should handle
                                              errors. */);
    
    /* The order of cleanup here is very important, to avoid deadlock. */
    /* 1) Reap the consumer. */
    consumer_result = g_thread_join(consumer_thread);
    /* 2) Stop the producer. */
    semaphore_force_set(queue_data.free_memory, -1);
    /* 3) Cleanup the free queue; add a signal flag. */
    cleanup_buffer_queue(queue_data.free_queue, FALSE);
    /* 4) Restart the producer (so it can exit). */
    semaphore_force_set(queue_data.free_memory, INT_MAX);
    /* 5) Reap the producer. */
    producer_result = g_thread_join(producer_thread);

    cleanup_buffer_queue(queue_data.free_queue, TRUE);
    cleanup_buffer_queue(queue_data.data_queue, TRUE);

    semaphore_free(queue_data.free_memory);
    
    /* Fold each thread's boolean result into the error flags. */
    rval = 0;
    if (!GPOINTER_TO_INT(producer_result)) {
        rval |= QUEUE_PRODUCER_ERROR;
    }
    if (!GPOINTER_TO_INT(consumer_result)) {
        rval |= QUEUE_CONSUMER_ERROR;
    }
    return rval;
}
409
410 /* Commonly-useful producers and consumers below. */
411
412 producer_result_t device_read_producer(gpointer devicep,
413                                        queue_buffer_t *buffer,
414                                        int hint_size G_GNUC_UNUSED) {
415     Device* device;
416
417     device = (Device*) devicep;
418     g_assert(IS_DEVICE(device));
419
420     buffer->offset = 0;
421     for (;;) {
422         int result, read_size;
423         read_size = buffer->alloc_size;
424         result = device_read_block(device, buffer->data, &read_size);
425         if (result > 0) {
426             buffer->data_size = read_size;
427             return PRODUCER_MORE;
428         } else if (result == 0) {
429             buffer->data = realloc(buffer->data, read_size);
430             buffer->alloc_size = read_size;
431         } else if (device->is_eof) {
432             return PRODUCER_FINISHED;
433         } else {
434             buffer->data_size = 0;
435             return PRODUCER_ERROR;
436         }
437     }
438 }
439
440 int device_write_consumer(gpointer devicep, queue_buffer_t *buffer) {
441     Device* device;
442     unsigned int write_size;
443     static gboolean wrote_blocksize = FALSE;
444
445     device = (Device*) devicep;
446     g_assert(IS_DEVICE(device));
447     write_size = MIN(buffer->data_size,
448                      device_write_max_size(device));
449
450     if (!wrote_blocksize) {
451         wrote_blocksize = TRUE;
452         dbprintf("USING BLOCKSIZE %d bytes\n", write_size);
453     }
454
455     if (device_write_block(device, write_size,
456                            buffer->data + buffer->offset,
457                            buffer->data_size <
458                                device_write_min_size(device))) {
459         /* Success! */
460         return write_size;
461     } else {
462         /* Nope, really an error. */
463         return -1;
464     }
465 }
466
467 producer_result_t fd_read_producer(gpointer fdp, queue_buffer_t *buffer,
468                                    int hint_size) {
469     int fd;
470
471     fd = GPOINTER_TO_INT(fdp);
472     g_assert(fd >= 0);
473     g_assert(buffer->data_size == 0);
474
475     buffer->offset = 0;
476
477     if (buffer->data == NULL) {
478         /* Set up the buffer. */
479         buffer->data = malloc(hint_size);
480         buffer->alloc_size = hint_size;
481     }
482
483     for (;;) {
484         int result;
485         result = read(fd, buffer->data, buffer->alloc_size);
486
487         if (result > 0) {
488             buffer->data_size = result;
489             return PRODUCER_MORE;
490         } else if (result == 0) {
491             /* End of file. */
492             return PRODUCER_FINISHED;
493         } else if (0
494 #ifdef EAGAIN
495                 || errno == EAGAIN
496 #endif
497 #ifdef EWOULDBLOCK
498                 || errno == EWOULDBLOCK
499 #endif
500 #ifdef EINTR
501                 || errno == EINTR
502 #endif
503                 ) {
504                 /* Try again. */
505                 continue;
506         } else {
507             /* Error occured. */
508             g_fprintf(stderr, "Error reading fd %d: %s\n", fd, strerror(errno));
509             return PRODUCER_ERROR;
510         }
511     }
512 }
513
514 int fd_write_consumer(gpointer fdp, queue_buffer_t *buffer) {
515     int fd;
516
517     fd = GPOINTER_TO_INT(fdp);
518     g_assert(fd >= 0);
519
520     g_return_val_if_fail(buffer->data_size > 0, 0);
521
522     for (;;) {
523         int write_size;
524         write_size = write(fd, buffer->data + buffer->offset,
525                            buffer->data_size);
526         
527         if (write_size > 0) {
528             return write_size;
529         } else if (0
530 #ifdef EAGAIN
531                 || errno == EAGAIN
532 #endif
533 #ifdef EWOULDBLOCK
534                 || errno == EWOULDBLOCK
535 #endif
536 #ifdef EINTR
537                 || errno == EINTR
538 #endif
539                 ) {
540                 /* Try again. */
541                 continue;
542         } else {
543             /* Error occured. */
544             g_fprintf(stderr, "Error writing fd %d: %s\n", fd, strerror(errno));
545             return -1;
546         }        
547     }
548 }