dd94e3ca126faafa4ca3ffd055c5fdae49e08ec5
[debian/amanda] / xfer-src / xfer-element.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "amxfer.h"
24
25 /* parent class for XferElement */
26 static GObjectClass *parent_class = NULL;
27
28 /* parent class for XferDest, XferFilter, and XferSource */
29 static XferElementClass *xfer_element_class = NULL;
30
31 /***********************
32  * XferElement */
33
34 static void
35 xfer_element_init(
36     XferElement *xe)
37 {
38     xe->xfer = NULL;
39     xe->output_mech = XFER_MECH_NONE;
40     xe->input_mech = XFER_MECH_NONE;
41     xe->upstream = xe->downstream = NULL;
42     xe->_input_fd = xe->_output_fd = -1;
43     xe->repr = NULL;
44 }
45
46 static gboolean
47 xfer_element_setup_impl(
48     XferElement *elt G_GNUC_UNUSED)
49 {
50     return TRUE; /* success */
51 }
52
53 static gboolean
54 xfer_element_set_size_impl(
55     XferElement *elt G_GNUC_UNUSED,
56     gint64       size G_GNUC_UNUSED)
57 {
58     elt->size = size;
59
60     return TRUE; /* success */
61 }
62
63 static gboolean
64 xfer_element_start_impl(
65     XferElement *elt G_GNUC_UNUSED)
66 {
67     return FALSE; /* will not send XMSG_DONE */
68 }
69
70 static gboolean
71 xfer_element_cancel_impl(
72     XferElement *elt,
73     gboolean expect_eof)
74 {
75     elt->cancelled = TRUE;
76     elt->expect_eof = expect_eof;
77     return elt->can_generate_eof;
78 }
79
80 static gpointer
81 xfer_element_pull_buffer_impl(
82     XferElement *elt G_GNUC_UNUSED,
83     size_t *size G_GNUC_UNUSED)
84 {
85     return NULL;
86 }
87
88 static void
89 xfer_element_push_buffer_impl(
90     XferElement *elt G_GNUC_UNUSED,
91     gpointer buf G_GNUC_UNUSED,
92     size_t size G_GNUC_UNUSED)
93 {
94 }
95
96 static xfer_element_mech_pair_t *
97 xfer_element_get_mech_pairs_impl(
98     XferElement *elt)
99 {
100     return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs;
101 }
102
103 static char *
104 xfer_element_repr_impl(
105     XferElement *elt)
106 {
107     if (!elt->repr) {
108         elt->repr = newvstrallocf(elt->repr, "<%s@%p>",
109                 G_OBJECT_TYPE_NAME(G_OBJECT(elt)),
110                 elt);
111     }
112
113     return elt->repr;
114 }
115
116 static void
117 xfer_element_finalize(
118     GObject * obj_self)
119 {
120     XferElement *elt = XFER_ELEMENT(obj_self);
121     gint fd;
122
123     /* free the repr cache */
124     if (elt->repr) g_free(elt->repr);
125
126     /* close up the input/output file descriptors, being careful to do so
127      * atomically, and making any errors doing so into mere warnings */
128     fd = xfer_element_swap_input_fd(elt, -1);
129     if (fd != -1 && close(fd) != 0)
130         g_warning("error closing fd %d: %s", fd, strerror(errno));
131     fd = xfer_element_swap_output_fd(elt, -1);
132     if (fd != -1 && close(fd) != 0)
133         g_warning("error closing fd %d: %s", fd, strerror(errno));
134
135     /* chain up */
136     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
137 }
138
139 static void
140 xfer_element_class_init(
141     XferElementClass * klass)
142 {
143     GObjectClass *goc = (GObjectClass*) klass;
144
145     klass->repr = xfer_element_repr_impl;
146     klass->setup = xfer_element_setup_impl;
147     klass->set_size = xfer_element_set_size_impl;
148     klass->start = xfer_element_start_impl;
149     klass->cancel = xfer_element_cancel_impl;
150     klass->pull_buffer = xfer_element_pull_buffer_impl;
151     klass->push_buffer = xfer_element_push_buffer_impl;
152     klass->get_mech_pairs = xfer_element_get_mech_pairs_impl;
153
154     goc->finalize = xfer_element_finalize;
155
156     klass->perl_class = NULL;
157
158     parent_class = g_type_class_peek_parent(klass);
159     xfer_element_class = klass;
160 }
161
162 GType
163 xfer_element_get_type(void)
164 {
165     static GType type = 0;
166
167     if G_UNLIKELY(type == 0) {
168         static const GTypeInfo info = {
169             sizeof (XferElementClass),
170             (GBaseInitFunc) NULL,
171             (GBaseFinalizeFunc) NULL,
172             (GClassInitFunc) xfer_element_class_init,
173             (GClassFinalizeFunc) NULL,
174             NULL /* class_data */,
175             sizeof (XferElement),
176             0 /* n_preallocs */,
177             (GInstanceInitFunc) xfer_element_init,
178             NULL
179         };
180
181         type = g_type_register_static (G_TYPE_OBJECT, "XferElement", &info,
182                                        (GTypeFlags)G_TYPE_FLAG_ABSTRACT);
183     }
184
185     return type;
186 }
187
188 /*
189  * Method stubs
190  */
191
192 void
193 xfer_element_unref(
194     XferElement *elt)
195 {
196     if (elt) g_object_unref(elt);
197 }
198
199 char *
200 xfer_element_repr(
201     XferElement *elt)
202 {
203     return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
204 }
205
206 gboolean
207 xfer_element_setup(
208     XferElement *elt)
209 {
210     return XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
211 }
212
213 gboolean
214 xfer_element_set_size(
215     XferElement *elt,
216     gint64       size)
217 {
218     return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size);
219 }
220
221 gboolean
222 xfer_element_start(
223     XferElement *elt)
224 {
225     return XFER_ELEMENT_GET_CLASS(elt)->start(elt);
226 }
227
228 gboolean
229 xfer_element_cancel(
230     XferElement *elt,
231     gboolean expect_eof)
232 {
233     return XFER_ELEMENT_GET_CLASS(elt)->cancel(elt, expect_eof);
234 }
235
236 gpointer
237 xfer_element_pull_buffer(
238     XferElement *elt,
239     size_t *size)
240 {
241     /* Make sure that the xfer is running before calling upstream's
242      * pull_buffer method; this avoids a race condition where upstream
243      * hasn't finished its xfer_element_start yet, and isn't ready for
244      * a pull */
245     if (elt->xfer->status == XFER_START)
246         wait_until_xfer_running(elt->xfer);
247
248     return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
249 }
250
251 void
252 xfer_element_push_buffer(
253     XferElement *elt,
254     gpointer buf,
255     size_t size)
256 {
257     /* There is no race condition with push_buffer, because downstream
258      * elements are started first. */
259     XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
260 }
261
262 xfer_element_mech_pair_t *
263 xfer_element_get_mech_pairs(
264         XferElement *elt)
265 {
266     return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt);
267 }
268
269 /****
270  * Utilities
271  */
272
273 void
274 xfer_element_drain_buffers(
275     XferElement *upstream)
276 {
277     gpointer buf;
278     size_t size;
279
280     while ((buf =xfer_element_pull_buffer(upstream, &size))) {
281         amfree(buf);
282     }
283 }
284
285 void
286 xfer_element_drain_fd(
287     int fd)
288 {
289     size_t len;
290     char buf[1024];
291
292     while (1) {
293         len = full_read(fd, buf, sizeof(buf));
294         if (len < sizeof(buf))
295             return;
296     }
297 }
298