Imported Upstream version 2.6.1
[debian/amanda] / server-src / taper-disk-port-source.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2005-2008 Zmanda Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
20 #define selfp (self->_priv)
21
22 #include "taper-disk-port-source.h"
23 #include "taper-mem-port-source.h"
24
25 struct _TaperDiskPortSourcePrivate {
26     gboolean retry_mode;
27     int buffer_fd;
28     TaperSource * fallback;
29     gboolean disk_problem;
30     guint64 disk_buffered_bytes;
31     
32     /* This is for extra data we picked up by accident. */
33     char * excess_buffer;
34     size_t excess_buffer_size;
35
36     guint64 retry_data_written;
37 };
38
39 /* here are local prototypes */
40 static void taper_disk_port_source_init (TaperDiskPortSource * o);
41 static void taper_disk_port_source_class_init (TaperDiskPortSourceClass * c);
42 static ssize_t taper_disk_port_source_read (TaperSource * pself, void * buf,
43                                             size_t count);
44 static gboolean taper_disk_port_source_seek_to_part_start
45     (TaperSource * pself);
46 static void taper_disk_port_source_start_new_part (TaperSource * pself);
47 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself);
48 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself);
49 static int taper_disk_port_source_predict_parts(TaperSource * pself);
50
51 /* pointer to the class of our parent */
52 static TaperSourceClass * source_parent_class = NULL;
53 static TaperPortSourceClass *parent_class = NULL;
54
55 GType taper_disk_port_source_get_type (void) {
56     static GType type = 0;
57     
58     if G_UNLIKELY(type == 0) {
59         static const GTypeInfo info = {
60             sizeof (TaperDiskPortSourceClass),
61             (GBaseInitFunc) NULL,
62             (GBaseFinalizeFunc) NULL,
63             (GClassInitFunc) taper_disk_port_source_class_init,
64             (GClassFinalizeFunc) NULL,
65             NULL /* class_data */,
66             sizeof (TaperDiskPortSource),
67             0 /* n_preallocs */,
68             (GInstanceInitFunc) taper_disk_port_source_init,
69             NULL
70         };
71         
72         type = g_type_register_static (TAPER_TYPE_PORT_SOURCE,
73                                        "TaperDiskPortSource", &info,
74                                        (GTypeFlags)0);
75     }
76     
77     return type;
78 }
79
80 static void taper_disk_port_source_dispose(GObject * obj_self) {
81     TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
82     if (G_OBJECT_CLASS (parent_class)->dispose)
83         (* G_OBJECT_CLASS (parent_class)->dispose) (obj_self);
84     
85     if(self->_priv->fallback) {
86         g_object_unref (self->_priv->fallback);
87     }
88 }
89
90 static void
91 taper_disk_port_source_finalize(GObject *obj_self)
92 {
93     TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
94     if(G_OBJECT_CLASS(parent_class)->finalize)
95         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
96
97     amfree(self->buffer_dir_name);
98     amfree(self->_priv->excess_buffer);
99     amfree(self->_priv);
100 }
101
102 static void 
103 taper_disk_port_source_init (TaperDiskPortSource * o G_GNUC_UNUSED)
104 {
105         o->_priv = malloc(sizeof(TaperDiskPortSourcePrivate));
106         o->_priv->retry_mode = FALSE;
107         o->_priv->buffer_fd = -1;
108         o->_priv->fallback = NULL;
109         o->_priv->disk_problem = FALSE;
110         o->_priv->disk_buffered_bytes = 0;
111         o->_priv->excess_buffer = NULL;
112         o->_priv->excess_buffer_size = 0;
113 }
114
115 static void 
116 taper_disk_port_source_class_init (TaperDiskPortSourceClass * c G_GNUC_UNUSED)
117 {
118     GObjectClass *g_object_class = (GObjectClass*) c;
119     TaperSourceClass *taper_source_class = (TaperSourceClass *)c;
120     
121     parent_class = g_type_class_ref (TAPER_TYPE_PORT_SOURCE);
122     source_parent_class = (TaperSourceClass*)parent_class;
123     
124     taper_source_class->read = taper_disk_port_source_read;
125     taper_source_class->seek_to_part_start =
126         taper_disk_port_source_seek_to_part_start;
127     taper_source_class->start_new_part = taper_disk_port_source_start_new_part;
128     taper_source_class->get_end_of_data =
129         taper_disk_port_source_get_end_of_data;
130     taper_source_class->get_end_of_part =
131         taper_disk_port_source_get_end_of_part;
132     taper_source_class->predict_parts = taper_disk_port_source_predict_parts;
133
134     g_object_class->dispose = taper_disk_port_source_dispose;
135     g_object_class->finalize = taper_disk_port_source_finalize;
136 }
137
138
139 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself) {
140     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
141     g_return_val_if_fail(self != NULL, TRUE);
142
143     if (self->_priv->fallback != NULL) {
144         return taper_source_get_end_of_data(self->_priv->fallback);
145     } else {
146         return (source_parent_class->get_end_of_data)(pself);
147     }
148 }
149 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself) {
150     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
151     g_return_val_if_fail(self != NULL, TRUE);
152
153     if (self->_priv->fallback != NULL) {
154         return taper_source_get_end_of_part(self->_priv->fallback);
155     } else {
156         return (source_parent_class->get_end_of_part)(pself);
157     }
158 }
159   
160 static int taper_disk_port_source_predict_parts(TaperSource * pself) {
161     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
162     g_return_val_if_fail(self != NULL, -1);
163
164     return -1;
165 }
166
167 static TaperSource * make_fallback_source(TaperDiskPortSource * self) {
168     TaperSource * rval;
169     TaperPortSource * port_rval;
170     rval = (TaperSource*)
171         g_object_new(TAPER_TYPE_MEM_PORT_SOURCE, NULL);
172     port_rval = (TaperPortSource*)rval;
173     
174     if (rval == NULL)
175         return NULL;
176
177     port_rval->socket_fd = ((TaperPortSource*)self)->socket_fd;
178     rval->max_part_size = self->fallback_buffer_size;
179
180     return rval;
181 }
182
183 /* Open the buffer file. We create the file and then immediately
184    unlink it, to improve security and ease cleanup. */
185 static gboolean open_buffer_file(TaperDiskPortSource * self) {
186     int fd;
187     char * filename;
188     mode_t old_umask;
189     TaperSource * pself = (TaperSource *)self;
190
191     g_return_val_if_fail(self != NULL, FALSE);
192     g_return_val_if_fail(self->buffer_dir_name != NULL, FALSE);
193
194     filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
195                                self->buffer_dir_name);
196     /* This is not thread-safe. :-( */
197     old_umask = umask(0);
198     fd = g_mkstemp(filename);
199     umask(old_umask);
200     if (fd < 0) {
201         pself->errmsg = newvstrallocf(pself->errmsg,
202                 "Couldn't open temporary file with template %s: %s",
203                 filename, strerror(errno));
204         return FALSE;
205     }
206
207     /* If it fails, that's annoying, but no great loss. */
208     if (unlink(filename) != 0) {
209         g_fprintf(stderr, "Unlinking %s failed: %s\n", filename,
210                 strerror(errno));
211     }
212
213     free(filename);
214     selfp->buffer_fd = fd;
215     return TRUE;
216 }
217
218 /* An error has occured with the disk buffer; store the extra data in
219    memory until we can recover. */
220 static void store_excess(TaperDiskPortSource * self, char * buf,
221                          size_t attempted_size, size_t disk_size) {
222     TaperSource * pself = (TaperSource*)self;
223     g_return_if_fail(attempted_size > 0);
224     g_return_if_fail(disk_size < attempted_size);
225     g_return_if_fail(buf != NULL);
226     g_return_if_fail(selfp->excess_buffer == NULL);
227
228     selfp->excess_buffer_size = attempted_size - disk_size;
229     selfp->excess_buffer = malloc(selfp->excess_buffer_size);
230     memcpy(selfp->excess_buffer, buf + disk_size, attempted_size - disk_size);
231
232     selfp->disk_buffered_bytes += disk_size;
233     pself->max_part_size = MIN(pself->max_part_size,
234                                selfp->disk_buffered_bytes);
235 }
236
237 /* Handle the output of the small amount of saved in-memory data. */
238 static size_t handle_excess_buffer_read(TaperDiskPortSource * self,
239                                         void * buf, size_t count) {
240     TaperSource * pself = (TaperSource*)self;
241     guint64 offset;
242
243     /* First, do we have anything left? */
244     if (selfp->retry_data_written >=
245         (selfp->disk_buffered_bytes + selfp->excess_buffer_size)) {
246         pself->end_of_part = TRUE;
247         return 0;
248     }
249     
250     count = MIN(count,
251                 (selfp->disk_buffered_bytes + selfp->excess_buffer_size)
252                 - selfp->retry_data_written);
253
254     offset = selfp->disk_buffered_bytes + selfp->excess_buffer_size
255         - selfp->retry_data_written;
256     g_assert(offset + count <= selfp->excess_buffer_size);
257     memcpy(buf, selfp->excess_buffer + offset, count);
258
259     selfp->retry_data_written += count;
260
261     return count;
262 }
263     
264 /* Write data out to the disk buffer, and handle any problems that
265    crop up along the way. */
266 static ssize_t write_disk_buffer(TaperDiskPortSource * self, char * buf,
267                                  size_t read_size) {
268     size_t bytes_written = 0;
269     while (bytes_written < read_size) {
270         int write_result = write(selfp->buffer_fd, buf + bytes_written,
271                                  read_size - bytes_written);
272         if (write_result > 0) {
273             bytes_written += write_result;
274             continue;
275         } else if (write_result == 0) {
276             g_fprintf(stderr, "Writing disk buffer: Wrote 0 bytes.\n");
277             continue;
278         } else {
279             if (0
280 #ifdef EAGAIN
281                 || errno == EAGAIN
282 #endif
283 #ifdef EWOULDBLOCK
284                 || errno == EWOULDBLOCK
285 #endif
286 #ifdef EINTR
287                 || errno == EINTR
288 #endif
289                 ) {
290                 /* Try again. */
291                 continue;
292             } else if (0
293 #ifdef EFBIG
294                        || errno == EFBIG
295 #endif
296 #ifdef ENOSPC
297                        || errno == ENOSPC
298 #endif
299                        ) {
300                 /* Out of space */
301                 store_excess(self, buf, read_size, bytes_written);
302                 return read_size;
303             } else {
304                 /* I/O error. */
305                 store_excess(self, buf, read_size, bytes_written);
306                 selfp->disk_problem = TRUE;
307                 TAPER_SOURCE(self)->end_of_part = TRUE;
308                 return read_size;
309             }
310         }
311         g_assert_not_reached();
312     }
313
314     selfp->disk_buffered_bytes += bytes_written;
315     return read_size;
316 }
317
318 static ssize_t 
319 taper_disk_port_source_read (TaperSource * pself, void * buf, size_t count) {
320     TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
321
322     g_return_val_if_fail (self != NULL, -1);
323     g_return_val_if_fail (TAPER_IS_DISK_PORT_SOURCE (self), -1);
324     g_return_val_if_fail (buf != NULL, -1);
325     g_return_val_if_fail (count > 0, -1);
326     g_assert(selfp->disk_buffered_bytes <= pself->max_part_size);
327         
328     if (selfp->fallback != NULL) {
329         return taper_source_read(selfp->fallback, buf, count);
330     } else if (selfp->buffer_fd < 0) {
331         if (!open_buffer_file(self)) {
332             /* Buffer file failed; go immediately to failover mode. */
333             selfp->fallback = make_fallback_source(self);
334             if (selfp->fallback != NULL) {
335                 return taper_source_read(selfp->fallback, buf, count);
336             } else {
337                 /* Even the fallback source failed! */
338                 return -1;
339             }
340         }
341     }
342
343     if (selfp->retry_mode) {
344         /* Read from disk buffer. */
345
346         if (selfp->retry_data_written < selfp->disk_buffered_bytes) {
347             /* Read from disk. */
348             int result;
349             count = MIN(count, selfp->disk_buffered_bytes -
350                                selfp->retry_data_written);
351             result = read(selfp->buffer_fd, buf, count);
352             if (result <= 0) {
353                 /* This should not happen. */
354                 return -1;
355             } else {
356                 selfp->retry_data_written += result;
357                 return result;
358             }
359         } else if (selfp->excess_buffer != NULL) {
360             /* We are writing out the last bit of buffer. Handle that. */
361             return handle_excess_buffer_read(self, buf, count);
362         } else {
363             /* No more data. */
364             pself->end_of_part = TRUE;
365             return 0;
366         }
367         
368         g_assert_not_reached();
369     } else {
370         /* Read from port. */
371         int read_result;
372         count = MIN(count, pself->max_part_size - selfp->disk_buffered_bytes);
373         if (count == 0) /* It was nonzero before. */ {
374             pself->end_of_part = TRUE;
375             return 0;
376         }
377         
378         read_result = source_parent_class->read(pself, buf, count);
379         /* Parent handles EOF and other goodness. */
380         if (read_result <= 0) {
381             return read_result;
382         }
383         /* Now write to disk buffer. */
384         return write_disk_buffer(self, buf, read_result);
385     }
386 }
387
388 /* Try seeking back to byte 0. If that fails, then we mark ourselves
389    as having a disk problem. Returns FALSE in that case. */
390 static gboolean try_rewind(TaperDiskPortSource * self) {
391     gint64 result;
392     TaperSource * pself = (TaperSource *)self;
393     result = lseek(selfp->buffer_fd, 0, SEEK_SET);
394     if (result != 0) {
395         pself->errmsg = newvstrallocf(pself->errmsg,
396                 "Couldn't seek split buffer: %s", strerror(errno));
397         selfp->disk_problem = TRUE;
398         return FALSE;
399     } else {
400         return TRUE;
401     }
402 }
403
404 static gboolean 
405 taper_disk_port_source_seek_to_part_start (TaperSource * pself) {
406     TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
407     g_return_val_if_fail (self != NULL, FALSE);
408     g_return_val_if_fail (TAPER_IS_DISK_PORT_SOURCE (pself), FALSE);
409     g_return_val_if_fail (selfp->disk_buffered_bytes
410                           + selfp->excess_buffer_size > 0, FALSE);
411     
412     if (self->_priv->fallback != NULL) {
413         return taper_source_seek_to_part_start(selfp->fallback);
414     }
415
416     if (selfp->disk_problem && selfp->disk_buffered_bytes) {
417         /* The disk buffer is screwed; nothing to do. */
418         return FALSE;
419     }
420
421     if (!selfp->disk_problem) {
422         if (!try_rewind(self)) {
423             return FALSE;
424         }
425     }
426
427     selfp->retry_mode = TRUE;
428     selfp->retry_data_written = 0;
429
430     if (source_parent_class->seek_to_part_start) {
431         return source_parent_class->seek_to_part_start(pself);
432     } else {
433         return TRUE;
434     }
435 }
436
437 static void 
438 taper_disk_port_source_start_new_part (TaperSource * pself) {
439     TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
440     g_return_if_fail (self != NULL);
441     g_return_if_fail (TAPER_IS_DISK_PORT_SOURCE (pself));
442         
443     if (self->_priv->fallback != NULL) {
444         taper_source_start_new_part(self->_priv->fallback);
445         return;
446     }
447
448     selfp->retry_mode = FALSE;
449     if (!selfp->disk_problem) {
450         try_rewind(self); /* If this fails it will set disk_problem to
451                              TRUE. */
452     }
453
454     if (selfp->disk_problem && selfp->fallback == NULL) {
455         selfp->fallback = make_fallback_source(self);
456     }
457     selfp->disk_buffered_bytes = 0;
458     amfree(selfp->excess_buffer);
459     selfp->excess_buffer_size = selfp->retry_data_written = 0;
460
461     if (source_parent_class->start_new_part) {
462         source_parent_class->start_new_part(pself);
463     }
464 }