Imported Upstream version 3.3.2
[debian/amanda] / server-src / xfer-source-holding.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "xfer-server.h"
24 #include "xfer-device.h"
25
26 /*
27  * Class declaration
28  *
29  * This declaration is entirely private; nothing but xfer_source_holding() references
30  * it directly.
31  */
32
33 GType xfer_source_holding_get_type(void);
34 #define XFER_SOURCE_HOLDING_TYPE (xfer_source_holding_get_type())
35 #define XFER_SOURCE_HOLDING(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_holding_get_type(), XferSourceHolding)
36 #define XFER_SOURCE_HOLDING_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), xfer_source_holding_get_type(), XferSourceHolding const)
37 #define XFER_SOURCE_HOLDING_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), xfer_source_holding_get_type(), XferSourceHoldingClass)
38 #define IS_XFER_SOURCE_HOLDING(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), xfer_source_holding_get_type ())
39 #define XFER_SOURCE_HOLDING_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), xfer_source_holding_get_type(), XferSourceHoldingClass)
40
41 static GObjectClass *parent_class = NULL;
42
43 /*
44  * Main object structure
45  */
46
47 typedef struct XferSourceHolding {
48     XferElement __parent__;
49
50     int fd;
51     char *next_filename;
52     guint64 bytes_read;
53
54     XferElement *dest_taper;
55 } XferSourceHolding;
56
57 /*
58  * Class definition
59  */
60
61 typedef struct {
62     XferElementClass __parent__;
63 } XferSourceHoldingClass;
64
65 /*
66  * Implementation
67  */
68
69 static gboolean
70 start_new_chunk(
71     XferSourceHolding *self)
72 {
73     char *hdrbuf = NULL;
74     dumpfile_t hdr;
75     size_t bytes_read;
76
77     /* try to close an already-open file */
78     if (self->fd != -1) {
79         if (close(self->fd) < 0) {
80             xfer_cancel_with_error(XFER_ELEMENT(self),
81                 "while closing holding file: %s", strerror(errno));
82             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
83             return FALSE;
84         }
85
86         self->fd = -1;
87     }
88
89     /* if we have no next filename, then we're at EOF */
90     if (!self->next_filename) {
91         return FALSE;
92     }
93
94     /* otherwise, open up the next file */
95     self->fd = open(self->next_filename, O_RDONLY);
96     if (self->fd < 0) {
97         xfer_cancel_with_error(XFER_ELEMENT(self),
98             "while opening holding file '%s': %s",
99             self->next_filename, strerror(errno));
100         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
101         return FALSE;
102     }
103
104     /* get a downstream XferDestTaper, if one exists.  This check happens
105      * for each chunk, but chunks are large, so that's OK. */
106     if (!self->dest_taper) {
107         XferElement *elt = (XferElement *)self;
108
109         /* the xfer may have inserted glue between this element and
110         * the XferDestTaper. Glue does not change the bytestream, so
111         * it does not interfere with cache_inform calls. */
112         XferElement *iter = elt->downstream;
113         while (iter && IS_XFER_ELEMENT_GLUE(iter)) {
114             iter = iter->downstream;
115         }
116         if (IS_XFER_DEST_TAPER(iter))
117             self->dest_taper = iter;
118     }
119
120     /* tell a XferDestTaper about the new file */
121     if (self->dest_taper) {
122         struct stat st;
123         if (fstat(self->fd, &st) < 0) {
124             xfer_cancel_with_error(XFER_ELEMENT(self),
125                 "while finding size of holding file '%s': %s",
126                 self->next_filename, strerror(errno));
127             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
128             return FALSE;
129         }
130
131         xfer_dest_taper_cache_inform(self->dest_taper,
132             self->next_filename,
133             DISK_BLOCK_BYTES,
134             st.st_size - DISK_BLOCK_BYTES);
135     }
136
137     /* read the header from the file and determine the filename of the next chunk */
138     hdrbuf = g_malloc(DISK_BLOCK_BYTES);
139     bytes_read = full_read(self->fd, hdrbuf, DISK_BLOCK_BYTES);
140     if (bytes_read < DISK_BLOCK_BYTES) {
141         g_free(hdrbuf);
142         xfer_cancel_with_error(XFER_ELEMENT(self),
143             "while reading header from holding file '%s': %s",
144             self->next_filename, strerror(errno));
145         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
146         return FALSE;
147     }
148
149     parse_file_header(hdrbuf, &hdr, DISK_BLOCK_BYTES);
150     g_free(hdrbuf);
151     hdrbuf = NULL;
152
153     if (hdr.type != F_DUMPFILE && hdr.type != F_CONT_DUMPFILE) {
154         dumpfile_free_data(&hdr);
155         xfer_cancel_with_error(XFER_ELEMENT(self),
156             "unexpected header type %d in holding file '%s'",
157             hdr.type, self->next_filename);
158         wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
159         return FALSE;
160     }
161
162     g_free(self->next_filename);
163     if (hdr.cont_filename[0]) {
164         self->next_filename = g_strdup(hdr.cont_filename);
165     } else {
166         self->next_filename = NULL;
167     }
168     dumpfile_free_data(&hdr);
169
170     return TRUE;
171 }
172
173 /* pick an arbitrary block size for reading */
174 #define HOLDING_BLOCK_SIZE (1024*128)
175
176 static gpointer
177 pull_buffer_impl(
178     XferElement *elt,
179     size_t *size)
180 {
181     XferSourceHolding *self = (XferSourceHolding *)elt;
182     char *buf = NULL;
183     size_t bytes_read;
184
185     if (elt->cancelled)
186         goto return_eof;
187
188     if (self->fd == -1) {
189         if (!start_new_chunk(self))
190             goto return_eof;
191     }
192
193     buf = g_malloc(HOLDING_BLOCK_SIZE);
194
195     while (1) {
196         bytes_read = full_read(self->fd, buf, HOLDING_BLOCK_SIZE);
197         if (bytes_read > 0) {
198             *size = bytes_read;
199             self->bytes_read += bytes_read;
200             return buf;
201         }
202
203         /* did an error occur? */
204         if (errno != 0) {
205             xfer_cancel_with_error(XFER_ELEMENT(self),
206                 "while reading holding file: %s", strerror(errno));
207             wait_until_xfer_cancelled(XFER_ELEMENT(self)->xfer);
208             goto return_eof;
209         }
210
211         if (!start_new_chunk(self))
212             goto return_eof;
213     }
214
215 return_eof:
216     g_free(buf);
217     *size = 0;
218     return NULL;
219 }
220
221 static void
222 instance_init(
223     XferElement *elt)
224 {
225     XferSourceHolding *self = (XferSourceHolding *)elt;
226
227     elt->can_generate_eof = TRUE;
228     self->fd = -1;
229 }
230
231 static void
232 finalize_impl(
233     GObject * obj_self)
234 {
235     XferSourceHolding *self = (XferSourceHolding *)obj_self;
236
237     if (self->next_filename)
238         g_free(self->next_filename);
239
240     if (self->fd != -1)
241         close(self->fd); /* ignore error; we were probably already cancelled */
242
243     G_OBJECT_CLASS(parent_class)->finalize(obj_self);
244 }
245
246 static void
247 class_init(
248     XferSourceHoldingClass * selfc)
249 {
250     XferElementClass *klass = XFER_ELEMENT_CLASS(selfc);
251     GObjectClass *goc = G_OBJECT_CLASS(selfc);
252     static xfer_element_mech_pair_t mech_pairs[] = {
253         { XFER_MECH_NONE, XFER_MECH_PULL_BUFFER, 1, 0},
254         { XFER_MECH_NONE, XFER_MECH_NONE, 0, 0},
255     };
256
257     klass->pull_buffer = pull_buffer_impl;
258
259     klass->perl_class = "Amanda::Xfer::Source::Holding";
260     klass->mech_pairs = mech_pairs;
261
262     goc->finalize = finalize_impl;
263
264     parent_class = g_type_class_peek_parent(selfc);
265 }
266
267 GType
268 xfer_source_holding_get_type (void)
269 {
270     static GType type = 0;
271
272     if G_UNLIKELY(type == 0) {
273         static const GTypeInfo info = {
274             sizeof (XferSourceHoldingClass),
275             (GBaseInitFunc) NULL,
276             (GBaseFinalizeFunc) NULL,
277             (GClassInitFunc) class_init,
278             (GClassFinalizeFunc) NULL,
279             NULL /* class_data */,
280             sizeof (XferSourceHolding),
281             0 /* n_preallocs */,
282             (GInstanceInitFunc) instance_init,
283             NULL
284         };
285
286         type = g_type_register_static (XFER_ELEMENT_TYPE, "XferSourceHolding", &info, 0);
287     }
288
289     return type;
290 }
291
292 /* create an element of this class; prototype is in xfer-element.h */
293 XferElement *
294 xfer_source_holding(
295     const char *filename)
296 {
297     XferSourceHolding *self = (XferSourceHolding *)g_object_new(XFER_SOURCE_HOLDING_TYPE, NULL);
298     XferElement *elt = XFER_ELEMENT(self);
299
300     self->next_filename = g_strdup(filename);
301     self->bytes_read = 0;
302
303     return elt;
304 }
305
306 guint64
307 xfer_source_holding_get_bytes_read(
308     XferElement *elt)
309 {
310     XferSourceHolding *self = (XferSourceHolding *)elt;
311
312     return self->bytes_read;
313 }
314