Imported Upstream version 2.6.1p2
[debian/amanda] / server-src / taper-disk-port-source.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2005-2008 Zmanda Inc.
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Lesser General Public
7  * License as published by the Free Software Foundation; either
8  * version 2.1 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Lesser General Public License for more details.
14  *
15  * You should have received a copy of the GNU Lesser General Public
16  * License along with this library; if not, write to the Free Software
17  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
18  */
19
/* Shorthand for the instance-private data. It expands to self->_priv, so it
 * is only usable where a local variable named `self` is in scope. */
#define selfp (self->_priv)
21
22 #include "taper-disk-port-source.h"
23 #include "taper-mem-port-source.h"
24
25 struct _TaperDiskPortSourcePrivate {
26     gboolean retry_mode;
27     int buffer_fd;
28     TaperSource * fallback;
29     gboolean disk_problem;
30     guint64 disk_buffered_bytes;
31     
32     /* This is for extra data we picked up by accident. */
33     char * excess_buffer;
34     size_t excess_buffer_size;
35
36     guint64 retry_data_written;
37 };
38
39 /* here are local prototypes */
40 static void taper_disk_port_source_init (TaperDiskPortSource * o);
41 static void taper_disk_port_source_class_init (TaperDiskPortSourceClass * c);
42 static ssize_t taper_disk_port_source_read (TaperSource * pself, void * buf,
43                                             size_t count);
44 static gboolean taper_disk_port_source_seek_to_part_start
45     (TaperSource * pself);
46 static void taper_disk_port_source_start_new_part (TaperSource * pself);
47 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself);
48 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself);
49 static int taper_disk_port_source_predict_parts(TaperSource * pself);
50
51 /* pointer to the class of our parent */
52 static TaperSourceClass * source_parent_class = NULL;
53 static TaperPortSourceClass *parent_class = NULL;
54
55 GType taper_disk_port_source_get_type (void) {
56     static GType type = 0;
57     
58     if G_UNLIKELY(type == 0) {
59         static const GTypeInfo info = {
60             sizeof (TaperDiskPortSourceClass),
61             (GBaseInitFunc) NULL,
62             (GBaseFinalizeFunc) NULL,
63             (GClassInitFunc) taper_disk_port_source_class_init,
64             (GClassFinalizeFunc) NULL,
65             NULL /* class_data */,
66             sizeof (TaperDiskPortSource),
67             0 /* n_preallocs */,
68             (GInstanceInitFunc) taper_disk_port_source_init,
69             NULL
70         };
71         
72         type = g_type_register_static (TAPER_TYPE_PORT_SOURCE,
73                                        "TaperDiskPortSource", &info,
74                                        (GTypeFlags)0);
75     }
76     
77     return type;
78 }
79
80 static void taper_disk_port_source_dispose(GObject * obj_self) {
81     TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
82     if (G_OBJECT_CLASS (parent_class)->dispose)
83         (* G_OBJECT_CLASS (parent_class)->dispose) (obj_self);
84     
85     if(self->_priv->fallback) {
86         g_object_unref (self->_priv->fallback);
87     }
88 }
89
90 static void
91 taper_disk_port_source_finalize(GObject *obj_self)
92 {
93     TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
94     if(G_OBJECT_CLASS(parent_class)->finalize)
95         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
96
97     amfree(self->buffer_dir_name);
98     amfree(self->_priv->excess_buffer);
99     amfree(self->_priv);
100 }
101
102 static void 
103 taper_disk_port_source_init (TaperDiskPortSource * o G_GNUC_UNUSED)
104 {
105         o->_priv = malloc(sizeof(TaperDiskPortSourcePrivate));
106         o->_priv->retry_mode = FALSE;
107         o->_priv->buffer_fd = -1;
108         o->_priv->fallback = NULL;
109         o->_priv->disk_problem = FALSE;
110         o->_priv->disk_buffered_bytes = 0;
111         o->_priv->excess_buffer = NULL;
112         o->_priv->excess_buffer_size = 0;
113 }
114
115 static void 
116 taper_disk_port_source_class_init (TaperDiskPortSourceClass * c G_GNUC_UNUSED)
117 {
118     GObjectClass *g_object_class = (GObjectClass*) c;
119     TaperSourceClass *taper_source_class = (TaperSourceClass *)c;
120     
121     parent_class = g_type_class_ref (TAPER_TYPE_PORT_SOURCE);
122     source_parent_class = (TaperSourceClass*)parent_class;
123     
124     taper_source_class->read = taper_disk_port_source_read;
125     taper_source_class->seek_to_part_start =
126         taper_disk_port_source_seek_to_part_start;
127     taper_source_class->start_new_part = taper_disk_port_source_start_new_part;
128     taper_source_class->get_end_of_data =
129         taper_disk_port_source_get_end_of_data;
130     taper_source_class->get_end_of_part =
131         taper_disk_port_source_get_end_of_part;
132     taper_source_class->predict_parts = taper_disk_port_source_predict_parts;
133
134     g_object_class->dispose = taper_disk_port_source_dispose;
135     g_object_class->finalize = taper_disk_port_source_finalize;
136 }
137
138
139 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself) {
140     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
141     g_return_val_if_fail(self != NULL, TRUE);
142
143     if (self->_priv->fallback != NULL) {
144         return taper_source_get_end_of_data(self->_priv->fallback);
145     } else {
146         return (source_parent_class->get_end_of_data)(pself);
147     }
148 }
149 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself) {
150     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
151     g_return_val_if_fail(self != NULL, TRUE);
152
153     if (self->_priv->fallback != NULL) {
154         return taper_source_get_end_of_part(self->_priv->fallback);
155     } else {
156         return (source_parent_class->get_end_of_part)(pself);
157     }
158 }
159   
160 static int taper_disk_port_source_predict_parts(TaperSource * pself) {
161     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
162     g_return_val_if_fail(self != NULL, -1);
163
164     return -1;
165 }
166
167 static TaperSource * make_fallback_source(TaperDiskPortSource * self) {
168     TaperSource * rval;
169     TaperPortSource * port_rval;
170     rval = (TaperSource*)
171         g_object_new(TAPER_TYPE_MEM_PORT_SOURCE, NULL);
172     port_rval = (TaperPortSource*)rval;
173     
174     if (rval == NULL)
175         return NULL;
176
177     port_rval->socket_fd = ((TaperPortSource*)self)->socket_fd;
178     rval->max_part_size = self->fallback_buffer_size;
179
180     return rval;
181 }
182
183 /* Open the buffer file. We create the file and then immediately
184    unlink it, to improve security and ease cleanup. */
185 static gboolean open_buffer_file(TaperDiskPortSource * self) {
186     int fd;
187     char * filename;
188     mode_t old_umask;
189     TaperSource * pself = (TaperSource *)self;
190
191     g_return_val_if_fail(self != NULL, FALSE);
192     g_return_val_if_fail(self->buffer_dir_name != NULL, FALSE);
193
194     filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
195                                self->buffer_dir_name);
196     /* This is not thread-safe. :-( */
197     old_umask = umask(0);
198     fd = g_mkstemp(filename);
199     umask(old_umask);
200     if (fd < 0) {
201         pself->errmsg = newvstrallocf(pself->errmsg,
202                 "Couldn't open temporary file with template %s: %s",
203                 filename, strerror(errno));
204         return FALSE;
205     }
206
207     /* If it fails, that's annoying, but no great loss. */
208     if (unlink(filename) != 0) {
209         g_fprintf(stderr, "Unlinking %s failed: %s\n", filename,
210                 strerror(errno));
211     }
212
213     free(filename);
214     selfp->buffer_fd = fd;
215     return TRUE;
216 }
217
218 /* An error has occured with the disk buffer; store the extra data in
219    memory until we can recover. */
220 static void store_excess(TaperDiskPortSource * self, char * buf,
221                          size_t attempted_size, size_t disk_size) {
222     TaperSource * pself = (TaperSource*)self;
223     g_return_if_fail(attempted_size > 0);
224     g_return_if_fail(disk_size < attempted_size);
225     g_return_if_fail(buf != NULL);
226     g_return_if_fail(selfp->excess_buffer == NULL);
227
228     selfp->excess_buffer_size = attempted_size - disk_size;
229     selfp->excess_buffer = malloc(selfp->excess_buffer_size);
230     memcpy(selfp->excess_buffer, buf + disk_size, attempted_size - disk_size);
231
232     selfp->disk_buffered_bytes += disk_size;
233     pself->max_part_size = MIN(pself->max_part_size,
234                                selfp->disk_buffered_bytes);
235 }
236
237 /* Handle the output of the small amount of saved in-memory data. */
238 static size_t handle_excess_buffer_read(TaperDiskPortSource * self,
239                                         void * buf, size_t count) {
240     guint64 offset;
241
242     /* First, do we have anything left? */
243     if (selfp->retry_data_written >=
244         (selfp->disk_buffered_bytes + selfp->excess_buffer_size)) {
245         return 0;
246     }
247     
248     count = MIN(count,
249                 (selfp->disk_buffered_bytes + selfp->excess_buffer_size)
250                 - selfp->retry_data_written);
251
252     offset = selfp->disk_buffered_bytes + selfp->excess_buffer_size
253         - selfp->retry_data_written;
254     g_assert(offset + count <= selfp->excess_buffer_size);
255     memcpy(buf, selfp->excess_buffer + offset, count);
256
257     selfp->retry_data_written += count;
258
259     return count;
260 }
261     
262 /* Write data out to the disk buffer, and handle any problems that
263    crop up along the way. */
264 static ssize_t write_disk_buffer(TaperDiskPortSource * self, char * buf,
265                                  size_t read_size) {
266     size_t bytes_written = 0;
267     while (bytes_written < read_size) {
268         int write_result = write(selfp->buffer_fd, buf + bytes_written,
269                                  read_size - bytes_written);
270         if (write_result > 0) {
271             bytes_written += write_result;
272             continue;
273         } else if (write_result == 0) {
274             g_fprintf(stderr, "Writing disk buffer: Wrote 0 bytes.\n");
275             continue;
276         } else {
277             if (0
278 #ifdef EAGAIN
279                 || errno == EAGAIN
280 #endif
281 #ifdef EWOULDBLOCK
282                 || errno == EWOULDBLOCK
283 #endif
284 #ifdef EINTR
285                 || errno == EINTR
286 #endif
287                 ) {
288                 /* Try again. */
289                 continue;
290             } else if (0
291 #ifdef EFBIG
292                        || errno == EFBIG
293 #endif
294 #ifdef ENOSPC
295                        || errno == ENOSPC
296 #endif
297                        ) {
298                 /* Out of space */
299                 store_excess(self, buf, read_size, bytes_written);
300                 return read_size;
301             } else {
302                 /* I/O error. */
303                 store_excess(self, buf, read_size, bytes_written);
304                 selfp->disk_problem = TRUE;
305                 TAPER_SOURCE(self)->end_of_part = TRUE;
306                 return read_size;
307             }
308         }
309         g_assert_not_reached();
310     }
311
312     selfp->disk_buffered_bytes += bytes_written;
313     return read_size;
314 }
315
316 static ssize_t 
317 taper_disk_port_source_read (TaperSource * pself, void * buf, size_t count) {
318     TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
319     int read_result;
320     int result;
321
322     g_return_val_if_fail (self != NULL, -1);
323     g_return_val_if_fail (TAPER_IS_DISK_PORT_SOURCE (self), -1);
324     g_return_val_if_fail (buf != NULL, -1);
325     g_return_val_if_fail (count > 0, -1);
326     g_assert(selfp->disk_buffered_bytes <= pself->max_part_size);
327         
328     if (selfp->fallback != NULL) {
329         return taper_source_read(selfp->fallback, buf, count);
330     } else if (selfp->buffer_fd < 0) {
331         if (!open_buffer_file(self)) {
332             /* Buffer file failed; go immediately to failover mode. */
333             selfp->fallback = make_fallback_source(self);
334             if (selfp->fallback != NULL) {
335                 return taper_source_read(selfp->fallback, buf, count);
336             } else {
337                 /* Even the fallback source failed! */
338                 return -1;
339             }
340         }
341     }
342
343     if (selfp->retry_mode) {
344         /* Read from disk buffer. */
345
346         if (selfp->retry_data_written < selfp->disk_buffered_bytes) {
347             /* Read from disk. */
348             count = MIN(count, selfp->disk_buffered_bytes -
349                                selfp->retry_data_written);
350             result = read(selfp->buffer_fd, buf, count);
351             if (result <= 0) {
352                 /* This should not happen. */
353                 return -1;
354             } else {
355                 selfp->retry_data_written += result;
356                 return result;
357             }
358         } else if (selfp->excess_buffer != NULL) {
359             /* We are writing out the last bit of buffer. Handle that. */
360             result = handle_excess_buffer_read(self, buf, count);
361             if (result) {
362                 return result;
363             }
364         }
365
366         /* No more cached data -- start reading from the part again */
367         selfp->retry_mode = FALSE;
368     }
369
370     /* Read from port. */
371     count = MIN(count, pself->max_part_size - selfp->disk_buffered_bytes);
372     if (count == 0) /* It was nonzero before. */ {
373         pself->end_of_part = TRUE;
374         return 0;
375     }
376
377     read_result = source_parent_class->read(pself, buf, count);
378     /* Parent handles EOF and other goodness. */
379     if (read_result <= 0) {
380         return read_result;
381     }
382
383     /* Now write to disk buffer. */
384     return write_disk_buffer(self, buf, read_result);
385 }
386
387 /* Try seeking back to byte 0. If that fails, then we mark ourselves
388    as having a disk problem. Returns FALSE in that case. */
389 static gboolean try_rewind(TaperDiskPortSource * self) {
390     gint64 result;
391     TaperSource * pself = (TaperSource *)self;
392     result = lseek(selfp->buffer_fd, 0, SEEK_SET);
393     if (result != 0) {
394         pself->errmsg = newvstrallocf(pself->errmsg,
395                 "Couldn't seek split buffer: %s", strerror(errno));
396         selfp->disk_problem = TRUE;
397         return FALSE;
398     } else {
399         return TRUE;
400     }
401 }
402
403 static gboolean 
404 taper_disk_port_source_seek_to_part_start (TaperSource * pself) {
405     TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
406
407     if (self->_priv->fallback != NULL) {
408         return taper_source_seek_to_part_start(selfp->fallback);
409     }
410
411     if (selfp->disk_problem && selfp->disk_buffered_bytes) {
412         /* The disk buffer is screwed; nothing to do. */
413         return FALSE;
414     }
415
416     if (!selfp->disk_problem) {
417         if (!try_rewind(self)) {
418             return FALSE;
419         }
420     }
421
422     selfp->retry_mode = TRUE;
423     selfp->retry_data_written = 0;
424
425     if (source_parent_class->seek_to_part_start) {
426         return source_parent_class->seek_to_part_start(pself);
427     } else {
428         return TRUE;
429     }
430 }
431
432 static void 
433 taper_disk_port_source_start_new_part (TaperSource * pself) {
434     TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
435     g_return_if_fail (self != NULL);
436     g_return_if_fail (TAPER_IS_DISK_PORT_SOURCE (pself));
437         
438     if (self->_priv->fallback != NULL) {
439         taper_source_start_new_part(self->_priv->fallback);
440         return;
441     }
442
443     selfp->retry_mode = FALSE;
444     if (!selfp->disk_problem) {
445         try_rewind(self); /* If this fails it will set disk_problem to
446                              TRUE. */
447     }
448
449     if (selfp->disk_problem && selfp->fallback == NULL) {
450         selfp->fallback = make_fallback_source(self);
451     }
452     selfp->disk_buffered_bytes = 0;
453     amfree(selfp->excess_buffer);
454     selfp->excess_buffer_size = selfp->retry_data_written = 0;
455
456     if (source_parent_class->start_new_part) {
457         source_parent_class->start_new_part(pself);
458     }
459 }