xfer-src/element-glue.c
/*
 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
 * Copyright (c) 2008 Zmanda Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
 */

#include "amxfer.h"
#include "element-glue.h"
#include "amanda.h"

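/*
 * XferElementGlue adapts mismatched transfer mechanisms between adjacent
 * elements: depending on the (input_mech, output_mech) pair, setup_impl
 * creates a pipe, a ring buffer, a worker thread, or some combination of
 * the three, as implemented below.
 */
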
static GObjectClass *parent_class = NULL;

/*
 * Utility functions, etc.
 */

static void
make_pipe(
    XferElementGlue *self)
{
    if (pipe(self->pipe) < 0)
        g_critical(_("Could not create pipe: %s"), strerror(errno));
}

static void
send_xfer_done(
    XferElementGlue *self)
{
    xfer_queue_message(XFER_ELEMENT(self)->xfer,
            xmsg_new((XferElement *)self, XMSG_DONE, 0));
}

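/* size of the temporary copy buffer used on the fd-based paths, and the
 * number of slots in the ring used for the push/pull path */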
#define GLUE_BUFFER_SIZE 32768
#define GLUE_RING_BUFFER_SIZE 32

/*
 * Worker threads
 *
 * At most one of these runs in a given instance, as selected in setup_impl
 */

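/* Pull buffers from a PULL_BUFFER upstream and write them to a file
 * descriptor (either our own pipe or downstream's input_fd). */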
static gpointer
call_and_write_thread(
    gpointer data)
{
    XferElement *elt = XFER_ELEMENT(data);
    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
    int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
    int fd = *fdp;

    while (!elt->cancelled) {
        size_t len;
        char *buf;

        /* get a buffer from upstream */
        buf = xfer_element_pull_buffer(elt->upstream, &len);
        if (!buf)
            break;

        /* write it */
        if (full_write(fd, buf, len) < len) {
            xfer_element_handle_error(elt,
                _("Error writing to fd %d: %s"), fd, strerror(errno));
            amfree(buf);
            break;
        }

        amfree(buf);
    }

    if (elt->cancelled && elt->expect_eof)
        xfer_element_drain_by_pulling(elt->upstream);

    /* close the fd we've been writing to, as an EOF signal to downstream, and
     * set it to -1 to avoid accidental re-use */
    close(fd);
    *fdp = -1;

    send_xfer_done(self);

    return NULL;
}

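/* Copy data from upstream's output_fd to downstream's input_fd through a
 * temporary buffer. */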
static gpointer
read_and_write_thread(
    gpointer data)
{
    XferElement *elt = XFER_ELEMENT(data);
    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
    int rfd = elt->upstream->output_fd;
    int wfd = elt->downstream->input_fd;

    /* dynamically allocate a buffer, in case this thread has
     * a limited amount of stack allocated */
    char *buf = g_malloc(GLUE_BUFFER_SIZE);

    while (!elt->cancelled) {
        size_t len;

        /* read from upstream; clear errno first so a stale value can't make
         * a short read at EOF look like an error */
        errno = 0;
        len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
        if (len < GLUE_BUFFER_SIZE) {
            if (errno) {
                xfer_element_handle_error(elt,
                    _("Error reading from fd %d: %s"), rfd, strerror(errno));
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                break;
            }
        }

        /* write the buffer fully */
        if (full_write(wfd, buf, len) < len) {
            xfer_element_handle_error(elt,
                _("Could not write to fd %d: %s"), wfd, strerror(errno));
            break;
        }
    }

    if (elt->cancelled && elt->expect_eof)
        xfer_element_drain_by_reading(rfd);

    /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
     * re-use */
    if (!elt->cancelled || elt->expect_eof) {
        close(rfd);
        elt->upstream->output_fd = -1;
    }

    /* close the fd we've been writing to, as an EOF signal to downstream, and
     * set it to -1 to avoid accidental re-use */
    close(wfd);
    elt->downstream->input_fd = -1;

    send_xfer_done(self);

    amfree(buf);
    return NULL;
}

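/* Read from a file descriptor (either our own pipe or upstream's output_fd)
 * and push the resulting buffers to a PUSH_BUFFER downstream. */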
static gpointer
read_and_call_thread(
    gpointer data)
{
    XferElement *elt = XFER_ELEMENT(data);
    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
    int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
    int fd = *fdp;

    while (!elt->cancelled) {
        char *buf = g_malloc(GLUE_BUFFER_SIZE);
        size_t len;

        /* read a buffer from upstream; clear errno first so a stale value
         * can't make a short read at EOF look like an error */
        errno = 0;
        len = full_read(fd, buf, GLUE_BUFFER_SIZE);
        if (len < GLUE_BUFFER_SIZE) {
            if (errno) {
                xfer_element_handle_error(elt,
                    _("Error reading from fd %d: %s"), fd, strerror(errno));
                amfree(buf);
                break;
            } else if (len == 0) { /* we only count a zero-length read as EOF */
                amfree(buf);
                break;
            }
        }

        xfer_element_push_buffer(elt->downstream, buf, len);
    }

    if (elt->cancelled && elt->expect_eof)
        xfer_element_drain_by_reading(fd);

    /* send an EOF indication downstream */
    xfer_element_push_buffer(elt->downstream, NULL, 0);

    /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
     * re-use */
    close(fd);
    *fdp = -1;

    send_xfer_done(self);

    return NULL;
}

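/* Pull buffers from a PULL_BUFFER upstream and push them directly to a
 * PUSH_BUFFER downstream. */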
static gpointer
call_and_call_thread(
    gpointer data)
{
    XferElement *elt = XFER_ELEMENT(data);
    XferElementGlue *self = XFER_ELEMENT_GLUE(data);
    gboolean eof_sent = FALSE;

    /* TODO: consider breaking this into two cooperating threads: one to pull
     * buffers from upstream and one to push them downstream.  This would gain
     * parallelism at the cost of a lot of synchronization operations. */

    while (!elt->cancelled) {
        char *buf;
        size_t len;

        /* get a buffer from upstream */
        buf = xfer_element_pull_buffer(elt->upstream, &len);

        /* and push it downstream */
        xfer_element_push_buffer(elt->downstream, buf, len);

        if (!buf) {
            eof_sent = TRUE;
            break;
        }
    }

    if (elt->cancelled && elt->expect_eof)
        xfer_element_drain_by_pulling(elt->upstream);

    if (!eof_sent)
        xfer_element_push_buffer(elt->downstream, NULL, 0);

    send_xfer_done(self);

    return NULL;
}

/*
 * Implementation
 */

static void
setup_impl(
    XferElement *elt)
{
    XferElementGlue *self = (XferElementGlue *)elt;

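    /* Choose the glue strategy for this (input_mech, output_mech) pair:
     * create a pipe, allocate the push/pull ring, and/or pick the worker
     * thread that start_impl will launch. */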
    switch (elt->input_mech) {
    case XFER_MECH_READFD:
        switch (elt->output_mech) {
        case XFER_MECH_READFD:
            g_assert_not_reached(); /* no glue needed */
            break;

        case XFER_MECH_WRITEFD:
            self->threadfunc = read_and_write_thread;
            break;

        case XFER_MECH_PUSH_BUFFER:
            self->threadfunc = read_and_call_thread;
            break;

        case XFER_MECH_PULL_BUFFER:
            break;

        case XFER_MECH_NONE:
            g_assert_not_reached();
            break;
        }
        break;

    case XFER_MECH_WRITEFD:
        make_pipe(self);
        elt->input_fd = self->pipe[1];
        self->pipe[1] = -1; /* upstream will close this for us */

        switch (elt->output_mech) {
        case XFER_MECH_READFD:
            elt->output_fd = self->pipe[0];
            self->pipe[0] = -1; /* downstream will close this for us */
            break;

        case XFER_MECH_WRITEFD:
            g_assert_not_reached(); /* no glue needed */
            break;

        case XFER_MECH_PUSH_BUFFER:
            self->threadfunc = read_and_call_thread;
            break;

        case XFER_MECH_PULL_BUFFER:
            break;

        case XFER_MECH_NONE:
            g_assert_not_reached();
            break;
        }
        break;

    case XFER_MECH_PUSH_BUFFER:
        switch (elt->output_mech) {
        case XFER_MECH_READFD:
            make_pipe(self);
            elt->output_fd = self->pipe[0];
            self->pipe[0] = -1; /* downstream will close this for us */
            break;

        case XFER_MECH_WRITEFD:
            break;

        case XFER_MECH_PUSH_BUFFER:
            g_assert_not_reached(); /* no glue needed */
            break;

        case XFER_MECH_PULL_BUFFER:
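            /* PUSH -> PULL: decouple the two sides with a fixed-size ring of
             * buffer pointers.  push_buffer_impl fills slots at ring_head and
             * pull_buffer_impl drains them at ring_tail; ring_free_sem counts
             * empty slots and ring_used_sem counts filled ones. */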
            self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
            self->ring_used_sem = semaphore_new_with_value(0);
            self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
            break;

        case XFER_MECH_NONE:
            g_assert_not_reached();
            break;
        }
        break;

    case XFER_MECH_PULL_BUFFER:
        switch (elt->output_mech) {
        case XFER_MECH_READFD:
            make_pipe(self);
            elt->output_fd = self->pipe[0];
            self->pipe[0] = -1; /* downstream will close this for us */
            self->threadfunc = call_and_write_thread;
            break;

        case XFER_MECH_WRITEFD:
            self->threadfunc = call_and_write_thread;
            break;

        case XFER_MECH_PUSH_BUFFER:
            self->threadfunc = call_and_call_thread;
            break;

        case XFER_MECH_PULL_BUFFER:
            g_assert_not_reached(); /* no glue needed */
            break;

        case XFER_MECH_NONE:
            g_assert_not_reached();
            break;
        }
        break;

    case XFER_MECH_NONE:
        g_assert_not_reached();
        break;
    }
}

static gboolean
start_impl(
    XferElement *elt)
{
    XferElementGlue *self = (XferElementGlue *)elt;

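    /* launch the worker thread chosen by setup_impl, if any; it is created
     * non-joinable and signals completion by queueing XMSG_DONE via
     * send_xfer_done() */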
    if (self->threadfunc) {
        self->thread = g_thread_create(self->threadfunc, (gpointer)self, FALSE, NULL);
    }

    /* we're active if we have a thread that will eventually die */
    return self->threadfunc? TRUE : FALSE;
}

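/* Called by downstream to pull a buffer: either hand over the next slot from
 * the ring (PUSH -> PULL glue) or read a buffer's worth of data from the
 * upstream fd or our own pipe. */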
static gpointer
pull_buffer_impl(
    XferElement *elt,
    size_t *size)
{
    XferElementGlue *self = XFER_ELEMENT_GLUE(elt);

    if (self->ring) {
        gpointer buf;

        if (elt->cancelled) {
            /* The finalize method will empty the ring buffer */
            *size = 0;
            return NULL;
        }

        /* make sure there's at least one element available */
        semaphore_down(self->ring_used_sem);

        /* get it */
        buf = self->ring[self->ring_tail].buf;
        *size = self->ring[self->ring_tail].size;
        self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;

        /* and mark this element as free to be overwritten */
        semaphore_up(self->ring_free_sem);

        return buf;
    } else {
        int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
        int fd = *fdp;
        char *buf = g_malloc(GLUE_BUFFER_SIZE);
        ssize_t len;

        if (elt->cancelled) {
            if (elt->expect_eof)
                xfer_element_drain_by_reading(fd);

            close(fd);
            *fdp = -1;

            amfree(buf);
            *size = 0;
            return NULL;
        }

        /* read from upstream; clear errno first so a stale value can't make
         * a short read at EOF look like an error */
        errno = 0;
        len = full_read(fd, buf, GLUE_BUFFER_SIZE);
        if (len < GLUE_BUFFER_SIZE) {
            if (errno) {
                xfer_element_handle_error(elt,
                    _("Error reading from fd %d: %s"), fd, strerror(errno));

                /* return an EOF */
                amfree(buf);
                len = 0;

                /* and finish off the upstream */
                if (elt->expect_eof) {
                    xfer_element_drain_by_reading(fd);
                }
                close(fd);
                *fdp = -1;
            } else if (len == 0) {
                /* EOF */
                g_free(buf);
                buf = NULL;
                *size = 0;

                /* signal EOF to downstream */
                close(fd);
                *fdp = -1;
            }
        }

        *size = (size_t)len;
        return buf;
    }
}

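/* Called by upstream to push a buffer: either queue it in the ring
 * (PUSH -> PULL glue) or write it to the downstream fd or our own pipe;
 * a NULL buffer signals EOF. */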
static void
push_buffer_impl(
    XferElement *elt,
    gpointer buf,
    size_t len)
{
    XferElementGlue *self = (XferElementGlue *)elt;

    if (self->ring) {
        /* just drop packets if the transfer has been cancelled */
        if (elt->cancelled) {
            amfree(buf);
            return;
        }

        /* make sure there's at least one element free */
        semaphore_down(self->ring_free_sem);

        /* set it */
        self->ring[self->ring_head].buf = buf;
        self->ring[self->ring_head].size = len;
        self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;

        /* and mark this element as available for reading */
        semaphore_up(self->ring_used_sem);

        return;
    } else {
        int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
        int fd = *fdp;

        if (elt->cancelled) {
            if (!elt->expect_eof || !buf) {
                close(fd);
                *fdp = -1;

                /* hack to ensure we won't close the fd again, if we get another push */
                elt->expect_eof = TRUE;
            }

            amfree(buf);

            return;
        }

        /* write the full buffer to the fd, or close on EOF */
        if (buf) {
            if (full_write(fd, buf, len) < len) {
                xfer_element_handle_error(elt,
                    _("Error writing to fd %d: %s"), fd, strerror(errno));
                /* nothing special to do to handle the cancellation */
            }
            amfree(buf);
        } else {
            close(fd);
            *fdp = -1;
        }

        return;
    }
}

static void
instance_init(
    XferElementGlue *self)
{
    XferElement *elt = (XferElement *)self;
    elt->can_generate_eof = TRUE;
    self->pipe[0] = self->pipe[1] = -1;
}

static void
finalize_impl(
    GObject * obj_self)
{
    XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);

    /* close our pipes if they're still open (they shouldn't be!) */
    if (self->pipe[0] != -1) close(self->pipe[0]);
    if (self->pipe[1] != -1) close(self->pipe[1]);

    if (self->ring) {
        /* empty the ring buffer, ignoring synchronization issues */
        while (self->ring_used_sem->value) {
            if (self->ring[self->ring_tail].buf)
                amfree(self->ring[self->ring_tail].buf);
            self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
        }

        amfree(self->ring);
        semaphore_free(self->ring_used_sem);
        semaphore_free(self->ring_free_sem);
    }

    /* chain up */
    G_OBJECT_CLASS(parent_class)->finalize(obj_self);
}

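/* Table of supported (input mech, output mech) conversions.  For each pair,
 * the third value is a rough relative cost (roughly, how many times the glue
 * reads or writes each byte) and the fourth is the number of worker threads
 * started, matching the cases in setup_impl that set a threadfunc; see
 * xfer_element_mech_pair_t for the field definitions. */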
static xfer_element_mech_pair_t _pairs[] = {
    { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
    { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
    { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */

    { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
    { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call */
    { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */

    { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
    { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
    { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */

    { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
    { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
    { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */

    /* terminator */
    { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0 },
};
xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;

static void
class_init(
    XferElementGlueClass * selfc)
{
    XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
    GObjectClass *goc = G_OBJECT_CLASS(selfc);

    klass->setup = setup_impl;
    klass->start = start_impl;
    klass->push_buffer = push_buffer_impl;
    klass->pull_buffer = pull_buffer_impl;

    klass->perl_class = "Amanda::Xfer::Element::Glue";
    klass->mech_pairs = xfer_element_glue_mech_pairs;

    goc->finalize = finalize_impl;

    parent_class = g_type_class_peek_parent(selfc);
}

GType
xfer_element_glue_get_type (void)
{
    static GType type = 0;

    if G_UNLIKELY(type == 0) {
        static const GTypeInfo info = {
            sizeof (XferElementGlueClass),
            (GBaseInitFunc) NULL,
            (GBaseFinalizeFunc) NULL,
            (GClassInitFunc) class_init,
            (GClassFinalizeFunc) NULL,
            NULL /* class_data */,
            sizeof (XferElementGlue),
            0 /* n_preallocs */,
            (GInstanceInitFunc) instance_init,
            NULL
        };

        type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
    }

    return type;
}

/* create an element of this class; prototype is in xfer-element.h */
XferElement *
xfer_element_glue(void)
{
    XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
    XferElement *elt = XFER_ELEMENT(self);

    return elt;
}