678be8462669350bd611010781ca4c7968a55867
[debian/amanda] / xfer-src / xfer-element.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008 Zmanda Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #include "amxfer.h"
21 #include "amanda.h"
22 #include "arglist.h"
23
24 /* parent class for XferElement */
25 static GObjectClass *parent_class = NULL;
26
27 /* parent class for XferDest, XferFilter, and XferSource */
28 static XferElementClass *xfer_element_class = NULL;
29
30 /***********************
31  * XferElement */
32
33 static void
34 xfer_element_init(
35     XferElement *xe)
36 {
37     xe->xfer = NULL;
38     xe->output_mech = XFER_MECH_NONE;
39     xe->input_mech = XFER_MECH_NONE;
40     xe->upstream = xe->downstream = NULL;
41     xe->input_fd = xe->output_fd = -1;
42     xe->repr = NULL;
43 }
44
45 static void
46 xfer_element_setup_impl(
47     XferElement *elt G_GNUC_UNUSED)
48 {
49 }
50
51 static gboolean
52 xfer_element_start_impl(
53     XferElement *elt G_GNUC_UNUSED)
54 {
55     return FALSE;
56 }
57
58 static gboolean
59 xfer_element_cancel_impl(
60     XferElement *elt,
61     gboolean expect_eof)
62 {
63     elt->cancelled = TRUE;
64     elt->expect_eof = expect_eof;
65     return elt->can_generate_eof;
66 }
67
68 static gpointer
69 xfer_element_pull_buffer_impl(
70     XferElement *elt G_GNUC_UNUSED,
71     size_t *size G_GNUC_UNUSED)
72 {
73     return NULL;
74 }
75
76 static void
77 xfer_element_push_buffer_impl(
78     XferElement *elt G_GNUC_UNUSED,
79     gpointer buf G_GNUC_UNUSED,
80     size_t size G_GNUC_UNUSED)
81 {
82 }
83
84 static char *
85 xfer_element_repr_impl(
86     XferElement *elt)
87 {
88     if (!elt->repr) {
89         elt->repr = newvstrallocf(elt->repr, "<%s@%p>", 
90                 G_OBJECT_TYPE_NAME(G_OBJECT(elt)),
91                 elt);
92     }
93
94     return elt->repr;
95 }
96
97 static void
98 xfer_element_finalize(
99     GObject * obj_self)
100 {
101     XferElement *elt = XFER_ELEMENT(obj_self);
102
103     /* free the repr cache */
104     if (elt->repr) g_free(elt->repr);
105
106     /* chain up */
107     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
108 }
109
110 static void
111 xfer_element_class_init(
112     XferElementClass * klass)
113 {
114     GObjectClass *goc = (GObjectClass*) klass;
115
116     klass->repr = xfer_element_repr_impl;
117     klass->setup = xfer_element_setup_impl;
118     klass->start = xfer_element_start_impl;
119     klass->cancel = xfer_element_cancel_impl;
120     klass->pull_buffer = xfer_element_pull_buffer_impl;
121     klass->push_buffer = xfer_element_push_buffer_impl;
122
123     goc->finalize = xfer_element_finalize;
124
125     klass->perl_class = NULL;
126
127     parent_class = g_type_class_peek_parent(klass);
128     xfer_element_class = klass;
129 }
130
131 GType
132 xfer_element_get_type(void)
133 {
134     static GType type = 0;
135
136     if G_UNLIKELY(type == 0) {
137         static const GTypeInfo info = {
138             sizeof (XferElementClass),
139             (GBaseInitFunc) NULL,
140             (GBaseFinalizeFunc) NULL,
141             (GClassInitFunc) xfer_element_class_init,
142             (GClassFinalizeFunc) NULL,
143             NULL /* class_data */,
144             sizeof (XferElement),
145             0 /* n_preallocs */,
146             (GInstanceInitFunc) xfer_element_init,
147             NULL
148         };
149
150         type = g_type_register_static (G_TYPE_OBJECT, "XferElement", &info,
151                                        (GTypeFlags)G_TYPE_FLAG_ABSTRACT);
152     }
153
154     return type;
155 }
156
157 /*
158  * Method stubs
159  */
160
161 void
162 xfer_element_unref(
163     XferElement *elt)
164 {
165     if (elt) g_object_unref(elt);
166 }
167
168 char *
169 xfer_element_repr(
170     XferElement *elt)
171 {
172     return XFER_ELEMENT_GET_CLASS(elt)->repr(elt);
173 }
174
175 void
176 xfer_element_setup(
177     XferElement *elt)
178 {
179     XFER_ELEMENT_GET_CLASS(elt)->setup(elt);
180 }
181
182 gboolean
183 xfer_element_start(
184     XferElement *elt)
185 {
186     return XFER_ELEMENT_GET_CLASS(elt)->start(elt);
187 }
188
189 gboolean
190 xfer_element_cancel(
191     XferElement *elt,
192     gboolean expect_eof)
193 {
194     return XFER_ELEMENT_GET_CLASS(elt)->cancel(elt, expect_eof);
195 }
196
197 gpointer
198 xfer_element_pull_buffer(
199     XferElement *elt,
200     size_t *size)
201 {
202     return XFER_ELEMENT_GET_CLASS(elt)->pull_buffer(elt, size);
203 }
204
205 void
206 xfer_element_push_buffer(
207     XferElement *elt,
208     gpointer buf,
209     size_t size)
210 {
211     XFER_ELEMENT_GET_CLASS(elt)->push_buffer(elt, buf, size);
212 }
213
214 /****
215  * Utilities
216  */
217
218 void
219 xfer_element_drain_by_pulling(
220     XferElement *upstream)
221 {
222     gpointer buf;
223     size_t size;
224
225     while ((buf =xfer_element_pull_buffer(upstream, &size))) {
226         amfree(buf);
227     }
228 }
229
/* Read from FD and throw the data away, one scratch buffer at a time, until
 * full_read() returns fewer bytes than the buffer holds. */
void
xfer_element_drain_by_reading(int fd)
{
    char scratch[1024];
    size_t nread;

    do {
        nread = full_read(fd, scratch, sizeof(scratch));
    } while (nread >= sizeof(scratch));
}
243
244 xfer_status
245 wait_until_xfer_cancelled(
246     Xfer *xfer)
247 {
248     xfer_status seen_status;
249     g_assert(xfer != NULL);
250
251     g_mutex_lock(xfer->status_mutex);
252     while (xfer->status != XFER_CANCELLED && xfer->status != XFER_DONE)
253         g_cond_wait(xfer->status_cond, xfer->status_mutex);
254     seen_status = xfer->status;
255     g_mutex_unlock(xfer->status_mutex);
256
257     return seen_status;
258 }
259
260 void
261 xfer_element_handle_error(
262     XferElement *elt,
263     const char *fmt,
264     ...)
265 {
266     va_list argp;
267     XMsg *msg;
268
269     g_assert(elt != NULL);
270     g_assert(elt->xfer != NULL);
271
272     msg = xmsg_new(elt, XMSG_ERROR, 0);
273
274     arglist_start(argp, fmt);
275     msg->message = g_strdup_vprintf(fmt, argp);
276     arglist_end(argp);
277
278     /* send the XMSG_ERROR */
279     xfer_queue_message(elt->xfer, msg);
280
281     /* cancel the transfer */
282     xfer_cancel(elt->xfer);
283
284     /* and wait for the cancellation to take effect */
285     wait_until_xfer_cancelled(elt->xfer);
286 }