2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2006 Zmanda Inc.
5 * This library is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU Lesser General Public
7 * License as published by the Free Software Foundation; either
8 * version 2.1 of the License, or (at your option) any later version.
10 * This library is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * Lesser General Public License for more details.
15 * You should have received a copy of the GNU Lesser General Public
16 * License along with this library; if not, write to the Free Software
17 * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
/* Shorthand for the private data of the object named `self` in the
   enclosing scope: expands to self->_priv, so it can only be used where a
   variable called `self` is visible. */
20 #define selfp (self->_priv)
22 #include "taper-disk-port-source.h"
23 #include "taper-mem-port-source.h"
/* Private per-instance state of a TaperDiskPortSource, reached through
   self->_priv (or the selfp shorthand).
   NOTE(review): the rest of this file also uses _priv->retry_mode,
   _priv->buffer_fd and _priv->excess_buffer; their declarations are not
   visible in this excerpt. */
25 struct _TaperDiskPortSourcePrivate {
/* Source built by make_fallback_source(). While it is non-NULL the
   read/seek/start_new_part/get_end_of_* methods forward to it. */
28 TaperSource * fallback;
/* Set to TRUE when rewinding the buffer file fails (try_rewind) and on one
   of the error paths in write_disk_buffer. */
29 gboolean disk_problem;
/* Bytes of the current part accounted to the disk buffer; reset to 0 by
   start_new_part. */
30 guint64 disk_buffered_bytes;
32 /* This is for extra data we picked up by accident. */
/* Size in bytes of the in-memory excess buffer filled by store_excess. */
34 size_t excess_buffer_size;
/* Bytes already handed back to the reader while replaying the part in
   retry mode; reset to 0 by seek_to_part_start and start_new_part. */
36 guint64 retry_data_written;
39 /* here are local prototypes */
/* Forward declarations for the GType init hooks and for the TaperSource
   method implementations that taper_disk_port_source_class_init installs
   in the class structure.
   NOTE(review): the continuation line of the taper_disk_port_source_read
   prototype is not visible in this excerpt. */
40 static void taper_disk_port_source_init (TaperDiskPortSource * o);
41 static void taper_disk_port_source_class_init (TaperDiskPortSourceClass * c);
42 static ssize_t taper_disk_port_source_read (TaperSource * pself, void * buf,
44 static gboolean taper_disk_port_source_seek_to_part_start
45 (TaperSource * pself);
46 static void taper_disk_port_source_start_new_part (TaperSource * pself);
47 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself);
48 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself);
49 static int taper_disk_port_source_predict_parts(TaperSource * pself);
51 /* pointer to the class of our parent */
/* Both pointers are filled in by taper_disk_port_source_class_init from the
   same g_type_class_ref(TAPER_TYPE_PORT_SOURCE) result.
   source_parent_class is that class viewed as a TaperSourceClass and is
   used to call the parent's TaperSource methods; parent_class is used to
   chain up dispose and finalize. */
52 static TaperSourceClass * source_parent_class = NULL;
53 static TaperPortSourceClass *parent_class = NULL;
/* Returns the GType for TaperDiskPortSource.
   On the first call (while the cached static `type` is still 0) the type is
   registered with g_type_register_static() under the name
   "TaperDiskPortSource" as a subtype of TAPER_TYPE_PORT_SOURCE, using
   taper_disk_port_source_class_init and taper_disk_port_source_init as the
   class and instance initialisers.
   NOTE(review): the end of the GTypeInfo initialiser, the remaining
   g_type_register_static arguments and the return statement are not
   visible in this excerpt. */
55 GType taper_disk_port_source_get_type (void) {
56 static GType type = 0;
/* Register only once; later calls reuse the cached value. */
58 if G_UNLIKELY(type == 0) {
59 static const GTypeInfo info = {
60 sizeof (TaperDiskPortSourceClass),
62 (GBaseFinalizeFunc) NULL,
63 (GClassInitFunc) taper_disk_port_source_class_init,
64 (GClassFinalizeFunc) NULL,
65 NULL /* class_data */,
66 sizeof (TaperDiskPortSource),
68 (GInstanceInitFunc) taper_disk_port_source_init,
72 type = g_type_register_static (TAPER_TYPE_PORT_SOURCE,
73 "TaperDiskPortSource", &info,
80 static void taper_disk_port_source_dispose(GObject * obj_self) {
81 TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
82 if (G_OBJECT_CLASS (parent_class)->dispose)
83 (* G_OBJECT_CLASS (parent_class)->dispose) (obj_self);
85 if(self->_priv->fallback) {
86 g_object_unref (self->_priv->fallback);
/* GObject finalize handler.  In the lines visible here it chains up to the
   parent class's finalize (when one is set) and then passes
   buffer_dir_name and the private excess buffer to amfree().
   NOTE(review): nothing visible here closes _priv->buffer_fd or frees the
   _priv block allocated in taper_disk_port_source_init; the function's tail
   is not shown, so confirm against the full file. */
91 taper_disk_port_source_finalize(GObject *obj_self)
93 TaperDiskPortSource *self = TAPER_DISK_PORT_SOURCE (obj_self);
94 if(G_OBJECT_CLASS(parent_class)->finalize)
95 (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
97 amfree(self->buffer_dir_name);
98 amfree(self->_priv->excess_buffer);
/* Instance initialiser: allocates the private data block and puts it in
   the "nothing buffered yet" state -- not in retry mode, no buffer file
   open (fd -1), no fallback source, no disk problem, zero bytes buffered
   and no excess buffer.
   NOTE(review): the malloc() result is dereferenced without a NULL check,
   and retry_data_written is not given an initial value in the lines shown
   (it is zeroed later by seek_to_part_start and start_new_part). */
103 taper_disk_port_source_init (TaperDiskPortSource * o G_GNUC_UNUSED)
105 o->_priv = malloc(sizeof(TaperDiskPortSourcePrivate));
106 o->_priv->retry_mode = FALSE;
/* A negative descriptor makes the read method open the buffer file. */
107 o->_priv->buffer_fd = -1;
108 o->_priv->fallback = NULL;
109 o->_priv->disk_problem = FALSE;
110 o->_priv->disk_buffered_bytes = 0;
111 o->_priv->excess_buffer = NULL;
112 o->_priv->excess_buffer_size = 0;
/* Class initialiser: records the parent class, installs this type's
   implementations of the TaperSource methods, and hooks dispose/finalize
   on the GObject class. */
116 taper_disk_port_source_class_init (TaperDiskPortSourceClass * c G_GNUC_UNUSED)
118 GObjectClass *g_object_class = (GObjectClass*) c;
119 TaperSourceClass *taper_source_class = (TaperSourceClass *)c;
/* Keep the parent class both as a TaperPortSourceClass and, through a
   cast, as a TaperSourceClass so its methods can be called directly. */
121 parent_class = g_type_class_ref (TAPER_TYPE_PORT_SOURCE);
122 source_parent_class = (TaperSourceClass*)parent_class;
/* TaperSource methods provided by this file. */
124 taper_source_class->read = taper_disk_port_source_read;
125 taper_source_class->seek_to_part_start =
126 taper_disk_port_source_seek_to_part_start;
127 taper_source_class->start_new_part = taper_disk_port_source_start_new_part;
128 taper_source_class->get_end_of_data =
129 taper_disk_port_source_get_end_of_data;
130 taper_source_class->get_end_of_part =
131 taper_disk_port_source_get_end_of_part;
132 taper_source_class->predict_parts = taper_disk_port_source_predict_parts;
/* GObject lifecycle hooks. */
134 g_object_class->dispose = taper_disk_port_source_dispose;
135 g_object_class->finalize = taper_disk_port_source_finalize;
/* get_end_of_data implementation.
   Returns TRUE (through g_return_val_if_fail) when self is NULL.  If a
   fallback source exists the question is forwarded to it; otherwise the
   parent class's get_end_of_data is called on this object. */
139 static gboolean taper_disk_port_source_get_end_of_data(TaperSource * pself) {
140 TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
141 g_return_val_if_fail(self != NULL, TRUE);
/* Once a fallback exists it is the authority on end-of-data. */
143 if (self->_priv->fallback != NULL) {
144 return taper_source_get_end_of_data(self->_priv->fallback);
146 return (source_parent_class->get_end_of_data)(pself);
/* get_end_of_part implementation.
   Returns TRUE (through g_return_val_if_fail) when self is NULL.  If a
   fallback source exists the question is forwarded to it; otherwise the
   parent class's get_end_of_part is called on this object. */
149 static gboolean taper_disk_port_source_get_end_of_part(TaperSource * pself) {
150 TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
151 g_return_val_if_fail(self != NULL, TRUE);
/* Once a fallback exists it is the authority on end-of-part. */
153 if (self->_priv->fallback != NULL) {
154 return taper_source_get_end_of_part(self->_priv->fallback);
156 return (source_parent_class->get_end_of_part)(pself);
/* predict_parts implementation.
   Returns -1 (through g_return_val_if_fail) when self is NULL.
   NOTE(review): the remainder of the body, including the normal return
   value, is not visible in this excerpt. */
160 static int taper_disk_port_source_predict_parts(TaperSource * pself) {
161 TaperDiskPortSource * self = TAPER_DISK_PORT_SOURCE(pself);
162 g_return_val_if_fail(self != NULL, -1);
/* Builds the source used when the disk buffer cannot be used: a new
   TAPER_TYPE_MEM_PORT_SOURCE object that takes over this source's
   socket_fd and whose max_part_size is self->fallback_buffer_size.
   NOTE(review): the declaration of rval and the return statement are not
   visible here; callers store the result in _priv->fallback and test it
   against NULL, so presumably rval is returned -- confirm. */
167 static TaperSource * make_fallback_source(TaperDiskPortSource * self) {
169 TaperPortSource * port_rval;
170 rval = (TaperSource*)
171 g_object_new(TAPER_TYPE_MEM_PORT_SOURCE, NULL);
/* Second view of the same object, for the port-level field. */
172 port_rval = (TaperPortSource*)rval;
/* The new source reads from the same socket as this one. */
177 port_rval->socket_fd = ((TaperPortSource*)self)->socket_fd;
178 rval->max_part_size = self->fallback_buffer_size;
/* open_buffer_file: creates the temporary file that backs the disk buffer.
   Returns FALSE (through g_return_val_if_fail) when self or
   self->buffer_dir_name is NULL; the read method treats a false return as
   "buffer file unavailable" and switches to the fallback source.
   NOTE(review): the local declarations, the failure test after g_mkstemp,
   the restoration of the saved umask, the release of `filename` and the
   return statements are not visible in this excerpt. */
183 /* Open the buffer file. We create the file and then immediately
184 unlink it, to improve security and ease cleanup. */
185 static gboolean open_buffer_file(TaperDiskPortSource * self) {
190 g_return_val_if_fail(self != NULL, FALSE);
191 g_return_val_if_fail(self->buffer_dir_name != NULL, FALSE);
/* g_mkstemp replaces the trailing XXXXXX of this template in place. */
193 filename = g_strdup_printf("%s/amanda-split-buffer-XXXXXX",
194 self->buffer_dir_name);
195 /* This is not thread-safe. :-( */
/* The previous umask is kept in old_umask. */
196 old_umask = umask(0);
197 fd = g_mkstemp(filename);
200 g_fprintf(stderr, "Couldn't open temporary file with template %s: %s\n",
201 filename, strerror(errno));
205 /* If it fails, that's annoying, but no great loss. */
206 if (unlink(filename) != 0) {
207 g_fprintf(stderr, "Unlinking %s failed: %s\n", filename,
/* Remember the descriptor; the file is accessed only through it. */
212 selfp->buffer_fd = fd;
/* store_excess: keeps in memory the tail of a block that did not reach the
   disk buffer.
     buf            - the block that was being written
     attempted_size - total number of bytes in buf (must be > 0)
     disk_size      - how many of those bytes did reach the disk buffer
                      (must be < attempted_size)
   The bytes buf[disk_size .. attempted_size) are copied into a newly
   malloc'd excess_buffer, disk_size is added to disk_buffered_bytes, and
   max_part_size is lowered to at most disk_buffered_bytes.  The function
   returns without doing anything (through g_return_if_fail) when a
   precondition fails, including when an excess buffer already exists.
   NOTE(review): the malloc() result is passed to memcpy without a NULL
   check. */
216 /* An error has occurred with the disk buffer; store the extra data in
217 memory until we can recover. */
218 static void store_excess(TaperDiskPortSource * self, char * buf,
219 size_t attempted_size, size_t disk_size) {
220 TaperSource * pself = (TaperSource*)self;
221 g_return_if_fail(attempted_size > 0);
222 g_return_if_fail(disk_size < attempted_size);
223 g_return_if_fail(buf != NULL);
224 g_return_if_fail(selfp->excess_buffer == NULL);
/* Save only the part of the block that is not on disk. */
226 selfp->excess_buffer_size = attempted_size - disk_size;
227 selfp->excess_buffer = malloc(selfp->excess_buffer_size);
228 memcpy(selfp->excess_buffer, buf + disk_size, attempted_size - disk_size);
/* Account for the bytes that did make it to disk, then cap the part size
   at what the disk buffer holds. */
230 selfp->disk_buffered_bytes += disk_size;
231 pself->max_part_size = MIN(pself->max_part_size,
232 selfp->disk_buffered_bytes);
235 /* Handle the output of the small amount of saved in-memory data. */
236 static size_t handle_excess_buffer_read(TaperDiskPortSource * self,
237 void * buf, size_t count) {
238 TaperSource * pself = (TaperSource*)self;
241 /* First, do we have anything left? */
242 if (selfp->retry_data_written >=
243 (selfp->disk_buffered_bytes + selfp->excess_buffer_size)) {
244 pself->end_of_part = TRUE;
249 (selfp->disk_buffered_bytes + selfp->excess_buffer_size)
250 - selfp->retry_data_written);
252 offset = selfp->disk_buffered_bytes + selfp->excess_buffer_size
253 - selfp->retry_data_written;
254 g_assert(offset + count <= selfp->excess_buffer_size);
255 memcpy(buf, selfp->excess_buffer + offset, count);
257 selfp->retry_data_written += count;
/* write_disk_buffer: copies read_size bytes from buf into the buffer file.
   It calls write() repeatedly until bytes_written reaches read_size.  In
   the visible lines there are two failure paths that hand the block to
   store_excess() together with the count already on disk; the second one
   also sets disk_problem and marks the end of the part.
   NOTE(review): the parameter list continuation (declaring read_size), the
   errno tests that select between the branches, and every return statement
   are not visible in this excerpt.  Also note that write()'s ssize_t result
   is stored in an int. */
262 /* Write data out to the disk buffer, and handle any problems that
263 crop up along the way. */
264 static ssize_t write_disk_buffer(TaperDiskPortSource * self, char * buf,
266 size_t bytes_written = 0;
267 while (bytes_written < read_size) {
/* Resume after whatever previous iterations already wrote. */
268 int write_result = write(selfp->buffer_fd, buf + bytes_written,
269 read_size - bytes_written);
270 if (write_result > 0) {
271 bytes_written += write_result;
273 } else if (write_result == 0) {
274 g_fprintf(stderr, "Writing disk buffer: Wrote 0 bytes.\n");
282 || errno == EWOULDBLOCK
/* store_excess() itself adds bytes_written to disk_buffered_bytes. */
299 store_excess(self, buf, read_size, bytes_written);
303 store_excess(self, buf, read_size, bytes_written);
304 selfp->disk_problem = TRUE;
305 TAPER_SOURCE(self)->end_of_part = TRUE;
309 g_assert_not_reached();
/* Reached after the loop: record the bytes written to disk. */
312 selfp->disk_buffered_bytes += bytes_written;
/* read implementation.
     pself - the source (must be a TaperDiskPortSource)
     buf   - destination buffer, must not be NULL
     count - maximum bytes wanted, must be > 0
   Returns -1 (through g_return_val_if_fail) when an argument check fails.
   Behaviour visible here, in order:
     - if a fallback source exists, the read is forwarded to it;
     - if no buffer file is open yet, one is opened; when that fails a
       fallback source is created and the read is forwarded to it;
     - in retry mode, data is replayed first from the buffer file and then
       from the in-memory excess buffer;
     - otherwise data is read through the parent class's read, capped at
       the space left in the current part, and handed to write_disk_buffer.
   NOTE(review): the return type line, local declarations, several
   conditions and most return statements are not visible in this excerpt. */
317 taper_disk_port_source_read (TaperSource * pself, void * buf, size_t count) {
318 TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
320 g_return_val_if_fail (self != NULL, -1);
321 g_return_val_if_fail (TAPER_IS_DISK_PORT_SOURCE (self), -1);
322 g_return_val_if_fail (buf != NULL, -1);
323 g_return_val_if_fail (count > 0, -1);
/* The disk buffer must never hold more than one part. */
324 g_assert(selfp->disk_buffered_bytes <= pself->max_part_size);
326 if (selfp->fallback != NULL) {
327 return taper_source_read(selfp->fallback, buf, count);
328 } else if (selfp->buffer_fd < 0) {
329 if (!open_buffer_file(self)) {
330 /* Buffer file failed; go immediately to failover mode. */
331 selfp->fallback = make_fallback_source(self);
332 if (selfp->fallback != NULL) {
333 return taper_source_read(selfp->fallback, buf, count);
335 /* Even the fallback source failed! */
341 if (selfp->retry_mode) {
342 /* Read from disk buffer. */
344 if (selfp->retry_data_written < selfp->disk_buffered_bytes) {
345 /* Read from disk. */
/* Never read past the bytes known to be in the buffer file. */
347 count = MIN(count, selfp->disk_buffered_bytes -
348 selfp->retry_data_written);
349 result = read(selfp->buffer_fd, buf, count);
351 /* This should not happen. */
354 selfp->retry_data_written += result;
357 } else if (selfp->excess_buffer != NULL) {
358 /* We are writing out the last bit of buffer. Handle that. */
359 return handle_excess_buffer_read(self, buf, count);
362 pself->end_of_part = TRUE;
366 g_assert_not_reached();
368 /* Read from port. */
/* Cap the request at the room left in this part. */
370 count = MIN(count, pself->max_part_size - selfp->disk_buffered_bytes);
371 if (count == 0) /* It was nonzero before. */ {
372 pself->end_of_part = TRUE;
376 read_result = source_parent_class->read(pself, buf, count);
377 /* Parent handles EOF and other goodness. */
378 if (read_result <= 0) {
381 /* Now write to disk buffer. */
382 return write_disk_buffer(self, buf, read_result);
386 /* Try seeking back to byte 0. If that fails, then we mark ourselves
387 as having a disk problem. Returns FALSE in that case. */
/* NOTE(review): the declaration of `result`, the failure test on it and
   the return statements are not visible in this excerpt. */
388 static gboolean try_rewind(TaperDiskPortSource * self) {
/* Move the buffer file's offset back to the beginning. */
390 result = lseek(selfp->buffer_fd, 0, SEEK_SET);
392 g_fprintf(stderr, "Couldn't seek split buffer: %s\n", strerror(errno));
393 selfp->disk_problem = TRUE;
/* seek_to_part_start implementation: prepares to replay the current part.
   Returns FALSE (through g_return_val_if_fail) when pself is NULL or of
   the wrong type, or when nothing has been buffered (disk plus excess
   bytes is 0).  If a fallback source exists the call is forwarded to it.
   Otherwise, unless a disk problem is already recorded, the buffer file is
   rewound; then retry mode is switched on with retry_data_written reset to
   0, and the parent class's seek_to_part_start is called when it has one.
   NOTE(review): the return type line and the return statements of the
   failure branches and of the final fall-through are not visible in this
   excerpt. */
401 taper_disk_port_source_seek_to_part_start (TaperSource * pself) {
402 TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
403 g_return_val_if_fail (self != NULL, FALSE);
404 g_return_val_if_fail (TAPER_IS_DISK_PORT_SOURCE (pself), FALSE);
405 g_return_val_if_fail (selfp->disk_buffered_bytes
406 + selfp->excess_buffer_size > 0, FALSE);
408 if (self->_priv->fallback != NULL) {
409 return taper_source_seek_to_part_start(selfp->fallback);
/* A broken disk buffer that still holds part of the data cannot be
   replayed. */
412 if (selfp->disk_problem && selfp->disk_buffered_bytes) {
413 /* The disk buffer is screwed; nothing to do. */
417 if (!selfp->disk_problem) {
418 if (!try_rewind(self)) {
/* From here on, reads replay buffered data from the start of the part. */
423 selfp->retry_mode = TRUE;
424 selfp->retry_data_written = 0;
426 if (source_parent_class->seek_to_part_start) {
427 return source_parent_class->seek_to_part_start(pself);
/* start_new_part implementation: discards the buffered copy of the part
   just finished and gets ready to buffer the next one.
   Returns early (through g_return_if_fail) when pself is NULL or of the
   wrong type.  If a fallback source exists, start_new_part is forwarded to
   it.  In the visible lines the function then leaves retry mode, rewinds
   the buffer file unless a disk problem is already recorded, creates a
   fallback source when a disk problem exists and there is none yet, resets
   disk_buffered_bytes, frees the excess buffer, zeroes excess_buffer_size
   and retry_data_written, and finally calls the parent class's
   start_new_part when it has one.
   NOTE(review): the return type line and whatever follows the forwarded
   fallback call are not visible in this excerpt, and the comment that opens
   on the try_rewind line is cut off before it closes. */
434 taper_disk_port_source_start_new_part (TaperSource * pself) {
435 TaperDiskPortSource * self = (TaperDiskPortSource*)pself;
436 g_return_if_fail (self != NULL);
437 g_return_if_fail (TAPER_IS_DISK_PORT_SOURCE (pself));
439 if (self->_priv->fallback != NULL) {
440 taper_source_start_new_part(self->_priv->fallback);
444 selfp->retry_mode = FALSE;
445 if (!selfp->disk_problem) {
446 try_rewind(self); /* If this fails it will set disk_problem to
450 if (selfp->disk_problem && selfp->fallback == NULL) {
451 selfp->fallback = make_fallback_source(self);
453 selfp->disk_buffered_bytes = 0;
454 amfree(selfp->excess_buffer);
455 selfp->excess_buffer_size = selfp->retry_data_written = 0;
457 if (source_parent_class->start_new_part) {
458 source_parent_class->start_new_part(pself);