Imported Upstream version 3.3.3
[debian/amanda] / xfer-src / xfer-element.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "amxfer.h"
25
26 /* parent class for XferElement */
27 static GObjectClass *parent_class = NULL;
28
29 /* parent class for XferDest, XferFilter, and XferSource */
30 static XferElementClass *xfer_element_class = NULL;
31
32 /***********************
33  * XferElement */
34
35 static void
36 xfer_element_init(
37     XferElement *xe)
38 {
39     xe->xfer = NULL;
40     xe->output_mech = XFER_MECH_NONE;
41     xe->input_mech = XFER_MECH_NONE;
42     xe->upstream = xe->downstream = NULL;
43     xe->_input_fd = xe->_output_fd = -1;
44     xe->repr = NULL;
45 }
46
47 static gboolean
48 xfer_element_setup_impl(
49     XferElement *elt G_GNUC_UNUSED)
50 {
51     return TRUE; /* success */
52 }
53
54 static gboolean
55 xfer_element_set_size_impl(
56     XferElement *elt G_GNUC_UNUSED,
57     gint64       size G_GNUC_UNUSED)
58 {
59     elt->size = size;
60
61     return TRUE; /* success */
62 }
63
64 static gboolean
65 xfer_element_start_impl(
66     XferElement *elt G_GNUC_UNUSED)
67 {
68     return FALSE; /* will not send XMSG_DONE */
69 }
70
71 static gboolean
72 xfer_element_cancel_impl(
73     XferElement *elt,
74     gboolean expect_eof)
75 {
76     elt->cancelled = TRUE;
77     elt->expect_eof = expect_eof;
78     return elt->can_generate_eof;
79 }
80
81 static gpointer
82 xfer_element_pull_buffer_impl(
83     XferElement *elt G_GNUC_UNUSED,
84     size_t *size G_GNUC_UNUSED)
85 {
86     return NULL;
87 }
88
89 static void
90 xfer_element_push_buffer_impl(
91     XferElement *elt G_GNUC_UNUSED,
92     gpointer buf G_GNUC_UNUSED,
93     size_t size G_GNUC_UNUSED)
94 {
95 }
96
97 static xfer_element_mech_pair_t *
98 xfer_element_get_mech_pairs_impl(
99     XferElement *elt)
100 {
101     return XFER_ELEMENT_GET_CLASS(elt)->mech_pairs;
102 }
103
104 static char *
105 xfer_element_repr_impl(
106     XferElement *elt)
107 {
108     if (!elt->repr) {
109         elt->repr = newvstrallocf(elt->repr, "<%s@%p>",
110                 G_OBJECT_TYPE_NAME(G_OBJECT(elt)),
111                 elt);
112     }
113
114     return elt->repr;
115 }
116
117 static void
118 xfer_element_finalize(
119     GObject * obj_self)
120 {
121     XferElement *elt = XFER_ELEMENT(obj_self);
122     gint fd;
123
124     /* free the repr cache */
125     if (elt->repr) g_free(elt->repr);
126
127     /* close up the input/output file descriptors, being careful to do so
128      * atomically, and making any errors doing so into mere warnings */
129     fd = xfer_element_swap_input_fd(elt, -1);
130     if (fd != -1 && close(fd) != 0)
131         g_warning("error closing fd %d: %s", fd, strerror(errno));
132     fd = xfer_element_swap_output_fd(elt, -1);
133     if (fd != -1 && close(fd) != 0)
134         g_warning("error closing fd %d: %s", fd, strerror(errno));
135
136     /* chain up */
137     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
138 }
139
140 static void
141 xfer_element_class_init(
142     XferElementClass * klass)
143 {
144     GObjectClass *goc = (GObjectClass*) klass;
145
146     klass->repr = xfer_element_repr_impl;
147     klass->setup = xfer_element_setup_impl;
148     klass->set_size = xfer_element_set_size_impl;
149     klass->start = xfer_element_start_impl;
150     klass->cancel = xfer_element_cancel_impl;
151     klass->pull_buffer = xfer_element_pull_buffer_impl;
152     klass->push_buffer = xfer_element_push_buffer_impl;
153     klass->get_mech_pairs = xfer_element_get_mech_pairs_impl;
154
155     goc->finalize = xfer_element_finalize;
156
157     klass->perl_class = NULL;
158
159     parent_class = g_type_class_peek_parent(klass);
160     xfer_element_class = klass;
161 }
162
163 GType
164 xfer_element_get_type(void)
165 {
166     static GType type = 0;
167
168     if G_UNLIKELY(type == 0) {
169         static const GTypeInfo info = {
170             sizeof (XferElementClass),
171             (GBaseInitFunc) NULL,
172             (GBaseFinalizeFunc) NULL,
173             (GClassInitFunc) xfer_element_class_init,
174             (GClassFinalizeFunc) NULL,
175             NULL /* class_data */,
176             sizeof (XferElement),
177             0 /* n_preallocs */,
178             (GInstanceInitFunc) xfer_element_init,
179             NULL
180         };
181
182         type = g_type_register_static (G_TYPE_OBJECT, "XferElement", &info,
183                                        (GTypeFlags)G_TYPE_FLAG_ABSTRACT);
184     }
185
186     return type;
187 }
188
189 /*
190  * Method stubs
191  */
192
193 void
194 xfer_element_unref(
195     XferElement *elt)
196 {
197     if (elt) g_object_unref(elt);
198 }
199
200 char *
201 xfer_element_repr(
202     XferElement *elt)
203 {
204     return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
205 }
206
207 gboolean
208 xfer_element_setup(
209     XferElement *elt)
210 {
211     return XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
212 }
213
214 gboolean
215 xfer_element_set_size(
216     XferElement *elt,
217     gint64       size)
218 {
219     return XFER_ELEMENT_GET_CLASS(elt)->set_size(elt, size);
220 }
221
222 gboolean
223 xfer_element_start(
224     XferElement *elt)
225 {
226     return XFER_ELEMENT_GET_CLASS(elt)->start(elt);
227 }
228
229 gboolean
230 xfer_element_cancel(
231     XferElement *elt,
232     gboolean expect_eof)
233 {
234     return XFER_ELEMENT_GET_CLASS(elt)->cancel(elt, expect_eof);
235 }
236
237 gpointer
238 xfer_element_pull_buffer(
239     XferElement *elt,
240     size_t *size)
241 {
242     xfer_status status;
243     /* Make sure that the xfer is running before calling upstream's
244      * pull_buffer method; this avoids a race condition where upstream
245      * hasn't finished its xfer_element_start yet, and isn't ready for
246      * a pull */
247     g_mutex_lock(elt->xfer->status_mutex);
248     status = elt->xfer->status;
249     g_mutex_unlock(elt->xfer->status_mutex);
250     if (status == XFER_START)
251         wait_until_xfer_running(elt->xfer);
252
253     return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
254 }
255
256 void
257 xfer_element_push_buffer(
258     XferElement *elt,
259     gpointer buf,
260     size_t size)
261 {
262     /* There is no race condition with push_buffer, because downstream
263      * elements are started first. */
264     XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
265 }
266
267 xfer_element_mech_pair_t *
268 xfer_element_get_mech_pairs(
269         XferElement *elt)
270 {
271     return XFER_ELEMENT_GET_CLASS(elt)->get_mech_pairs(elt);
272 }
273
274 /****
275  * Utilities
276  */
277
278 void
279 xfer_element_drain_buffers(
280     XferElement *upstream)
281 {
282     gpointer buf;
283     size_t size;
284
285     while ((buf =xfer_element_pull_buffer(upstream, &size))) {
286         amfree(buf);
287     }
288 }
289
290 void
291 xfer_element_drain_fd(
292     int fd)
293 {
294     size_t len;
295     char buf[1024];
296
297     while (1) {
298         len = full_read(fd, buf, sizeof(buf));
299         if (len < sizeof(buf))
300             return;
301     }
302 }
303