2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008 Zmanda Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
21 #include "element-glue.h"
/* Parent class pointer: assigned in class_init from
 * g_type_class_peek_parent() and used by the finalizer to chain up to the
 * parent's finalize method. */
24 static GObjectClass *parent_class = NULL;
27 * Utility functions, etc.
/* Create a pipe and store its two descriptors in self->pipe.  If pipe()
 * fails, the errno text is reported through g_critical(); no other
 * recovery is visible in these lines. */
32 XferElementGlue *self)
34 if (pipe(self->pipe) < 0)
35 g_critical(_("Could not create pipe: %s"), strerror(errno));
/* Queue an XMSG_DONE message, with this glue element as its source, on the
 * transfer that the element belongs to (XFER_ELEMENT(self)->xfer). */
40 XferElementGlue *self)
42 xfer_queue_message(XFER_ELEMENT(self)->xfer,
43 xmsg_new((XferElement *)self, XMSG_DONE, 0));
/* Size in bytes of every buffer the glue allocates with g_malloc() and the
 * amount requested from each full_read() call. */
46 #define GLUE_BUFFER_SIZE 32768
/* Number of slots in the ring used for the push-buffer -> pull-buffer case:
 * it sizes the ring allocation, is the initial value of ring_free_sem, and
 * is the modulus applied to ring_head and ring_tail. */
47 #define GLUE_RING_BUFFER_SIZE 32
52 * At most one of these runs in a given instance, as selected in setup_impl
/* Thread function for glue that pulls buffers from upstream and writes them
 * to a file descriptor.  `data` is the glue element itself.
 *
 * The descriptor is reached through fdp: the downstream element's input_fd
 * when self->pipe[1] is -1, otherwise self->pipe[1].  While the element is
 * not cancelled, each buffer returned by xfer_element_pull_buffer() is
 * written with full_write(); a short write is reported through
 * xfer_element_handle_error().  After the loop, a cancelled element with
 * expect_eof set drains upstream by pulling.
 *
 * NOTE(review): the declarations of buf/len/fd, the end-of-stream check,
 * the buffer release and the close announced by the final comment are not
 * among the lines shown here -- confirm them against the complete source. */
56 call_and_write_thread(
59 XferElement *elt = XFER_ELEMENT(data);
60 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
61 int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
64 while (!elt->cancelled) {
68 /* get a buffer from upstream */
69 buf = xfer_element_pull_buffer(elt->upstream, &len);
74 if (full_write(fd, buf, len) < len) {
75 xfer_element_handle_error(elt,
76 _("Error writing to fd %d: %s"), fd, strerror(errno));
84 if (elt->cancelled && elt->expect_eof)
85 xfer_element_drain_by_pulling(elt->upstream);
87 /* close the fd we've been writing, as an EOF signal to downstream, and
88 * set it to -1 to avoid accidental re-use */
/* Thread function for glue that copies bytes from upstream's output_fd to
 * downstream's input_fd.  `data` is the glue element itself.
 *
 * A single GLUE_BUFFER_SIZE buffer is allocated on the heap.  While the
 * element is not cancelled, full_read() fills it from rfd; a short read is
 * examined for an error (reported via xfer_element_handle_error()) and a
 * zero-length read is treated as EOF.  The data is then written to wfd with
 * full_write(), and a short write is reported as an error.  Afterwards the
 * upstream and downstream fd fields are set to -1 and XMSG_DONE is sent.
 *
 * NOTE(review): on cancellation with expect_eof this function drains
 * upstream with xfer_element_drain_by_pulling(), although the data arrives
 * through a file descriptor (rfd); the sibling read_and_call_thread uses
 * xfer_element_drain_by_reading(fd) in the same situation.  Confirm which
 * call is intended.
 * NOTE(review): the error test inside the short-read branch, the loop
 * exits, the close() calls and the release of buf are not among the lines
 * shown here -- confirm them against the complete source. */
98 read_and_write_thread(
101 XferElement *elt = XFER_ELEMENT(data);
102 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
103 int rfd = elt->upstream->output_fd;
104 int wfd = elt->downstream->input_fd;
106 /* dynamically allocate a buffer, in case this thread has
107 * a limited amount of stack allocated */
108 char *buf = g_malloc(GLUE_BUFFER_SIZE);
110 while (!elt->cancelled) {
113 /* read from upstream */
114 len = full_read(rfd, buf, GLUE_BUFFER_SIZE);
115 if (len < GLUE_BUFFER_SIZE) {
117 xfer_element_handle_error(elt,
118 _("Error reading from fd %d: %s"), rfd, strerror(errno));
120 } else if (len == 0) { /* we only count a zero-length read as EOF */
125 /* write the buffer fully */
126 if (full_write(wfd, buf, len) < len) {
127 xfer_element_handle_error(elt,
128 _("Could not write to fd %d: %s"), wfd, strerror(errno));
133 if (elt->cancelled && elt->expect_eof)
134 xfer_element_drain_by_pulling(elt->upstream);
136 /* close the read fd, if it's at EOF, and set it to -1 to avoid accidental
138 if (!elt->cancelled || elt->expect_eof) {
140 elt->upstream->output_fd = -1;
143 /* close the fd we've been writing, as an EOF signal to downstream, and
144 * set it to -1 to avoid accidental re-use */
146 elt->downstream->input_fd = -1;
148 send_xfer_done(self);
/* Thread function for glue that reads from a file descriptor and pushes the
 * data downstream as buffers.  `data` is the glue element itself.
 *
 * The descriptor is reached through fdp: upstream's output_fd when
 * self->pipe[0] is -1, otherwise self->pipe[0].  Each loop iteration
 * allocates a fresh GLUE_BUFFER_SIZE buffer, fills it with full_read() and
 * hands it to xfer_element_push_buffer() together with the byte count
 * (presumably transferring ownership of the buffer downstream -- TODO
 * confirm).  A short read is examined for an error; a zero-length read is
 * treated as EOF.  After the loop, a cancelled element with expect_eof set
 * drains the descriptor by reading, a NULL/0 buffer is pushed downstream as
 * the EOF indication, and XMSG_DONE is sent.
 *
 * NOTE(review): the handling of buf on the error/EOF paths and the close of
 * the read fd are not among the lines shown here -- confirm them against
 * the complete source. */
155 read_and_call_thread(
158 XferElement *elt = XFER_ELEMENT(data);
159 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
160 int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
163 while (!elt->cancelled) {
164 char *buf = g_malloc(GLUE_BUFFER_SIZE);
167 /* read a buffer from upstream */
168 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
169 if (len < GLUE_BUFFER_SIZE) {
171 xfer_element_handle_error(elt,
172 _("Error reading from fd %d: %s"), fd, strerror(errno));
174 } else if (len == 0) { /* we only count a zero-length read as EOF */
180 xfer_element_push_buffer(elt->downstream, buf, len);
183 if (elt->cancelled && elt->expect_eof)
184 xfer_element_drain_by_reading(fd);
186 /* send an EOF indication downstream */
187 xfer_element_push_buffer(elt->downstream, NULL, 0);
189 /* close the read fd, since it's at EOF, and set it to -1 to avoid accidental
194 send_xfer_done(self);
200 call_and_call_thread(
203 XferElement *elt = XFER_ELEMENT(data);
204 XferElementGlue *self = XFER_ELEMENT_GLUE(data);
205 gboolean eof_sent = FALSE;
207 /* TODO: consider breaking this into two cooperating threads: one to pull
208 * buffers from upstream and one to push them downstream. This would gain
209 * parallelism at the cost of a lot of synchronization operations. */
/* Summary: this thread function relays buffers between a pull-buffer
 * upstream and a push-buffer downstream; `data` is the glue element itself.
 * While the element is not cancelled, each buffer returned by
 * xfer_element_pull_buffer() is passed, with its length, to
 * xfer_element_push_buffer().  After the loop, a cancelled element with
 * expect_eof set drains upstream by pulling, a NULL/0 buffer is pushed
 * downstream, and XMSG_DONE is sent.
 *
 * NOTE(review): eof_sent is initialised above, but the lines that set or
 * test it (and the loop exit at end of stream) are not shown here --
 * confirm how it guards the final NULL push. */
211 while (!elt->cancelled) {
215 /* get a buffer from upstream */
216 buf = xfer_element_pull_buffer(elt->upstream, &len);
218 /* and push it downstream */
219 xfer_element_push_buffer(elt->downstream, buf, len);
227 if (elt->cancelled && elt->expect_eof)
228 xfer_element_drain_by_pulling(elt->upstream);
231 xfer_element_push_buffer(elt->downstream, NULL, 0);
233 send_xfer_done(self);
/* Configure the glue for the (input_mech, output_mech) pair stored on the
 * element.  The outer switch is on elt->input_mech and each inner switch is
 * on elt->output_mech.  Depending on the pair, the visible lines:
 *   - choose the thread function stored in self->threadfunc
 *     (read_and_write_thread, read_and_call_thread, call_and_write_thread
 *     or call_and_call_thread);
 *   - publish one end of self->pipe as elt->input_fd or elt->output_fd and
 *     reset the local copy to -1, because the neighbouring element is
 *     expected to close it;
 *   - for push-buffer -> pull-buffer, allocate the ring of
 *     GLUE_RING_BUFFER_SIZE slots and its two semaphores (used = 0,
 *     free = GLUE_RING_BUFFER_SIZE).
 * Pairs with identical mechanisms, and unknown mechanisms, reach
 * g_assert_not_reached() because no glue is needed for them.
 *
 * NOTE(review): the statements that create the pipe before its ends are
 * handed out, and the break statements between cases, are not among the
 * lines shown here -- confirm them against the complete source. */
246 XferElementGlue *self = (XferElementGlue *)elt;
248 switch (elt->input_mech) {
249 case XFER_MECH_READFD:
250 switch (elt->output_mech) {
251 case XFER_MECH_READFD:
252 g_assert_not_reached(); /* no glue needed */
255 case XFER_MECH_WRITEFD:
256 self->threadfunc = read_and_write_thread;
259 case XFER_MECH_PUSH_BUFFER:
260 self->threadfunc = read_and_call_thread;
263 case XFER_MECH_PULL_BUFFER:
267 g_assert_not_reached();
272 case XFER_MECH_WRITEFD:
274 elt->input_fd = self->pipe[1];
275 self->pipe[1] = -1; /* upstream will close this for us */
277 switch (elt->output_mech) {
278 case XFER_MECH_READFD:
279 elt->output_fd = self->pipe[0];
280 self->pipe[0] = -1; /* downstream will close this for us */
283 case XFER_MECH_WRITEFD:
284 g_assert_not_reached(); /* no glue needed */
287 case XFER_MECH_PUSH_BUFFER:
288 self->threadfunc = read_and_call_thread;
291 case XFER_MECH_PULL_BUFFER:
295 g_assert_not_reached();
300 case XFER_MECH_PUSH_BUFFER:
301 switch (elt->output_mech) {
302 case XFER_MECH_READFD:
304 elt->output_fd = self->pipe[0];
305 self->pipe[0] = -1; /* downstream will close this for us */
308 case XFER_MECH_WRITEFD:
311 case XFER_MECH_PUSH_BUFFER:
312 g_assert_not_reached(); /* no glue needed */
315 case XFER_MECH_PULL_BUFFER:
316 self->ring = g_malloc(sizeof(*self->ring) * GLUE_RING_BUFFER_SIZE);
317 self->ring_used_sem = semaphore_new_with_value(0);
318 self->ring_free_sem = semaphore_new_with_value(GLUE_RING_BUFFER_SIZE);
322 g_assert_not_reached();
327 case XFER_MECH_PULL_BUFFER:
328 switch (elt->output_mech) {
329 case XFER_MECH_READFD:
331 elt->output_fd = self->pipe[0];
332 self->pipe[0] = -1; /* downstream will close this for us */
333 self->threadfunc = call_and_write_thread;
336 case XFER_MECH_WRITEFD:
337 self->threadfunc = call_and_write_thread;
340 case XFER_MECH_PUSH_BUFFER:
341 self->threadfunc = call_and_call_thread;
344 case XFER_MECH_PULL_BUFFER:
345 g_assert_not_reached(); /* no glue needed */
349 g_assert_not_reached();
355 g_assert_not_reached();
/* Start the element.  If a thread function was stored in self->threadfunc,
 * launch it with g_thread_create(), passing the glue as its argument, with
 * the joinable argument FALSE and no GError location; the GThread pointer
 * is kept in self->thread.
 *
 * Returns TRUE when a thread function is configured and FALSE otherwise. */
364 XferElementGlue *self = (XferElementGlue *)elt;
366 if (self->threadfunc) {
367 self->thread = g_thread_create(self->threadfunc, (gpointer)self, FALSE, NULL);
370 /* we're active if we have a thread that will eventually die */
371 return self->threadfunc? TRUE : FALSE;
/* Supply the next buffer to a downstream element that pulls; the byte count
 * is stored through `size`.  Two strategies are visible:
 *
 *  - Ring consumer: unless the element is cancelled, wait on ring_used_sem
 *    for a filled slot, take buf and size from the slot at ring_tail,
 *    advance ring_tail modulo GLUE_RING_BUFFER_SIZE, and release the slot
 *    through ring_free_sem.
 *  - Descriptor reader: pick the fd through fdp (upstream's output_fd when
 *    self->pipe[0] is -1, otherwise self->pipe[0]), allocate a
 *    GLUE_BUFFER_SIZE buffer and fill it with full_read().  A cancelled
 *    element drains the fd by reading instead.  A short read is reported
 *    through xfer_element_handle_error() and, when expect_eof is set, the
 *    rest of upstream is drained; a zero-length read is the EOF case.
 *
 * NOTE(review): the condition choosing between the two strategies, the
 * return statements and the buffer release on the error/EOF/cancel paths
 * are not among the lines shown here -- confirm them against the complete
 * source. */
379 XferElementGlue *self = XFER_ELEMENT_GLUE(elt);
384 if (elt->cancelled) {
385 /* The finalize method will empty the ring buffer */
390 /* make sure there's at least one element available */
391 semaphore_down(self->ring_used_sem);
394 buf = self->ring[self->ring_tail].buf;
395 *size = self->ring[self->ring_tail].size;
396 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
398 /* and mark this element as free to be overwritten */
399 semaphore_up(self->ring_free_sem);
403 int *fdp = (self->pipe[0] == -1)? &elt->upstream->output_fd : &self->pipe[0];
405 char *buf = g_malloc(GLUE_BUFFER_SIZE);
408 if (elt->cancelled) {
410 xfer_element_drain_by_reading(fd);
419 /* read from upstream */
420 len = full_read(fd, buf, GLUE_BUFFER_SIZE);
421 if (len < GLUE_BUFFER_SIZE) {
423 xfer_element_handle_error(elt,
424 _("Error reading from fd %d: %s"), fd, strerror(errno));
430 /* and finish off the upstream */
431 if (elt->expect_eof) {
432 xfer_element_drain_by_reading(fd);
436 } else if (len == 0) {
442 /* signal EOF to downstream */
/* Accept a buffer (buf, len) pushed by the upstream element.  Two
 * strategies are visible:
 *
 *  - Ring producer: wait on ring_free_sem for a free slot, store buf and
 *    len in the slot at ring_head, advance ring_head modulo
 *    GLUE_RING_BUFFER_SIZE, and publish the slot through ring_used_sem.
 *    When the element is cancelled the buffer is dropped instead.
 *  - Descriptor writer: pick the fd through fdp (downstream's input_fd when
 *    self->pipe[1] is -1, otherwise self->pipe[1]) and write the buffer
 *    with full_write(); a short write is reported through
 *    xfer_element_handle_error().  On cancellation, when expect_eof is
 *    clear or buf is NULL, expect_eof is forced to TRUE so that a later
 *    push does not close the fd a second time.
 *
 * NOTE(review): the condition choosing between the two strategies, the
 * close() calls and the release of buf are not among the lines shown here
 * -- confirm them against the complete source. */
459 XferElementGlue *self = (XferElementGlue *)elt;
462 /* just drop packets if the transfer has been cancelled */
463 if (elt->cancelled) {
468 /* make sure there's at least one element free */
469 semaphore_down(self->ring_free_sem);
472 self->ring[self->ring_head].buf = buf;
473 self->ring[self->ring_head].size = len;
474 self->ring_head = (self->ring_head + 1) % GLUE_RING_BUFFER_SIZE;
476 /* and mark this element as available for reading */
477 semaphore_up(self->ring_used_sem);
481 int *fdp = (self->pipe[1] == -1)? &elt->downstream->input_fd : &self->pipe[1];
484 if (elt->cancelled) {
485 if (!elt->expect_eof || !buf) {
489 /* hack to ensure we won't close the fd again, if we get another push */
490 elt->expect_eof = TRUE;
498 /* write the full buffer to the fd, or close on EOF */
500 if (full_write(fd, buf, len) < len) {
501 xfer_element_handle_error(elt,
502 _("Error writing to fd %d: %s"), fd, strerror(errno));
503 /* nothing special to do to handle the cancellation */
/* Per-instance initialisation: mark the element as able to generate EOF and
 * set both pipe descriptors to -1, the value the rest of this file tests
 * for to mean "no pipe end held". */
517 XferElementGlue *self)
519 XferElement *elt = (XferElement *)self;
520 elt->can_generate_eof = TRUE;
521 self->pipe[0] = self->pipe[1] = -1;
/* GObject finalizer for the glue element: closes any pipe descriptor that
 * is still held (value other than -1), frees buffers still referenced from
 * the ring, frees both ring semaphores, and finally chains up to the parent
 * class's finalize.
 *
 * NOTE(review): none of the visible statements inside the while loop
 * changes ring_used_sem->value, so as shown the loop cannot terminate once
 * entered with a non-zero value -- confirm against the complete source.
 * NOTE(review): any guard for instances that never allocated the ring, and
 * the release of the ring array itself, are not among the lines shown. */
528 XferElementGlue *self = XFER_ELEMENT_GLUE(obj_self);
530 /* close our pipes if they're still open (they shouldn't be!) */
531 if (self->pipe[0] != -1) close(self->pipe[0]);
532 if (self->pipe[1] != -1) close(self->pipe[1]);
535 /* empty the ring buffer, ignoring synchronization issues */
536 while (self->ring_used_sem->value) {
537 if (self->ring[self->ring_tail].buf)
538 amfree(self->ring[self->ring_tail].buf);
539 self->ring_tail = (self->ring_tail + 1) % GLUE_RING_BUFFER_SIZE;
543 semaphore_free(self->ring_used_sem);
544 semaphore_free(self->ring_free_sem);
548 G_OBJECT_CLASS(parent_class)->finalize(obj_self);
/* Table of the (input mechanism, output mechanism) pairs this element can
 * bridge, one row per pair, ending with the XFER_MECH_NONE/XFER_MECH_NONE
 * row.  The trailing comment on each row names the strategy used for that
 * pair.  The two integers in each row are defined by
 * xfer_element_mech_pair_t, which is declared outside this file
 * (presumably a per-byte cost and a thread count -- TODO confirm against
 * that declaration). */
551 static xfer_element_mech_pair_t _pairs[] = {
552 { XFER_MECH_READFD, XFER_MECH_WRITEFD, 2, 1 }, /* splice or copy */
553 { XFER_MECH_READFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* read and call */
554 { XFER_MECH_READFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* read on demand */
556 { XFER_MECH_WRITEFD, XFER_MECH_READFD, 0, 0 }, /* pipe */
557 { XFER_MECH_WRITEFD, XFER_MECH_PUSH_BUFFER, 1, 1 }, /* pipe + read and call*/
558 { XFER_MECH_WRITEFD, XFER_MECH_PULL_BUFFER, 1, 0 }, /* pipe + read on demand */
560 { XFER_MECH_PUSH_BUFFER, XFER_MECH_READFD, 1, 0 }, /* write on demand + pipe */
561 { XFER_MECH_PUSH_BUFFER, XFER_MECH_WRITEFD, 1, 0 }, /* write on demand */
562 { XFER_MECH_PUSH_BUFFER, XFER_MECH_PULL_BUFFER, 0, 0 }, /* async queue */
564 { XFER_MECH_PULL_BUFFER, XFER_MECH_READFD, 1, 1 }, /* call and write + pipe */
565 { XFER_MECH_PULL_BUFFER, XFER_MECH_WRITEFD, 1, 1 }, /* call and write */
566 { XFER_MECH_PULL_BUFFER, XFER_MECH_PUSH_BUFFER, 0, 1 }, /* call and call */
569 { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
/* Non-static pointer to the table above; class_init stores it in
 * klass->mech_pairs. */
571 xfer_element_mech_pair_t *xfer_element_glue_mech_pairs = _pairs;
/* Class initialiser: installs this file's setup, start, push_buffer and
 * pull_buffer implementations in the XferElementClass, sets the Perl class
 * name and the mechanism-pair table, overrides GObject's finalize, and
 * records the parent class so the finalizer can chain up. */
575 XferElementGlueClass * selfc)
577 XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
578 GObjectClass *goc = G_OBJECT_CLASS(selfc);
580 klass->setup = setup_impl;
581 klass->start = start_impl;
582 klass->push_buffer = push_buffer_impl;
583 klass->pull_buffer = pull_buffer_impl;
585 klass->perl_class = "Amanda::Xfer::Element::Glue";
586 klass->mech_pairs = xfer_element_glue_mech_pairs;
588 goc->finalize = finalize_impl;
590 parent_class = g_type_class_peek_parent(selfc);
/* Type accessor for XferElementGlue.  On the first call (type == 0) it
 * registers the type with g_type_register_static() as a child of
 * XFER_ELEMENT_TYPE, using class_init and instance_init as initialisers,
 * and caches the resulting GType in a function-local static.
 *
 * NOTE(review): the return statement and the remaining GTypeInfo fields are
 * not among the lines shown here, and no locking around the one-time
 * registration is visible -- confirm against the complete source. */
594 xfer_element_glue_get_type (void)
596 static GType type = 0;
598 if G_UNLIKELY(type == 0) {
599 static const GTypeInfo info = {
600 sizeof (XferElementGlueClass),
601 (GBaseInitFunc) NULL,
602 (GBaseFinalizeFunc) NULL,
603 (GClassInitFunc) class_init,
604 (GClassFinalizeFunc) NULL,
605 NULL /* class_data */,
606 sizeof (XferElementGlue),
608 (GInstanceInitFunc) instance_init,
612 type = g_type_register_static (XFER_ELEMENT_TYPE, "XferElementGlue", &info, 0);
618 /* create an element of this class; prototype is in xfer-element.h */
/* Instantiates the glue with g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL) and
 * views the new object as an XferElement.
 * NOTE(review): the return statement is not among the lines shown here;
 * presumably elt is returned -- confirm against the complete source. */
620 xfer_element_glue(void)
622 XferElementGlue *self = (XferElementGlue *)g_object_new(XFER_ELEMENT_GLUE_TYPE, NULL);
623 XferElement *elt = XFER_ELEMENT(self);