819c80ecc3bfc8a93a5bb995a166c9d66abc8ce9
[debian/amanda] / server-src / taper-file-source.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2005-2008 Zmanda Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #define selfp (self->_priv)
21
22 #include "taper-file-source.h"
23
24 #include "fileheader.h"
25 #include "holding.h"
26
27 #define HOLDING_DISK_OPEN_FLAGS (O_NOCTTY | O_RDONLY)
28
29 struct _TaperFileSourcePrivate {
30     /* How many bytes have we written from the current part? */
31     guint64 current_part_pos;
32     /* Information about the files at the start of this part. */
33     dumpfile_t part_start_chunk_header;
34     int part_start_chunk_fd;
35     /* Where is the start of this part with respect to the first chunk
36        of the part? */
37     guint64 part_start_chunk_offset;
38     /* These may be the same as their part_start_chunk_ counterparts. */
39     dumpfile_t current_chunk_header;
40     int current_chunk_fd;
41     /* Current position of current_chunk_fd. */
42     guint64 current_chunk_position;
43     /* Expected number of split parts. */
44     int predicted_splits;
45 };
46 /* here are local prototypes */
47 static void taper_file_source_init (TaperFileSource * o);
48 static void taper_file_source_class_init (TaperFileSourceClass * c);
49 static ssize_t taper_file_source_read (TaperSource * pself, void * buf,
50                                             size_t count);
51 static gboolean taper_file_source_seek_to_part_start (TaperSource * pself);
52 static void taper_file_source_start_new_part (TaperSource * pself);
53 static int taper_file_source_predict_parts(TaperSource * pself);
54 static dumpfile_t * taper_file_source_get_first_header(TaperSource * pself);
55 static gboolean first_time_setup(TaperFileSource * self);
56
57 /* pointer to the class of our parent */
58 static TaperSourceClass *parent_class = NULL;
59
60 GType taper_file_source_get_type (void) {
61     static GType type = 0;
62     
63     if G_UNLIKELY(type == 0) {
64         static const GTypeInfo info = {
65             sizeof (TaperFileSourceClass),
66             (GBaseInitFunc) NULL,
67             (GBaseFinalizeFunc) NULL,
68             (GClassInitFunc) taper_file_source_class_init,
69             (GClassFinalizeFunc) NULL,
70             NULL /* class_data */,
71             sizeof (TaperFileSource),
72             0 /* n_preallocs */,
73             (GInstanceInitFunc) taper_file_source_init,
74             NULL
75         };
76         
77         type = g_type_register_static (TAPER_SOURCE_TYPE, "TaperFileSource",
78                                        &info, (GTypeFlags)0);
79     }
80     
81     return type;
82 }
83
84 static void
85 taper_file_source_finalize(GObject *obj_self)
86 {
87     TaperFileSource *self = TAPER_FILE_SOURCE (obj_self);
88     gpointer priv G_GNUC_UNUSED = self->_priv;
89     if(G_OBJECT_CLASS(parent_class)->finalize)
90         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
91     if(self->_priv->part_start_chunk_fd >= 0) {
92         close (self->_priv->part_start_chunk_fd);
93     }
94     if(self->_priv->current_chunk_fd >= 0) {
95         close (self->_priv->current_chunk_fd);
96     }
97     dumpfile_free_data(&(self->_priv->part_start_chunk_header));
98     dumpfile_free_data(&(self->_priv->current_chunk_header));
99     amfree(self->_priv);
100 }
101
102 static void 
103 taper_file_source_init (TaperFileSource * o G_GNUC_UNUSED)
104 {
105     o->_priv = malloc(sizeof(TaperFileSourcePrivate));
106     o->_priv->part_start_chunk_fd = -1;
107     o->_priv->current_chunk_fd = -1;
108     o->_priv->predicted_splits = -1;
109     fh_init(&o->_priv->part_start_chunk_header);
110     fh_init(&o->_priv->current_chunk_header);
111     o->holding_disk_file = NULL;
112 }
113
114 static void  taper_file_source_class_init (TaperFileSourceClass * c) {
115     GObjectClass *g_object_class = (GObjectClass*) c;
116     TaperSourceClass *taper_source_class = (TaperSourceClass *)c;
117
118     parent_class = g_type_class_ref (TAPER_SOURCE_TYPE);
119
120     taper_source_class->read = taper_file_source_read;
121     taper_source_class->seek_to_part_start =
122         taper_file_source_seek_to_part_start;
123     taper_source_class->start_new_part = taper_file_source_start_new_part;
124     taper_source_class->get_first_header = taper_file_source_get_first_header;
125     taper_source_class->predict_parts = taper_file_source_predict_parts;
126
127     g_object_class->finalize = taper_file_source_finalize;
128 }
129
130 static void compute_splits(TaperFileSource * self) {
131     guint64 total_kb;
132     int predicted_splits;
133     TaperSource * pself = (TaperSource*)self;
134
135     if (selfp->predicted_splits > 0) {
136         return;
137     }
138
139     if (pself->max_part_size <= 0) {
140         selfp->predicted_splits = 1;
141         return;
142     }
143
144     total_kb = holding_file_size(self->holding_disk_file, TRUE);
145     if (total_kb <= 0) {
146         g_fprintf(stderr, "taper: %lld KB holding file makes no sense, not precalculating splits\n",
147                 (long long)total_kb);
148         fflush(stderr);
149         selfp->predicted_splits = -1;
150         return;
151     }
152     
153     g_fprintf(stderr, "taper: Total dump size should be %jukb, part size is %ju bytes\n",
154             (uintmax_t)total_kb, (uintmax_t)pself->max_part_size);
155
156     /* always add one here; if the max_part_size evenly divides the total
157      * dump size, taper will write an empty final part */
158     predicted_splits = (total_kb * 1024) / pself->max_part_size + 1;
159     g_fprintf(stderr, "taper: predicting %d split parts\n", predicted_splits);
160     selfp->predicted_splits = predicted_splits;
161 }
162
163 static int taper_file_source_predict_parts(TaperSource * pself) {
164     TaperFileSource * self = TAPER_FILE_SOURCE(pself);
165     g_return_val_if_fail(self != NULL, -1);
166
167     compute_splits(self);
168
169     return selfp->predicted_splits;
170 }
171
172 static dumpfile_t * taper_file_source_get_first_header(TaperSource * pself) {
173     TaperFileSource * self = TAPER_FILE_SOURCE(pself);
174     g_return_val_if_fail(self != NULL, NULL);
175
176     first_time_setup(self);
177
178     if (parent_class->get_first_header) {
179         return (parent_class->get_first_header)(pself);
180     } else {
181         return NULL;
182     }
183 }
184
185 /* Open a holding disk and parse the header. Returns TRUE if
186    everything went OK. Writes the fd into fd_pointer and the header
187    into header_pointer. Both must be non-NULL. */
188 static gboolean open_holding_file(char * filename, int * fd_pointer,
189                                   dumpfile_t * header_pointer, char **errmsg) {
190     int fd;
191     size_t read_result;
192     char * header_buffer;
193
194     g_return_val_if_fail(filename != NULL, FALSE);
195     g_return_val_if_fail(fd_pointer != NULL, FALSE);
196     g_return_val_if_fail(header_pointer != NULL, FALSE);
197
198     fd = robust_open(filename, O_NOCTTY | O_RDONLY, 0);
199     if (fd < 0) {
200         *errmsg = newvstrallocf(*errmsg,
201                 "Could not open holding disk file \"%s\": %s",
202                 filename, strerror(errno));
203         return FALSE;
204     }
205
206     header_buffer = malloc(DISK_BLOCK_BYTES);
207     read_result = full_read(fd, header_buffer, DISK_BLOCK_BYTES);
208     if (read_result < DISK_BLOCK_BYTES) {
209         if (errno != 0) {
210             *errmsg = newvstrallocf(*errmsg,
211                     "Could not read header from holding disk file %s: %s",
212                     filename, strerror(errno));
213         } else {
214             *errmsg = newvstrallocf(*errmsg,
215                     "Could not read header from holding disk file %s: got EOF",
216                     filename);
217         }
218         aclose(fd);
219         amfree(header_buffer);
220         return FALSE;
221     }
222
223     dumpfile_free_data(header_pointer);
224     parse_file_header(header_buffer, header_pointer, DISK_BLOCK_BYTES);
225     amfree(header_buffer);
226     
227     if (!(header_pointer->type == F_DUMPFILE ||
228           header_pointer->type == F_CONT_DUMPFILE)) {
229         *errmsg = newvstrallocf(*errmsg,
230                 "Got strange header from file %s",
231                 filename);
232         aclose(fd);
233         return FALSE;
234     }
235     
236     *fd_pointer = fd;
237     return TRUE;
238 }
239
240 /* Copy fd and header information from first chunk fields to current
241    chunk. Returns FALSE if an error occurs (unlikely). */
242 static gboolean copy_chunk_data(int * from_fd, int* to_fd,
243                                 dumpfile_t * from_header,
244                                 dumpfile_t * to_header,
245                                 char **errmsg) {
246     g_return_val_if_fail(from_fd != NULL, FALSE);
247     g_return_val_if_fail(to_fd != NULL, FALSE);
248     g_return_val_if_fail(from_header != NULL, FALSE);
249     g_return_val_if_fail(to_header != NULL, FALSE);
250     g_return_val_if_fail(*to_fd < 0, FALSE);
251     
252     *to_fd = dup(*from_fd);
253     if (*to_fd < 0) {
254         *errmsg = newvstrallocf(*errmsg, "dup(%d) failed!", *from_fd);
255         return FALSE;
256     }
257
258     dumpfile_free_data(to_header);
259     dumpfile_copy_in_place(to_header, from_header);
260
261     return TRUE;
262 }
263
264
265 static gboolean first_time_setup(TaperFileSource * self) {
266     TaperSource * pself = (TaperSource*)self;
267
268     if (selfp->part_start_chunk_fd >= 0) {
269         return TRUE;
270     }
271
272     g_return_val_if_fail(self->holding_disk_file != NULL, FALSE);
273
274     if (!open_holding_file(self->holding_disk_file, 
275                            &(selfp->part_start_chunk_fd),
276                            &(selfp->part_start_chunk_header),
277                            &(pself->errmsg))) {
278         return FALSE;
279     }
280
281     /* We are all set; just copy the "start chunk" datums into the
282        "current chunk" fields. */
283     if (!copy_chunk_data(&(selfp->part_start_chunk_fd),
284                          &(selfp->current_chunk_fd),
285                          &(selfp->part_start_chunk_header),
286                          &(selfp->current_chunk_header),
287                          &(pself->errmsg))) {
288         aclose(selfp->part_start_chunk_fd);
289         return FALSE;
290     }
291
292     dumpfile_free(pself->first_header);
293     pself->first_header = dumpfile_copy(&(selfp->part_start_chunk_header));
294
295     /* Should not be necessary. You never know! */
296     selfp->current_part_pos = selfp->part_start_chunk_offset =
297         selfp->current_chunk_position = 0;
298
299     return TRUE;
300 }
301
/* read() wrapper that retries as long as the call fails with EAGAIN,
 * EWOULDBLOCK or EINTR (whichever of those the platform defines).
 *
 * Returns what read() returned: the number of bytes read, 0 at end of
 * file, or a negative value on any other error. On error a diagnostic is
 * printed to stderr, and errno is restored afterwards so the caller still
 * sees the error code of the failed read() rather than whatever the
 * printing may have left behind.
 *
 * The result is kept as ssize_t, the type read() returns, so it is not
 * narrowed on the way out. */
static ssize_t retry_read(int fd, void * buf, size_t count) {
    for (;;) {
        ssize_t read_result = read(fd, buf, count);
        int saved_errno;

        if (read_result >= 0) {
            return read_result;
        }

        if (0
#ifdef EAGAIN
            || errno == EAGAIN
#endif
#ifdef EWOULDBLOCK
            || errno == EWOULDBLOCK
#endif
#ifdef EINTR
            || errno == EINTR
#endif
            ) {
            /* Try again. */
            continue;
        }

        saved_errno = errno;
        g_fprintf(stderr, "Error reading holding disk: %s\n",
                strerror(saved_errno));
        errno = saved_errno;
        return read_result;
    }
}
327
328 /* If another chunk is available, load it. Returns TRUE if there are
329    no more chunks or the next chunk is loaded, or FALSE if an error
330    occurs. */
331 static gboolean get_next_chunk(TaperFileSource * self) {
332     char * cont_filename = NULL;
333     TaperSource * pself = (TaperSource*)self;
334
335     if (selfp->current_chunk_header.cont_filename[0] != '\0') {
336         cont_filename =
337             g_strdup(selfp->current_chunk_header.cont_filename);
338     } else {
339         /* No more data. */
340         aclose(selfp->current_chunk_fd);
341         dumpfile_free_data(&(selfp->current_chunk_header));
342         bzero(&(selfp->current_chunk_header),
343               sizeof(selfp->current_chunk_header));
344         return TRUE;
345     }
346
347     /* More data. */
348
349     aclose(selfp->current_chunk_fd);
350
351     if (!open_holding_file(cont_filename,
352                            &(selfp->current_chunk_fd),
353                            &(selfp->current_chunk_header),
354                            &(pself->errmsg))) {
355         amfree(cont_filename);
356         dumpfile_free_data(&(selfp->current_chunk_header));
357         bzero(&(selfp->current_chunk_header),
358               sizeof(selfp->current_chunk_header));
359         aclose(selfp->current_chunk_fd);
360         return FALSE;
361     }
362
363     amfree(cont_filename);
364     selfp->current_chunk_position = 0;
365
366     return TRUE;
367 }
368
369 static ssize_t 
370 taper_file_source_read (TaperSource * pself, void * buf, size_t count) {
371     TaperFileSource * self = (TaperFileSource*) pself;
372     int read_result;
373
374     g_return_val_if_fail (self != NULL, -1);
375     g_return_val_if_fail (TAPER_IS_FILE_SOURCE (self), -1);
376     g_return_val_if_fail (buf != NULL, -1);
377     g_return_val_if_fail (count > 0, -1);
378     
379     if (!first_time_setup(self))
380         return -1;
381
382     if (pself->max_part_size > 0) {
383         count = MIN(count, pself->max_part_size - selfp->current_part_pos);
384     }
385     if (count <= 0) {
386         /* Was positive before. Thus we are at EOP. */
387         pself->end_of_part = TRUE;
388         return 0;
389     }
390
391     /* We don't use full_read, because we would rather return a partial
392      * read ASAP. */
393     read_result = retry_read(selfp->current_chunk_fd, buf, count);
394     if (read_result < 0) {
395         /* Nothing we can do. */
396         pself->errmsg = newvstrallocf(pself->errmsg,
397                 "Error reading holding disk '%s': %s'",
398                  self->holding_disk_file, strerror(errno));
399         return read_result;
400     } else if (read_result == 0) {
401         if (!get_next_chunk(self)) {
402             return -1; 
403         }
404
405         if (selfp->current_chunk_fd >= 0) {
406             /* Try again with the next chunk. */
407             return taper_file_source_read(pself, buf, count);
408         } else {
409             pself->end_of_data = TRUE;
410             return 0;
411         }
412     } else {
413         /* Success. */
414         selfp->current_part_pos += read_result;
415         selfp->current_chunk_position += read_result;
416         return read_result;
417     }
418 }
419
420 static gboolean taper_file_source_seek_to_part_start (TaperSource * pself) {
421     TaperFileSource * self = (TaperFileSource*)pself;
422     off_t lseek_result;
423
424     g_return_val_if_fail (self != NULL, FALSE);
425     g_return_val_if_fail (TAPER_IS_FILE_SOURCE (self), FALSE);
426
427     aclose(selfp->current_chunk_fd);
428     if (!copy_chunk_data(&(selfp->part_start_chunk_fd),
429                          &(selfp->current_chunk_fd),
430                          &(selfp->part_start_chunk_header),
431                          &(selfp->current_chunk_header),
432                          &(pself->errmsg))) {
433         return FALSE;
434     }
435
436     selfp->current_chunk_position = selfp->part_start_chunk_offset;
437
438     lseek_result = lseek(selfp->current_chunk_fd,
439                          DISK_BLOCK_BYTES + selfp->current_chunk_position,
440                          SEEK_SET);
441     if (lseek_result < 0) {
442         pself->errmsg = newvstrallocf(pself->errmsg,
443                 "Could not seek holding disk file: %s\n",
444                 strerror(errno));
445         return FALSE;
446     }
447
448     selfp->current_part_pos = 0;
449
450     if (parent_class->seek_to_part_start)
451         return parent_class->seek_to_part_start(pself);
452     else
453         return TRUE;
454 }
455
456 static void taper_file_source_start_new_part (TaperSource * pself) {
457     TaperFileSource * self = (TaperFileSource*)pself;
458     g_return_if_fail (self != NULL);
459     g_return_if_fail (TAPER_IS_FILE_SOURCE (self));
460
461     aclose(selfp->part_start_chunk_fd);
462     if (!copy_chunk_data(&(selfp->current_chunk_fd),
463                          &(selfp->part_start_chunk_fd),
464                          &(selfp->current_chunk_header),
465                          &(selfp->part_start_chunk_header),
466                          &(pself->errmsg))) {
467         /* We can't return FALSE. :-( Instead, we set things up so
468            they will fail on the next read(). */
469         aclose(selfp->current_chunk_fd);
470         aclose(selfp->part_start_chunk_fd);
471         return;
472     }
473
474     selfp->part_start_chunk_offset = selfp->current_chunk_position;
475     selfp->current_part_pos = 0;
476
477     if (parent_class->start_new_part)
478         parent_class->start_new_part(pself);
479 }
480