Imported Upstream version 3.3.3
[debian/amanda] / server-src / xfer-source-holding.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "xfer-server.h"
25 #include "xfer-device.h"
26
27 /*
28  * Class declaration
29  *
30  * This declaration is entirely private; nothing but xfer_source_holding() references
31  * it directly.
32  */
33
34 GType xfer_source_holding_get_type(void);
35 #define XFER_SOURCE_HOLDING_TYPE (xfer_source_holding_get_type())
36 #define XFER_SOURCE_HOLDING(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_holding_get_type(), XferSourceHolding)
37 #define XFER_SOURCE_HOLDING_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_holding_get_type(), XferSourceHolding const)
38 #define XFER_SOURCE_HOLDING_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_holding_get_type(), XferSourceHoldingClass)
39 #define IS_XFER_SOURCE_HOLDING(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_holding_get_type ())
40 #define XFER_SOURCE_HOLDING_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_holding_get_type(), XferSourceHoldingClass)
41
42 static GObjectClass *parent_class = NULL;
43
44 /*
45  * Main object structure
46  */
47
48 typedef struct XferSourceHolding {
49     XferElement __parent__;
50
51     int fd;
52     char *next_filename;
53     guint64 bytes_read;
54
55     XferElement *dest_taper;
56 } XferSourceHolding;
57
58 /*
59  * Class definition
60  */
61
62 typedef struct {
63     XferElementClass __parent__;
64 } XferSourceHoldingClass;
65
66 /*
67  * Implementation
68  */
69
70 static gboolean
71 start_new_chunk(
72     XferSourceHolding *self)
73 {
74     char *hdrbuf = NULL;
75     dumpfile_t hdr;
76     size_t bytes_read;
77
78     /* try to close an already-open file */
79     if (self->fd != -1) {
80         if (close(self->fd) < 0) {
81             xfer_cancel_with_error(XFER_ELEMENT(self),
82                 "while closing holding file: %s", strerror(errno));
83             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
84             return FALSE;
85         }
86
87         self->fd = -1;
88     }
89
90     /* if we have no next filename, then we're at EOF */
91     if (!self->next_filename) {
92         return FALSE;
93     }
94
95     /* otherwise, open up the next file */
96     self->fd = open(self->next_filename, O_RDONLY);
97     if (self->fd < 0) {
98         xfer_cancel_with_error(XFER_ELEMENT(self),
99             "while opening holding file '%s': %s",
100             self->next_filename, strerror(errno));
101         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
102         return FALSE;
103     }
104
105     /* get a downstream XferDestTaper, if one exists.  This check happens
106      * for each chunk, but chunks are large, so that's OK. */
107     if (!self->dest_taper) {
108         XferElement *elt = (XferElement *)self;
109
110         /* the xfer may have inserted glue between this element and
111         * the XferDestTaper. Glue does not change the bytestream, so
112         * it does not interfere with cache_inform calls. */
113         XferElement *iter = elt->downstream;
114         while (iter && IS_XFER_ELEMENT_GLUE(iter)) {
115             iter = iter->downstream;
116         }
117         if (IS_XFER_DEST_TAPER(iter))
118             self->dest_taper = iter;
119     }
120
121     /* tell a XferDestTaper about the new file */
122     if (self->dest_taper) {
123         struct stat st;
124         if (fstat(self->fd, &st) < 0) {
125             xfer_cancel_with_error(XFER_ELEMENT(self),
126                 "while finding size of holding file '%s': %s",
127                 self->next_filename, strerror(errno));
128             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
129             return FALSE;
130         }
131
132         xfer_dest_taper_cache_inform(self->dest_taper,
133             self->next_filename,
134             DISK_BLOCK_BYTES,
135             st.st_size - DISK_BLOCK_BYTES);
136     }
137
138     /* read the header from the file and determine the filename of the next chunk */
139     hdrbuf = g_malloc(DISK_BLOCK_BYTES);
140     bytes_read = full_read(self->fd, hdrbuf, DISK_BLOCK_BYTES);
141     if (bytes_read < DISK_BLOCK_BYTES) {
142         g_free(hdrbuf);
143         xfer_cancel_with_error(XFER_ELEMENT(self),
144             "while reading header from holding file '%s': %s",
145             self->next_filename, strerror(errno));
146         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
147         return FALSE;
148     }
149
150     parse_file_header(hdrbuf, &hdr, DISK_BLOCK_BYTES);
151     g_free(hdrbuf);
152     hdrbuf = NULL;
153
154     if (hdr.type != F_DUMPFILE && hdr.type != F_CONT_DUMPFILE) {
155         dumpfile_free_data(&hdr);
156         xfer_cancel_with_error(XFER_ELEMENT(self),
157             "unexpected header type %d in holding file '%s'",
158             hdr.type, self->next_filename);
159         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
160         return FALSE;
161     }
162
163     g_free(self->next_filename);
164     if (hdr.cont_filename[0]) {
165         self->next_filename = g_strdup(hdr.cont_filename);
166     } else {
167         self->next_filename = NULL;
168     }
169     dumpfile_free_data(&hdr);
170
171     return TRUE;
172 }
173
174 /* pick an arbitrary block size for reading */
175 #define HOLDING_BLOCK_SIZE (1024*128)
176
177 static gpointer
178 pull_buffer_impl(
179     XferElement *elt,
180     size_t *size)
181 {
182     XferSourceHolding *self = (XferSourceHolding *)elt;
183     char *buf = NULL;
184     size_t bytes_read;
185
186     if (elt->cancelled)
187         goto return_eof;
188
189     if (self->fd == -1) {
190         if (!start_new_chunk(self))
191             goto return_eof;
192     }
193
194     buf = g_malloc(HOLDING_BLOCK_SIZE);
195
196     while (1) {
197         bytes_read = full_read(self->fd, buf, HOLDING_BLOCK_SIZE);
198         if (bytes_read > 0) {
199             *size = bytes_read;
200             self->bytes_read += bytes_read;
201             return buf;
202         }
203
204         /* did an error occur? */
205         if (errno != 0) {
206             xfer_cancel_with_error(XFER_ELEMENT(self),
207                 "while reading holding file: %s", strerror(errno));
208             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
209             goto return_eof;
210         }
211
212         if (!start_new_chunk(self))
213             goto return_eof;
214     }
215
216 return_eof:
217     g_free(buf);
218     *size = 0;
219     return NULL;
220 }
221
222 static void
223 instance_init(
224     XferElement *elt)
225 {
226     XferSourceHolding *self = (XferSourceHolding *)elt;
227
228     elt->can_generate_eof = TRUE;
229     self->fd = -1;
230 }
231
232 static void
233 finalize_impl(
234     GObject * obj_self)
235 {
236     XferSourceHolding *self = (XferSourceHolding *)obj_self;
237
238     if (self->next_filename)
239         g_free(self->next_filename);
240
241     if (self->fd != -1)
242         close(self->fd); /* ignore error; we were probably already cancelled */
243
244     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
245 }
246
247 static void
248 class_init(
249     XferSourceHoldingClass * selfc)
250 {
251     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
252     GObjectClass *goc = G_OBJECT_CLASS(selfc);
253     static xfer_element_mech_pair_t mech_pairs[] = {
254         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
255         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
256     };
257
258     klass->pull_buffer = pull_buffer_impl;
259
260     klass->perl_class = "Amanda::Xfer::Source::Holding";
261     klass->mech_pairs = mech_pairs;
262
263     goc->finalize = finalize_impl;
264
265     parent_class = g_type_class_peek_parent(selfc);
266 }
267
268 GType
269 xfer_source_holding_get_type (void)
270 {
271     static GType type = 0;
272
273     if G_UNLIKELY(type == 0) {
274         static const GTypeInfo info = {
275             sizeof (XferSourceHoldingClass),
276             (GBaseInitFunc) NULL,
277             (GBaseFinalizeFunc) NULL,
278             (GClassInitFunc) class_init,
279             (GClassFinalizeFunc) NULL,
280             NULL /* class_data */,
281             sizeof (XferSourceHolding),
282             0 /* n_preallocs */,
283             (GInstanceInitFunc) instance_init,
284             NULL
285         };
286
287         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceHolding", &info, 0);
288     }
289
290     return type;
291 }
292
293 /* create an element of this class; prototype is in xfer-element.h */
294 XferElement *
295 xfer_source_holding(
296     const char *filename)
297 {
298     XferSourceHolding *self = (XferSourceHolding *)g_object_new(XFER_SOURCE_HOLDING_TYPE, NULL);
299     XferElement *elt = XFER_ELEMENT(self);
300
301     self->next_filename = g_strdup(filename);
302     self->bytes_read = 0;
303
304     return elt;
305 }
306
307 guint64
308 xfer_source_holding_get_bytes_read(
309     XferElement *elt)
310 {
311     XferSourceHolding *self = (XferSourceHolding *)elt;
312
313     return self->bytes_read;
314 }
315