Imported Upstream version 2.6.0p2
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as 
6  * published by the Free Software Foundation.
7  * 
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  * 
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  * 
17  * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include "rait-device.h"
25 #include <amanda.h>
26 #include "property.h"
27 #include "util.h"
28
/* Health of the RAIT set, expressed as how many subdevices have failed. */
typedef enum {
    /* Every subdevice is OK. */
    RAIT_STATUS_COMPLETE,
    /* Exactly one subdevice has failed. */
    RAIT_STATUS_DEGRADED,
    /* Two or more subdevices have failed. */
    RAIT_STATUS_FAILED
} RaitStatus;
34
35 struct RaitDevicePrivate_s {
36     GPtrArray * children;
37     /* These flags are only relevant for reading. */
38     RaitStatus status;
39     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
40        failed node. It holds a negative number otherwise. */
41     int failed;
42     guint block_size;
43 };
44
/* Access the private data of a RaitDevice.  The argument is parenthesized
 * so that any pointer-valued expression (e.g. "devs + 1") expands safely. */
#define PRIVATE(o) ((o)->private)
46
47 /* here are local prototypes */
48 static void rait_device_init (RaitDevice * o);
49 static void rait_device_class_init (RaitDeviceClass * c);
50 static gboolean rait_device_open_device (Device * self, char * device_name);
51 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
52                                    char * label, char * timestamp);
53 static gboolean rait_device_start_file(Device * self, const dumpfile_t * info);
54 static gboolean rait_device_write_block (Device * self, guint size,
55                                          gpointer data, gboolean last_block);
56 static gboolean rait_device_finish_file (Device * self);
57 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
58 static gboolean rait_device_seek_block (Device * self, guint64 block);
59 static int      rait_device_read_block (Device * self, gpointer buf,
60                                         int * size);
61 static gboolean rait_device_property_get (Device * self, DevicePropertyId id,
62                                           GValue * val);
63 static gboolean rait_device_property_set (Device * self, DevicePropertyId id,
64                                           GValue * val);
65 static gboolean rait_device_recycle_file (Device * self, guint filenum);
66 static gboolean rait_device_finish (Device * self);
67 static ReadLabelStatusFlags rait_device_read_label(Device * dself);
68 static void find_simple_params(RaitDevice * self, guint * num_children,
69                                guint * data_children, int * blocksize);
70
71 /* pointer to the class of our parent */
72 static DeviceClass *parent_class = NULL;
73
74 /* This function is replicated here in case we have GLib from before 2.4.
75  * It should probably go eventually. */
76 #if !GLIB_CHECK_VERSION(2,4,0)
77 static void
78 g_ptr_array_foreach (GPtrArray *array,
79                      GFunc      func,
80                      gpointer   user_data)
81 {
82   guint i;
83
84   g_return_if_fail (array);
85
86   for (i = 0; i < array->len; i++)
87     (*func) (array->pdata[i], user_data);
88 }
89 #endif
90
91 GType
92 rait_device_get_type (void)
93 {
94     static GType type = 0;
95
96     if G_UNLIKELY(type == 0) {
97         static const GTypeInfo info = {
98             sizeof (RaitDeviceClass),
99             (GBaseInitFunc) NULL,
100             (GBaseFinalizeFunc) NULL,
101             (GClassInitFunc) rait_device_class_init,
102             (GClassFinalizeFunc) NULL,
103             NULL /* class_data */,
104             sizeof (RaitDevice),
105             0 /* n_preallocs */,
106             (GInstanceInitFunc) rait_device_init,
107             NULL
108         };
109         
110         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
111                                        (GTypeFlags)0);
112         }
113     
114     return type;
115 }
116
117 static void g_object_unref_foreach(gpointer data,
118                                    gpointer user_data G_GNUC_UNUSED) {
119     g_return_if_fail(G_IS_OBJECT(data));
120     g_object_unref(data);
121 }
122
123 static void
124 rait_device_finalize(GObject *obj_self)
125 {
126     RaitDevice *self G_GNUC_UNUSED = RAIT_DEVICE (obj_self);
127     if(G_OBJECT_CLASS(parent_class)->finalize) \
128            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
129     if(self->private->children) {
130         g_ptr_array_foreach(self->private->children,
131                             g_object_unref_foreach, NULL);
132         g_ptr_array_free (self->private->children, TRUE);
133         self->private->children = NULL;
134     }
135     amfree(self->private);
136 }
137
138 static void 
139 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
140 {
141     PRIVATE(o) = malloc(sizeof(RaitDevicePrivate));
142     PRIVATE(o)->children = g_ptr_array_new();
143     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
144     PRIVATE(o)->failed = -1;
145 }
146
147 static void 
148 rait_device_class_init (RaitDeviceClass * c G_GNUC_UNUSED)
149 {
150     GObjectClass *g_object_class G_GNUC_UNUSED = (GObjectClass*) c;
151     DeviceClass *device_class = (DeviceClass *)c;
152
153     parent_class = g_type_class_ref (TYPE_DEVICE);
154
155     device_class->open_device = rait_device_open_device;
156     device_class->start = rait_device_start;
157     device_class->start_file = rait_device_start_file;
158     device_class->write_block = rait_device_write_block;
159     device_class->finish_file = rait_device_finish_file;
160     device_class->seek_file = rait_device_seek_file;
161     device_class->seek_block = rait_device_seek_block;
162     device_class->read_block = rait_device_read_block;
163     device_class->property_get = rait_device_property_get;
164     device_class->property_set = rait_device_property_set;
165     device_class->recycle_file = rait_device_recycle_file; 
166     device_class->finish = rait_device_finish;
167     device_class->read_label = rait_device_read_label;
168
169     g_object_class->finalize = rait_device_finalize;
170
171     /* Versions of glib before 2.10.2 crash if
172      * g_thread_pool_set_max_unused_threads is called before the first
173      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
174      * start any threads in it, and free it */
175
176 #if !GLIB_CHECK_VERSION(2,10,2)
177     {
178         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
179         g_thread_pool_free(pool, TRUE, FALSE);
180     }
181 #endif
182
183     g_thread_pool_set_max_unused_threads(-1);
184 }
185
186 /* This function does something a little clever and a little
187  * complicated. It takes an array of operations and runs the given
188  * function on each element in the array. The trick is that it runs them
189  * all in parallel, in different threads. This is more efficient than it
190  * sounds because we use a GThreadPool, which means calling this function
191  * will probably not start any new threads at all, but rather use
192  * existing ones. The func is called with two gpointer arguments: The
193  * first from the array, the second is the data argument.
194  * 
195  * When it returns, all the operations have been successfully
196  * executed. If you want results from your operations, do it yourself
197  * through the array. */
198 static void do_thread_pool_op(GFunc func, GPtrArray * ops, gpointer data) {
199     GThreadPool * pool;
200     guint i;
201
202     pool = g_thread_pool_new(func, data, -1, FALSE, NULL);
203     for (i = 0; i < ops->len; i ++) {
204         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
205     }
206
207     g_thread_pool_free(pool, FALSE, TRUE);
208 }
209
210 /* This does the above, in a serial fashion (and without using threads) */
211 static void do_unthreaded_ops(GFunc func, GPtrArray * ops,
212                               gpointer data G_GNUC_UNUSED) {
213     guint i;
214
215     for (i = 0; i < ops->len; i ++) {
216         func(g_ptr_array_index(ops, i), NULL);
217     }
218 }
219
220 /* This is the one that code below should call. It switches
221    automatically between do_thread_pool_op and do_unthreaded_ops,
222    depending on g_thread_supported(). */
223 static void do_rait_child_ops(GFunc func, GPtrArray * ops, gpointer data) {
224     if (g_thread_supported()) {
225         do_thread_pool_op(func, ops, data);
226     } else {
227         do_unthreaded_ops(func, ops, data);
228     }
229 }
230
231 /* Take a text string user_name, and break it out into an argv-style
232    array of strings. For example, {foo,{bar,baz},bat} would return the
233    strings "foo", "{bar,baz}", "bat", and NULL. Returns NULL on
234    error. */
235 static char ** parse_device_name(char * user_name) {
236     GPtrArray * rval;
237     char * cur_end = user_name;
238     char * cur_begin = user_name;
239     
240     rval = g_ptr_array_new();
241     
242     /* Check opening brace. */
243     if (*cur_begin != '{')
244         return NULL;
245     cur_begin ++;
246     
247     cur_end = cur_begin;
248     for (;;) {
249         switch (*cur_end) {
250         case ',': {
251             g_ptr_array_add(rval, g_strndup(cur_begin, cur_end - cur_begin));
252             cur_end ++;
253             cur_begin = cur_end;
254             continue;
255         }
256
257         case '{':
258             /* We read until the matching closing brace. */
259             while (*cur_end != '}' && *cur_end != '\0')
260                 cur_end ++;
261             if (*cur_end == '}')
262                 cur_end ++;
263             continue;
264             
265         case '}':
266             g_ptr_array_add(rval, g_strndup(cur_begin, cur_end - cur_begin));
267             goto OUTER_END; /* break loop, not switch */
268
269         case '\0':
270             /* Unexpected NULL; abort. */
271             g_fprintf(stderr, "Invalid RAIT device name %s\n", user_name);
272             g_ptr_array_free_full(rval);
273             return NULL;
274
275         default:
276             cur_end ++;
277             continue;
278         }
279         g_assert_not_reached();
280     }
281  OUTER_END:
282     
283     if (cur_end[1] != '\0') {
284         g_fprintf(stderr, "Invalid RAIT device name %s\n", user_name);
285         g_ptr_array_free_full(rval);
286         return NULL;
287     }
288
289     g_ptr_array_add(rval, NULL);
290
291     return (char**) g_ptr_array_free(rval, FALSE);
292 }
293
294 /* Find a workable block size. */
295 static gboolean find_block_size(RaitDevice * self) {
296     uint min = 0;
297     uint max = G_MAXUINT;
298     uint result;
299     GValue val;
300     gboolean rval;
301     guint i;
302     guint data_children;
303     
304     for (i = 0; i < self->private->children->len; i ++) {
305         uint child_min, child_max;
306         GValue property_result;
307         bzero(&property_result, sizeof(property_result));
308
309         if (!device_property_get(g_ptr_array_index(self->private->children, i),
310                                  PROPERTY_MIN_BLOCK_SIZE, &property_result))
311             return FALSE;
312         child_min = g_value_get_uint(&property_result);
313         g_return_val_if_fail(child_min > 0, FALSE);
314         if (!device_property_get(g_ptr_array_index(self->private->children, i),
315                                  PROPERTY_MAX_BLOCK_SIZE, &property_result))
316             return FALSE;
317         child_max = g_value_get_uint(&property_result);
318         g_return_val_if_fail(child_max > 0, FALSE);
319         
320         if (child_min > max || child_max < min || child_min == 0) {
321             return FALSE;
322         } else {
323             min = MAX(min, child_min);
324             max = MIN(max, child_max);
325         }
326     }
327
328     /* Now pick a number. */
329     g_assert(min <= max);
330     if (max < MAX_TAPE_BLOCK_BYTES)
331         result = max;
332     else if (min > MAX_TAPE_BLOCK_BYTES)
333         result = min;
334     else
335         result = MAX_TAPE_BLOCK_BYTES;
336
337     /* User reads and writes bigger blocks. */
338     find_simple_params(self, NULL, &data_children, NULL);
339     self->private->block_size = result * data_children;
340
341     bzero(&val, sizeof(val));
342     g_value_init(&val, G_TYPE_INT);
343     g_value_set_int(&val, result);
344     /* We can't do device_property_set because it's disallowed
345        according to the registered property base. */
346     rval = rait_device_property_set(DEVICE(self), PROPERTY_BLOCK_SIZE, &val);
347     g_value_unset(&val);
348     return rval;
349 }
350
351 /* Register properties that belong to the RAIT device proper, and not
352    to subdevices. */
353 static void register_rait_properties(RaitDevice * self) {
354     Device * o = DEVICE(self);
355     DeviceProperty prop;
356
357     prop.access = PROPERTY_ACCESS_GET_MASK;
358
359     prop.base = &device_property_min_block_size;
360     device_add_property(o, &prop, NULL);
361
362     prop.base = &device_property_max_block_size;
363     device_add_property(o, &prop, NULL);
364   
365     prop.base = &device_property_block_size;
366     device_add_property(o, &prop, NULL);
367
368     prop.base = &device_property_canonical_name;
369     device_add_property(o, &prop, NULL);
370 }
371
372 static void property_hash_union(GHashTable * properties,
373                                 const DeviceProperty * prop) {
374     PropertyAccessFlags before, after;
375     gpointer tmp;
376     gboolean found;
377     
378     found = g_hash_table_lookup_extended(properties,
379                                          GUINT_TO_POINTER(prop->base->ID),
380                                          NULL, &tmp);
381     before = GPOINTER_TO_UINT(tmp);
382     
383     if (!found) {
384         after = prop->access;
385     } else {
386         after = before & prop->access;
387     }
388     
389     g_hash_table_insert(properties, GUINT_TO_POINTER(prop->base->ID),
390                         GUINT_TO_POINTER(after));
391 }
392
393 /* A GHRFunc. */
394 static gboolean zero_value(gpointer key G_GNUC_UNUSED, gpointer value,
395                            gpointer user_data G_GNUC_UNUSED) {
396     return (0 == GPOINTER_TO_UINT(value));
397 }
398
399 /* A GHFunc */
400 static void register_property_hash(gpointer key, gpointer value,
401                                    gpointer user_data) {
402     DevicePropertyId id = GPOINTER_TO_UINT(key);
403     DeviceProperty prop;
404     Device * device = (Device*)user_data;
405
406     g_assert(IS_DEVICE(device));
407
408     prop.access = GPOINTER_TO_UINT(value);
409     prop.base = device_property_get_by_id(id);
410
411     device_add_property(device, &prop, NULL);
412 }
413
414 /* This function figures out which properties exist for all children, and 
415  * exports the unioned access mask. */
416 static void register_properties(RaitDevice * self) {
417     GHashTable * properties; /* PropertyID => PropertyAccessFlags */
418     guint j;
419     
420     properties = g_hash_table_new(g_direct_hash, g_direct_equal);
421
422     /* Iterate the device list, find all properties. */
423     for (j = 0; j < self->private->children->len; j ++) {
424         int i;
425         Device * child = g_ptr_array_index(self->private->children, j);
426         const DeviceProperty* device_property_list;
427
428         device_property_list = device_property_get_list(child);
429         for (i = 0; device_property_list[i].base != NULL; i ++) {
430             property_hash_union(properties, &(device_property_list[i]));
431         }
432     }
433
434     /* Then toss properties that can't be accessed. */
435     g_hash_table_foreach_remove(properties, zero_value, NULL);
436     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_BLOCK_SIZE));
437     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_MIN_BLOCK_SIZE));
438     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_MAX_BLOCK_SIZE));
439     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_CANONICAL_NAME));
440
441     /* Finally, register the lot. */
442     g_hash_table_foreach(properties, register_property_hash, self);
443
444     g_hash_table_destroy(properties);
445
446     /* Then we have some of our own properties to register. */
447     register_rait_properties(self);
448 }
449
450 /* This structure contains common fields for many operations. Not all
451    operations use all fields, however. */
452 typedef struct {
453     gpointer result; /* May be a pointer; may be an integer or boolean
454                         stored with GINT_TO_POINTER. */
455     Device * child;  /* The device in question. Used by all
456                         operations. */
457     guint child_index; /* For recoverable operations (read-related
458                           operations), this field provides the number
459                           of this child in the self->private->children
460                           array. */
461 } GenericOp;
462
463 typedef gboolean (*BooleanExtractor)(gpointer data);
464
465 /* A BooleanExtractor */
466 static gboolean extract_boolean_generic_op(gpointer data) {
467     GenericOp * op = data;
468     return GPOINTER_TO_INT(op->result);
469 }
470
471 /* A BooleanExtractor */
472 static gboolean extract_boolean_pointer_op(gpointer data) {
473     GenericOp * op = data;
474     return op->result != NULL;
475 }
476
477 /* Does the equivalent of this perl command:
478      ! (first { !extractor($_) } @_
479    That is, calls extractor on each element of the array, and returns
480    TRUE if and only if all calls to extractor return TRUE.
481 */
482 static gboolean g_ptr_array_and(GPtrArray * array,
483                                 BooleanExtractor extractor) {
484     guint i;
485     if (array == NULL || array->len <= 0)
486         return FALSE;
487
488     for (i = 0; i < array->len; i ++) {
489         if (!extractor(g_ptr_array_index(array, i)))
490             return FALSE;
491     }
492
493     return TRUE;
494 }
495
496 /* Calls extractor on each element of the array, and returns
497    TRUE if at least one of the calls to extractor return TRUE.
498 */
499 static gboolean g_ptr_array_or(GPtrArray * array,
500                                 BooleanExtractor extractor) {
501     guint i;
502     if (array == NULL || array->len <= 0)
503         return FALSE;
504
505     for (i = 0; i < array->len; i ++) {
506         if (extractor(g_ptr_array_index(array, i)))
507             return TRUE;
508     }
509
510     return FALSE;
511 }
512
513 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
514 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
515     GPtrArray * rval;
516     guint i;
517
518     rval = g_ptr_array_sized_new(self->private->children->len);
519     for (i = 0; i < self->private->children->len; i ++) {
520         GenericOp * op;
521         op = malloc(sizeof(*op));
522         op->child = g_ptr_array_index(self->private->children, i);
523         op->child_index = i;
524         g_ptr_array_add(rval, op);
525     }
526
527     return rval;
528 }
529
530 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
531    all the proper handling for the result of operations that allow
532    device isolation. Returns FALSE only if an unrecoverable error
533    occured. */
534 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
535                                          BooleanExtractor extractor) {
536     int nfailed;
537     guint i;
538
539     /* We found one or more failed elements.  See which elements failed, and
540      * isolate them*/
541     nfailed = 0;
542     for (i = 0; i < ops->len; i ++) {
543         GenericOp * op = g_ptr_array_index(ops, i);
544         if (!extractor(op)) {
545             self->private->failed = op->child_index;
546             g_fprintf(stderr, "RAIT array %s isolated device %s\n",
547                     DEVICE(self)->device_name,
548                     op->child->device_name);
549             nfailed++;
550         }
551     }
552
553     /* no failures? great! */
554     if (nfailed == 0)
555         return TRUE;
556
557     /* a single failure in COMPLETE just puts us in DEGRADED mode */
558     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
559         self->private->status = RAIT_STATUS_DEGRADED;
560         g_fprintf(stderr, "RAIT array %s DEGRADED\n", DEVICE(self)->device_name);
561         return TRUE;
562     } else {
563         self->private->status = RAIT_STATUS_FAILED;
564         g_fprintf(stderr, "RAIT array %s FAILED\n", DEVICE(self)->device_name);
565         return FALSE;
566     }
567 }
568
569 typedef struct {
570     Device * result;    /* IN */
571     char * device_name; /* OUT */
572 } OpenDeviceOp;
573
574 /* A GFunc. */
575 static void open_device_do_op(gpointer data,
576                               gpointer user_data G_GNUC_UNUSED) {
577     OpenDeviceOp * op = data;
578
579     op->result = device_open(op->device_name);
580     amfree(op->device_name);
581 }
582
583 /* Returns TRUE if and only if the volume label and time are equal. */
584 static gboolean compare_volume_results(Device * a, Device * b) {
585     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
586          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
587 }
588
589 static gboolean 
590 rait_device_open_device (Device * dself, char * device_name) {
591     char ** device_names;
592     GPtrArray * open_device_ops;
593     guint i;
594     gboolean failure;
595     RaitDevice * self;
596
597     self = RAIT_DEVICE(dself);
598     g_return_val_if_fail(self != NULL, FALSE);
599     g_return_val_if_fail (device_name != NULL, FALSE);
600
601     device_names = parse_device_name(device_name);
602     
603     if (device_names == NULL)
604         return FALSE;
605
606     /* Open devices in a separate thread, in case they have to rewind etc. */
607     open_device_ops = g_ptr_array_new();
608
609     for (i = 0; device_names[i] != NULL; i ++) {
610         OpenDeviceOp *op;
611
612         op = malloc(sizeof(*op));
613         op->device_name = device_names[i];
614         op->result = NULL;
615         g_ptr_array_add(open_device_ops, op);
616     }
617
618     free(device_names);
619     do_rait_child_ops(open_device_do_op, open_device_ops, NULL);
620
621     failure = FALSE;
622     /* Check results of opening devices. */
623     for (i = 0; i < open_device_ops->len; i ++) {
624         OpenDeviceOp *op = g_ptr_array_index(open_device_ops, i);
625         
626         if (op->result != NULL) {
627             g_ptr_array_add(self->private->children, op->result);
628         } else {
629             failure = TRUE;
630         }
631     }
632
633     g_ptr_array_free_full(open_device_ops);
634
635     failure = failure || !find_block_size(self);
636     if (failure)
637         return FALSE; /* This will clean up any created children. */
638
639     register_properties(self);
640
641     /* Chain up. */
642     if (parent_class->open_device) {
643         return parent_class->open_device(dself, device_name);
644     } else {
645         return TRUE;
646     }
647 }
648
649 /* A GFunc. */
650 static void read_label_do_op(gpointer data,
651                              gpointer user_data G_GNUC_UNUSED) {
652     GenericOp * op = data;
653     op->result = GINT_TO_POINTER(device_read_label(op->child));
654 }
655
656 static ReadLabelStatusFlags rait_device_read_label(Device * dself) {
657     RaitDevice * self;
658     GPtrArray * ops;
659     ReadLabelStatusFlags failed_result = 0;
660     ReadLabelStatusFlags rval;
661     GenericOp * failed_op = NULL; /* If this is non-null, we will isolate. */
662     unsigned int i;
663     Device * first_success = NULL;
664
665     self = RAIT_DEVICE(dself);
666     g_return_val_if_fail(self != NULL, FALSE);
667
668     amfree(dself->volume_label);
669
670     ops = make_generic_boolean_op_array(self);
671     
672     do_rait_child_ops(read_label_do_op, ops, NULL);
673     
674     for (i = 0; i < ops->len; i ++) {
675         GenericOp * op = g_ptr_array_index(ops, i);
676         ReadLabelStatusFlags result = GPOINTER_TO_INT(op->result);
677         if (op->result == READ_LABEL_STATUS_SUCCESS) {
678             if (first_success == NULL) {
679                 /* This is the first successful device. */
680                 first_success = op->child;
681             } else if (!compare_volume_results(first_success, op->child)) {
682                 /* Doesn't match. :-( */
683                 g_fprintf(stderr, "Inconsistant volume labels: "
684                         "Got %s/%s against %s/%s.\n",
685                         first_success->volume_label,
686                         first_success->volume_time, 
687                         op->child->volume_label,
688                         op->child->volume_time);
689                 failed_result |= READ_LABEL_STATUS_VOLUME_ERROR;
690                 failed_op = NULL;
691             }
692         } else {
693             if (failed_result == 0 &&
694                 self->private->status == RAIT_STATUS_COMPLETE) {
695                 /* This is the first failed device; note it and we'll isolate
696                    later. */
697                 failed_op = op;
698                 failed_result = result;
699             } else {
700                 /* We've encountered multiple failures. OR them together. */
701                 failed_result |= result;
702                 failed_op = NULL;
703             }
704         }
705     }
706
707     if (failed_op != NULL) {
708         /* We have a single device to isolate. */
709         failed_result = READ_LABEL_STATUS_SUCCESS; /* Recover later */
710         self->private->failed = failed_op->child_index;
711         g_fprintf(stderr, "RAIT array %s Isolated device %s.\n",
712                 dself->device_name,
713                 failed_op->child->device_name);
714     }
715
716     if (failed_result != READ_LABEL_STATUS_SUCCESS) {
717         /* We had multiple failures or an inconsistency. */
718         rval = failed_result;
719     } else {
720         /* Everything peachy. */
721         rval = READ_LABEL_STATUS_SUCCESS;
722         g_assert(first_success != NULL);
723         if (first_success->volume_label != NULL) {
724             dself->volume_label = g_strdup(first_success->volume_label);
725         }
726         if (first_success->volume_time != NULL) {
727             dself->volume_time = g_strdup(first_success->volume_time);
728         }
729     }
730     
731     g_ptr_array_free_full(ops);
732
733     return rval;
734 }
735
736 typedef struct {
737     GenericOp base;
738     DeviceAccessMode mode; /* IN */
739     char * label;          /* IN */
740     char * timestamp;      /* IN */
741 } StartOp;
742
743 /* A GFunc. */
744 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
745     DeviceClass *klass;
746     StartOp * param = data;
747     
748     klass = DEVICE_GET_CLASS(param->base.child);
749     if (klass->start) {
750         param->base.result =
751             GINT_TO_POINTER((klass->start)(param->base.child,
752                                             param->mode, param->label,
753                                             param->timestamp));
754     } else {
755         param->base.result = FALSE;
756     }
757 }
758
759 static gboolean 
760 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
761                    char * timestamp) {
762     GPtrArray * ops;
763     guint i;
764     gboolean success;
765     RaitDevice * self;
766     char * label_from_device = NULL;
767
768     self = RAIT_DEVICE(dself);
769     g_return_val_if_fail(self != NULL, FALSE);
770
771     amfree(dself->volume_label);
772     amfree(dself->volume_time);
773
774     ops = g_ptr_array_sized_new(self->private->children->len);
775     for (i = 0; i < self->private->children->len; i ++) {
776         StartOp * op;
777         op = malloc(sizeof(*op));
778         op->base.child = g_ptr_array_index(self->private->children, i);
779         op->mode = mode;
780         op->label = g_strdup(label);
781         op->timestamp = g_strdup(timestamp);
782         g_ptr_array_add(ops, op);
783     }
784     
785     do_rait_child_ops(start_do_op, ops, NULL);
786
787     success = g_ptr_array_and(ops, extract_boolean_generic_op);
788
789     /* check that all of the volume labels agree */
790     if (success) {
791         for (i = 0; i < self->private->children->len; i ++) {
792             Device *child = g_ptr_array_index(self->private->children, i);
793
794             if (child->volume_label != NULL && child->volume_time != NULL) {
795                 if (dself->volume_label != NULL && dself->volume_time != NULL) {
796                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
797                         strcmp(child->volume_time, dself->volume_time) != 0) {
798                         /* Mismatch! (Two devices provided different labels) */
799                         g_fprintf(stderr, "%s: Label (%s/%s) is different "
800                                           "from label (%s/%s) found at "
801                                           "device %s",
802                                           child->device_name,
803                                           child->volume_label,
804                                           child->volume_time,
805                                           dself->volume_label,
806                                           dself->volume_time,
807                                           label_from_device);
808                         success = FALSE;
809                     }
810                 } else {
811                     /* First device with a volume. */
812                     dself->volume_label = g_strdup(child->volume_label);
813                     dself->volume_time = g_strdup(child->volume_time);
814                     label_from_device = g_strdup(child->device_name);
815                 }
816             } else {
817                 /* Device problem, it says it succeeded but sets no label? */
818                 g_fprintf(stderr, "%s: %s",
819                                   child->device_name,
820                                   "Says label read, but device->volume_label "
821                                   " is NULL.");
822                 success = FALSE;
823             }
824         }
825     }
826
827     amfree(label_from_device);
828     g_ptr_array_free_full(ops);
829
830     if (!success) {
831         return FALSE;
832     } else if (parent_class->start) {
833         return parent_class->start(dself, mode, label, timestamp);
834     } else {
835         return TRUE;
836     }
837 }
838
839 typedef struct {
840     GenericOp base;
841     const dumpfile_t * info; /* IN */
842     int fileno;
843 } StartFileOp;
844
845 /* a GFunc */
846 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
847     StartFileOp * op = data;
848     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
849                                                         op->info));
850     op->fileno = op->base.child->file;
851     if (op->fileno < 1) {
852         op->base.result = FALSE;
853     }
854 }
855
856 static gboolean
857 rait_device_start_file (Device * dself, const dumpfile_t * info) {
858     GPtrArray * ops;
859     guint i;
860     gboolean success;
861     RaitDevice * self;
862     int actual_file = -1;
863
864     self = RAIT_DEVICE(dself);
865     g_return_val_if_fail(self != NULL, FALSE);
866
867     ops = g_ptr_array_sized_new(self->private->children->len);
868     for (i = 0; i < self->private->children->len; i ++) {
869         StartFileOp * op;
870         op = malloc(sizeof(*op));
871         op->base.child = g_ptr_array_index(self->private->children, i);
872         op->info = info;
873         g_ptr_array_add(ops, op);
874     }
875     
876     do_rait_child_ops(start_file_do_op, ops, NULL);
877
878     success = g_ptr_array_and(ops, extract_boolean_generic_op);
879     
880     for (i = 0; i < self->private->children->len && success; i ++) {
881         StartFileOp * op = g_ptr_array_index(ops, i);
882         if (!op->base.result)
883             continue;
884         g_assert(op->fileno >= 1);
885         if (actual_file < 1) {
886             actual_file = op->fileno;
887         }
888         if (actual_file != op->fileno) {
889             /* File number mismatch! Aah, my hair is on fire! */
890             g_fprintf(stderr, "File number mismatch in "
891                              "rait_device_start_file(): "
892                              "Child %s reported file number "
893                              "%d, another child reported "
894                              "file number %d.",
895                              op->base.child->device_name,
896                              op->fileno, actual_file);
897             success = FALSE;
898             op->base.result = FALSE;
899         }
900     }
901
902     g_ptr_array_free_full(ops);
903
904     g_assert(actual_file >= 1);
905     dself->file = actual_file - 1; /* chain-up, below, will re-increment this */
906     dself->in_file = TRUE;
907
908     if (!success) {
909         g_fprintf(stderr, _("One or more devices failed to start_file"));
910         return FALSE;
911     } else if (parent_class->start_file) {
912         return parent_class->start_file(dself, info);
913     } else {
914         return TRUE;
915     }
916 }
917
918 static void find_simple_params(RaitDevice * self,
919                                guint * num_children,
920                                guint * data_children,
921                                int * blocksize) {
922     int num, data;
923     
924     num = self->private->children->len;
925     if (num > 1)
926         data = num - 1;
927     else
928         data = num;
929     if (num_children != NULL)
930         *num_children = num;
931     if (data_children != NULL)
932         *data_children = data;
933
934     if (blocksize != NULL) {
935         *blocksize = device_write_min_size(DEVICE(self));
936     }
937 }
938
939 typedef struct {
940     GenericOp base;
941     guint size;           /* IN */
942     gpointer data;        /* IN */
943     gboolean short_block; /* IN */
944     gboolean data_needs_free; /* bookkeeping */
945 } WriteBlockOp;
946
947 /* a GFunc. */
948 static void write_block_do_op(gpointer data,
949                               gpointer user_data G_GNUC_UNUSED) {
950     WriteBlockOp * op = data;
951
952     op->base.result =
953         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data,
954                                            op->short_block));
955 }
956
/* Parity block generation. Performance of this function can be improved
   considerably by using larger-sized integers or
   assembly-coded vector instructions. Parameters are:
   % data       - The data chunks in series; only the first
                  (num_chunks - 1) chunks of chunk_size bytes are read
   % parity     - Allocated space for parity block (chunk_size bytes);
                  receives the XOR of all data chunks
   % chunk_size - Size of one chunk in bytes
   % num_chunks - Number of chunks in the stripe, counting the parity
                  chunk itself
   With fewer than two chunks there is no data to fold in, and the parity
   block is left all zero.
 */
static void make_parity_block(char * data, char * parity,
                              guint chunk_size, guint num_chunks) {
    guint i;
    guint data_chunks;

    memset(parity, 0, chunk_size);

    /* num_chunks is unsigned: without this guard, num_chunks == 0 would
       wrap the loop bound below to a huge value. */
    if (num_chunks < 2)
        return;
    data_chunks = num_chunks - 1;

    for (i = 0; i < data_chunks; i ++) {
        /* Compute the offset in size_t so it cannot wrap in guint. */
        const char * chunk = data + (size_t)chunk_size * i;
        guint j;
        for (j = 0; j < chunk_size; j ++) {
            parity[j] ^= chunk[j];
        }
    }
}
974
975 /* Does the same thing as make_parity_block, but instead of using a
976    single memory chunk holding all chunks, it takes a GPtrArray of
977    chunks. */
978 static void make_parity_block_extents(GPtrArray * data, char * parity,
979                                       guint chunk_size) {
980     guint i;
981     bzero(parity, chunk_size);
982     for (i = 0; i < data->len; i ++) {
983         guint j;
984         char * data_chunk;
985         data_chunk = g_ptr_array_index(data, i);
986         for (j = 0; j < chunk_size; j ++) {
987             parity[j] ^= data_chunk[j];
988         }
989     }
990 }
991
992 /* Does the parity creation algorithm. Allocates and returns a single
993    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
994 static char * extract_data_block(char * data, guint size,
995                                  guint chunks, guint chunk) {
996     char * rval;
997     guint chunk_size;
998
999     g_return_val_if_fail(chunks > 0 && chunk > 0 && chunk <= chunks, NULL);
1000     g_return_val_if_fail(data != NULL, NULL);
1001     g_return_val_if_fail(size > 0 && size % (chunks - 1) == 0, NULL);
1002
1003     chunk_size = size / (chunks - 1);
1004     rval = malloc(chunk_size);
1005     if (chunks != chunk) {
1006         /* data block. */
1007         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1008     } else {
1009         make_parity_block(data, rval, chunk_size, chunks);
1010     }
1011     
1012     return rval;
1013 }
1014
1015 static gboolean 
1016 rait_device_write_block (Device * dself, guint size, gpointer data,
1017                          gboolean last_block) {
1018     GPtrArray * ops;
1019     guint i;
1020     gboolean success;
1021     guint data_children, num_children;
1022     int blocksize;
1023     RaitDevice * self;
1024
1025     self = RAIT_DEVICE(dself);
1026     g_return_val_if_fail(self != NULL, FALSE);
1027
1028     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children,
1029                        &blocksize);
1030     num_children = self->private->children->len;
1031     if (num_children != 1)
1032         data_children = num_children - 1;
1033     else
1034         data_children = num_children;
1035     
1036     g_return_val_if_fail(size % data_children == 0 || last_block, FALSE);
1037
1038     if (last_block) {
1039         char *new_data;
1040
1041         new_data = malloc(blocksize);
1042         memcpy(new_data, data, size);
1043         bzero(new_data + size, blocksize - size);
1044
1045         data = new_data;
1046         size = blocksize;
1047     }
1048
1049     ops = g_ptr_array_sized_new(num_children);
1050     for (i = 0; i < self->private->children->len; i ++) {
1051         WriteBlockOp * op;
1052         op = malloc(sizeof(*op));
1053         op->base.child = g_ptr_array_index(self->private->children, i);
1054         op->short_block = last_block;
1055         op->size = size / data_children;
1056         if (num_children <= 2) {
1057             op->data = data;
1058             op->data_needs_free = FALSE;
1059         } else {
1060             op->data_needs_free = TRUE;
1061             op->data = extract_data_block(data, size, num_children, i + 1);
1062         }
1063         g_ptr_array_add(ops, op);
1064     }
1065
1066     do_rait_child_ops(write_block_do_op, ops, NULL);
1067
1068     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1069
1070     for (i = 0; i < self->private->children->len; i ++) {
1071         WriteBlockOp * op = g_ptr_array_index(ops, i);
1072         if (op->data_needs_free)
1073             free(op->data);
1074     }
1075
1076     if (last_block) {
1077         amfree(data);
1078     }
1079     
1080     g_ptr_array_free_full(ops);
1081
1082     if (!success) {
1083         /* TODO be more specific here */
1084         /* TODO: handle EOF here -- if one or more (or two or more??)
1085          * children have is_eof* set, then reflect that in our error
1086          * status, and finish_file all of the non-EOF children. What's
1087          * more fun is when one device fails and must be isolated at
1088          * the same time another hits EOF. */
1089         g_fprintf(stderr, "One or more devices failed to write_block");
1090         return FALSE;
1091     } else {
1092         /* We don't chain up here because we must handle finish_file
1093            differently. If we were called with last_block, then the
1094            children have already called finish_file themselves. So we
1095            update the device block numbers manually. */
1096         dself->block ++;
1097         if (last_block)
1098             dself->in_file = FALSE;
1099
1100         return TRUE;
1101     }
1102 }
1103
1104 /* A GFunc */
1105 static void finish_file_do_op(gpointer data,
1106                               gpointer user_data G_GNUC_UNUSED) {
1107     GenericOp * op = data;
1108     op->result = GINT_TO_POINTER(device_finish_file(op->child));
1109 }
1110
1111 static gboolean 
1112 rait_device_finish_file (Device * self) {
1113     GPtrArray * ops;
1114     gboolean success;
1115
1116     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
1117     
1118     do_rait_child_ops(finish_file_do_op, ops, NULL);
1119
1120     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1121
1122     g_ptr_array_free_full(ops);
1123
1124     if (!success) {
1125         return FALSE;
1126     } else if (parent_class->finish_file) {
1127         return parent_class->finish_file(self);
1128     } else {
1129         return TRUE;
1130     }
1131 }
1132
1133 typedef struct {
1134     GenericOp base;
1135     guint requested_file;                /* IN */
1136     guint actual_file;                   /* OUT */
1137 } SeekFileOp;
1138
1139 /* a GFunc. */
1140 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1141     SeekFileOp * op = data;
1142     op->base.result = device_seek_file(op->base.child, op->requested_file);
1143     op->actual_file = op->base.child->file;
1144 }
1145
1146 static dumpfile_t * 
1147 rait_device_seek_file (Device * dself, guint file) {
1148     GPtrArray * ops;
1149     guint i;
1150     gboolean success;
1151     dumpfile_t * rval;
1152     RaitDevice * self = RAIT_DEVICE(dself);
1153     guint actual_file = 0;
1154     g_return_val_if_fail(self != NULL, FALSE);
1155
1156     ops = g_ptr_array_sized_new(self->private->children->len);
1157     for (i = 0; i < self->private->children->len; i ++) {
1158         SeekFileOp * op;
1159         if ((int)i == self->private->failed)
1160             continue; /* This device is broken. */
1161         op = malloc(sizeof(*op));
1162         op->base.child = g_ptr_array_index(self->private->children, i);
1163         op->base.child_index = i;
1164         op->requested_file = file;
1165         g_ptr_array_add(ops, op);
1166     }
1167     
1168     do_rait_child_ops(seek_file_do_op, ops, NULL);
1169
1170     /* This checks for NULL values, but we still have to check for
1171        consistant headers. */
1172     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1173                                        ops, extract_boolean_pointer_op);
1174
1175     rval = NULL;
1176     for (i = 0; i < self->private->children->len; i ++) {
1177         SeekFileOp * this_op;
1178         dumpfile_t * this_result;
1179         guint this_actual_file;
1180         if ((int)i == self->private->failed)
1181             continue;
1182         
1183         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1184         this_result = this_op->base.result;
1185         this_actual_file = this_op->actual_file;
1186
1187         if (rval == NULL) {
1188             rval = this_result;
1189             actual_file = this_actual_file;
1190         } else {
1191             if (headers_are_equal(rval, this_result) &&
1192                 actual_file == this_actual_file) {
1193                 /* Do nothing. */
1194             } else {
1195                 success = FALSE;
1196             }
1197             free(this_result);
1198         }
1199     }
1200
1201     g_ptr_array_free_full(ops);
1202
1203     if (!success) {
1204         amfree(rval);
1205         return NULL;
1206     } else if (parent_class->seek_file) {
1207         parent_class->seek_file(dself, file);
1208     }
1209
1210     return rval;
1211 }
1212
1213 typedef struct {
1214     GenericOp base;
1215     guint64 block; /* IN */
1216 } SeekBlockOp;
1217
1218 /* a GFunc. */
1219 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1220     SeekBlockOp * op = data;
1221     op->base.result =
1222         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1223 }
1224
1225 static gboolean 
1226 rait_device_seek_block (Device * dself, guint64 block) {
1227     GPtrArray * ops;
1228     guint i;
1229     gboolean success;
1230
1231     RaitDevice * self = RAIT_DEVICE(dself);
1232     g_return_val_if_fail(self != NULL, FALSE);
1233
1234     ops = g_ptr_array_sized_new(self->private->children->len);
1235     for (i = 0; i < self->private->children->len; i ++) {
1236         SeekBlockOp * op;
1237         if ((int)i == self->private->failed)
1238             continue; /* This device is broken. */
1239         op = malloc(sizeof(*op));
1240         op->base.child = g_ptr_array_index(self->private->children, i);
1241         op->base.child_index = i;
1242         op->block = block;
1243         g_ptr_array_add(ops, op);
1244     }
1245     
1246     do_rait_child_ops(seek_block_do_op, ops, NULL);
1247
1248     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1249                                        ops, extract_boolean_generic_op);
1250
1251     g_ptr_array_free_full(ops);
1252
1253     if (!success) {
1254         return FALSE;
1255     } else if (parent_class->seek_block) {
1256         return parent_class->seek_block(dself, block);
1257     } else {
1258         return success;
1259     }
1260 }
1261
1262 typedef struct {
1263     GenericOp base;
1264     gpointer buffer; /* IN */
1265     int read_size;      /* IN/OUT -- note not a pointer */
1266     int desired_read_size; /* bookkeeping */
1267 } ReadBlockOp;
1268
1269 /* a GFunc. */
1270 static void read_block_do_op(gpointer data,
1271                              gpointer user_data G_GNUC_UNUSED) {
1272     ReadBlockOp * op = data;
1273     op->base.result =
1274         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1275                                           &(op->read_size)));
1276 }
1277
1278 /* A BooleanExtractor. This one checks for a successful read. */
1279 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1280     ReadBlockOp * op = data;
1281     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1282 }
1283
1284 /* A BooleanExtractor. This one checks for EOF. */
1285 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1286     ReadBlockOp * op = data;
1287     return op->base.child->is_eof;
1288 }
1289
1290 /* Counts the number of elements in an array matching a given proposition. */
1291 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1292     int rval;
1293     unsigned int i;
1294     rval = 0;
1295     for (i = 0; i < array->len ; i++) {
1296         if (filter(g_ptr_array_index(array, i)))
1297             rval ++;
1298     }
1299     return rval;
1300 }
1301
1302 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1303                                       gpointer buf, size_t bufsize) {
1304     guint num_children, data_children;
1305     int blocksize, child_blocksize;
1306     guint i;
1307     int parity_child;
1308     gpointer parity_block = NULL;
1309     gboolean success;
1310
1311     success = TRUE;
1312     find_simple_params(self, &num_children, &data_children, &blocksize);
1313     if (num_children > 1)
1314         parity_child = num_children - 1;
1315     else
1316         parity_child = -1;
1317
1318     child_blocksize = blocksize / data_children;
1319
1320     for (i = 0; i < ops->len; i ++) {
1321         ReadBlockOp * op = g_ptr_array_index(ops, i);
1322         if (!extract_boolean_read_block_op_data(op))
1323             continue;
1324         if ((int)(op->base.child_index) == parity_child) {
1325             parity_block = op->buffer;
1326         } else {
1327             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1328             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1329                    child_blocksize);
1330         }
1331     }
1332
1333     if (self->private->status == RAIT_STATUS_COMPLETE) {
1334         if (num_children >= 2) {
1335             /* Verify the parity block. This code is inefficient but
1336                does the job for the 2-device case, too. */
1337             gpointer constructed_parity;
1338             GPtrArray * data_extents;
1339             
1340             constructed_parity = malloc(child_blocksize);
1341             data_extents = g_ptr_array_sized_new(data_children);
1342             for (i = 0; i < data_children; i ++) {
1343                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1344                 g_assert(extract_boolean_read_block_op_data(op));
1345                 if ((int)op->base.child_index == parity_child)
1346                     continue;
1347                 g_ptr_array_add(data_extents, op->buffer);
1348             }
1349             make_parity_block_extents(data_extents, constructed_parity,
1350                                       child_blocksize);
1351             
1352             if (0 != memcmp(parity_block, constructed_parity,
1353                             child_blocksize)) {
1354                 g_fprintf(stderr, "RAIT is inconsistant: "
1355                         "Parity block did not match data blocks.\n");
1356                 success = FALSE;
1357             }
1358             g_ptr_array_free(data_extents, TRUE);
1359             amfree(constructed_parity);
1360         } else { /* do nothing. */ }
1361     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1362         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1363         /* We are in degraded mode. What's missing? */
1364         if (self->private->failed == parity_child) {
1365             /* do nothing. */
1366         } else if (num_children >= 2) {
1367             /* Reconstruct failed block from parity block. */
1368             GPtrArray * data_extents = g_ptr_array_new();            
1369
1370             for (i = 0; i < data_children; i ++) {
1371                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1372                 if (!extract_boolean_read_block_op_data(op))
1373                     continue;
1374                 g_ptr_array_add(data_extents, op->buffer);
1375             }
1376
1377             /* Conveniently, the reconstruction is the same procedure
1378                as the parity generation. This even works if there is
1379                only one remaining device! */
1380             make_parity_block_extents(data_extents,
1381                                       (char *)buf + (child_blocksize *
1382                                              self->private->failed),
1383                                       child_blocksize);
1384
1385             /* The array members belong to our ops argument. */
1386             g_ptr_array_free(data_extents, TRUE);
1387         } else {
1388             g_assert_not_reached();
1389         }
1390     } else {
1391         success = FALSE;
1392     }
1393     return success;
1394 }
1395
1396 static int
1397 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1398     GPtrArray * ops;
1399     guint i;
1400     gboolean success;
1401     guint num_children, data_children;
1402     int blocksize;
1403     gsize child_blocksize;
1404
1405     RaitDevice * self = RAIT_DEVICE(dself);
1406     g_return_val_if_fail(self != NULL, -1);
1407
1408     find_simple_params(self, &num_children, &data_children,
1409                        &blocksize);
1410
1411     /* tell caller they haven't given us a big enough buffer */
1412     if (blocksize < *size) {
1413         *size = blocksize;
1414         return 0;
1415     }
1416
1417     g_return_val_if_fail(*size >= (int)device_write_min_size(dself), -1);
1418
1419     g_assert(blocksize % data_children == 0); /* If not we are screwed */
1420     child_blocksize = blocksize / data_children;
1421
1422     ops = g_ptr_array_sized_new(num_children);
1423     for (i = 0; i < num_children; i ++) {
1424         ReadBlockOp * op;
1425         if ((int)i == self->private->failed)
1426             continue; /* This device is broken. */
1427         op = malloc(sizeof(*op));
1428         op->base.child = g_ptr_array_index(self->private->children, i);
1429         op->base.child_index = i;
1430         op->buffer = malloc(child_blocksize);
1431         op->desired_read_size = op->read_size = blocksize / data_children;
1432         g_ptr_array_add(ops, op);
1433     }
1434     
1435     do_rait_child_ops(read_block_do_op, ops, NULL);
1436
1437     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1438         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1439                                      ops,
1440                                      extract_boolean_read_block_op_data)) {
1441             success = FALSE;
1442         } else {
1443             success = raid_block_reconstruction(RAIT_DEVICE(self),
1444                                                 ops, buf, (size_t)*size);
1445         }
1446     } else {
1447         success = FALSE;
1448         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
1449                                      ops,
1450                                      extract_boolean_read_block_op_eof)) {
1451             /* We hit EOF. */
1452             dself->is_eof = TRUE;
1453             dself->in_file = FALSE;
1454         } else {
1455             g_fprintf(stderr, _("All child devices failed to read, but not all are at eof"));
1456         }
1457     }
1458
1459     for (i = 0; i < ops->len; i ++) {
1460         ReadBlockOp * op = g_ptr_array_index(ops, i);
1461         amfree(op->buffer);
1462     }
1463     g_ptr_array_free_full(ops);
1464
1465     if (success) {
1466         if (parent_class->read_block)
1467             parent_class->read_block(dself, buf, size);
1468         *size = blocksize;
1469         return blocksize;
1470     } else {
1471         return -1;
1472     }
1473 }
1474
1475 typedef struct {
1476     GenericOp base;
1477     DevicePropertyId id;   /* IN */
1478     GValue value;          /* IN/OUT */
1479     gboolean label_changed; /* Did the device label change? OUT; _set only*/
1480 } PropertyOp;
1481
1482 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
1483 static GPtrArray * make_property_op_array(RaitDevice * self,
1484                                           DevicePropertyId id,
1485                                           GValue * value) {
1486     guint i;
1487     GPtrArray * ops;
1488     ops = g_ptr_array_sized_new(self->private->children->len);
1489     for (i = 0; i < self->private->children->len; i ++) {
1490         PropertyOp * op;
1491         op = malloc(sizeof(*op));
1492         op->base.child = g_ptr_array_index(self->private->children, i);
1493         op->id = id;
1494         bzero(&(op->value), sizeof(op->value));
1495         if (value != NULL) {
1496             g_value_unset_copy(value, &(op->value));
1497         }
1498         g_ptr_array_add(ops, op);
1499     }
1500
1501     return ops;
1502 }
1503
1504 /* A GFunc. */
1505 static void property_get_do_op(gpointer data,
1506                                gpointer user_data G_GNUC_UNUSED) {
1507     PropertyOp * op = data;
1508
1509     bzero(&(op->value), sizeof(op->value));
1510     op->base.result =
1511         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
1512                                             &(op->value)));
1513 }
1514
1515 /* Merge ConcurrencyParadigm results. */
1516 static gboolean property_get_concurrency(GPtrArray * ops, GValue * val) {
1517     ConcurrencyParadigm result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
1518     guint i = 0;
1519     
1520     for (i = 0; i < ops->len; i ++) {
1521         ConcurrencyParadigm cur;
1522         PropertyOp * op = g_ptr_array_index(ops, i);
1523         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1524                              CONCURRENCY_PARADIGM_TYPE, FALSE);
1525         cur = g_value_get_enum(&(op->value));
1526         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
1527             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
1528             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
1529         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
1530                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
1531             result = CONCURRENCY_PARADIGM_SHARED_READ;
1532         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
1533                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
1534             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
1535         } else {
1536             g_return_val_if_fail(FALSE, FALSE);
1537         }
1538     }
1539
1540     g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
1541     g_value_set_enum(val, result);
1542     return TRUE;
1543 }
1544
1545 /* Merge StreamingRequirement results. */
1546 static gboolean property_get_streaming(GPtrArray * ops, GValue * val) {
1547     StreamingRequirement result = STREAMING_REQUIREMENT_NONE;
1548     guint i = 0;
1549     
1550     for (i = 0; i < ops->len; i ++) {
1551         StreamingRequirement cur;
1552         PropertyOp * op = g_ptr_array_index(ops, i);
1553         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1554                              STREAMING_REQUIREMENT_TYPE, FALSE);
1555         cur = g_value_get_enum(&(op->value));
1556         if (result == STREAMING_REQUIREMENT_REQUIRED ||
1557             cur == STREAMING_REQUIREMENT_REQUIRED) {
1558             result = STREAMING_REQUIREMENT_REQUIRED;
1559         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
1560                    cur == STREAMING_REQUIREMENT_DESIRED) {
1561             result = STREAMING_REQUIREMENT_DESIRED;
1562         } else if (result == STREAMING_REQUIREMENT_NONE &&
1563                    cur == STREAMING_REQUIREMENT_NONE) {
1564             result = STREAMING_REQUIREMENT_NONE;
1565         } else {
1566             g_return_val_if_fail(FALSE, FALSE);
1567         }
1568     }
1569
1570     g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
1571     g_value_set_enum(val, result);
1572     return TRUE;
1573 }
1574     
1575 /* Merge MediaAccessMode results. */
1576 static gboolean property_get_medium_type(GPtrArray * ops, GValue * val) {
1577     MediaAccessMode result = 0;
1578     guint i = 0;
1579
1580     for (i = 0; i < ops->len; i ++) {
1581         MediaAccessMode cur;
1582         PropertyOp * op = g_ptr_array_index(ops, i);
1583         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1584                              MEDIA_ACCESS_MODE_TYPE, FALSE);
1585         cur = g_value_get_enum(&(op->value));
1586         
1587         if (i == 0) {
1588             result = cur;
1589         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
1590                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
1591                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
1592                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
1593             /* Invalid combination; one device can only read, other
1594                can only write. */
1595             return FALSE;
1596         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
1597                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
1598             result = MEDIA_ACCESS_MODE_READ_ONLY;
1599         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
1600                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
1601             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
1602         } else if (result == MEDIA_ACCESS_MODE_WORM ||
1603                    cur == MEDIA_ACCESS_MODE_WORM) {
1604             result = MEDIA_ACCESS_MODE_WORM;
1605         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
1606                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
1607             result = MEDIA_ACCESS_MODE_READ_WRITE;
1608         } else {
1609             g_return_val_if_fail(FALSE, FALSE);
1610         }
1611     }
1612     
1613     g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
1614     g_value_set_enum(val, result);
1615     return TRUE;
1616 }
1617     
1618 /* Merge QualifiedSize results. */
1619 static gboolean property_get_free_space(GPtrArray * ops, GValue * val) {
1620     QualifiedSize result;
1621     guint i = 0;
1622
1623     for (i = 0; i < ops->len; i ++) {
1624         QualifiedSize cur;
1625         PropertyOp * op = g_ptr_array_index(ops, i);
1626         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1627                              QUALIFIED_SIZE_TYPE, FALSE);
1628         cur = *(QualifiedSize*)(g_value_get_boxed(&(op->value)));
1629
1630         if (result.accuracy != cur.accuracy) {
1631             result.accuracy = SIZE_ACCURACY_ESTIMATE;
1632         }
1633
1634         if (result.accuracy == SIZE_ACCURACY_UNKNOWN &&
1635             cur.accuracy != SIZE_ACCURACY_UNKNOWN) {
1636             result.bytes = cur.bytes;
1637         } else if (result.accuracy != SIZE_ACCURACY_UNKNOWN &&
1638                    cur.accuracy == SIZE_ACCURACY_UNKNOWN) {
1639             /* result.bytes unchanged. */
1640         } else {
1641             result.bytes = MIN(result.bytes, cur.bytes);
1642         }
1643     }
1644
1645     g_value_unset_init(val, QUALIFIED_SIZE_TYPE);
1646     g_value_set_boxed(val, &result);
1647     return TRUE;
1648 }
1649     
1650 /* Merge boolean results by ANDing them together. */
1651 static gboolean property_get_boolean_and(GPtrArray * ops, GValue * val) {
1652     gboolean result = FALSE;
1653     guint i = 0;
1654
1655     for (i = 0; i < ops->len; i ++) {
1656         gboolean cur;
1657         PropertyOp * op = g_ptr_array_index(ops, i);
1658         g_return_val_if_fail(G_VALUE_HOLDS_BOOLEAN(&(op->value)), FALSE);
1659         cur = g_value_get_boolean(&(op->value));
1660
1661         result = result && cur;
1662     }
1663
1664     g_value_unset_init(val, G_TYPE_BOOLEAN);
1665     g_value_set_boolean(val, result);
1666     return TRUE;
1667 }
1668     
1669
1670 static gboolean 
1671 rait_device_property_get (Device * dself, DevicePropertyId id, GValue * val) {
1672     GPtrArray * ops;
1673     guint i;
1674     gboolean success;
1675     GValue result;
1676     GValue * first_value;
1677     RaitDevice * self = RAIT_DEVICE(dself);
1678     g_return_val_if_fail(self != NULL, FALSE);
1679
1680     /* Some properties are handled completely differently. */
1681     if (id == PROPERTY_BLOCK_SIZE) {
1682         g_value_unset_init(val, G_TYPE_INT);
1683         g_value_set_int(val, self->private->block_size);
1684         return TRUE;
1685     } else if (id == PROPERTY_MIN_BLOCK_SIZE ||
1686         id == PROPERTY_MAX_BLOCK_SIZE) {
1687         g_value_unset_init(val, G_TYPE_UINT);
1688         g_value_set_uint(val, self->private->block_size);
1689         return TRUE;
1690     } else if (id == PROPERTY_CANONICAL_NAME) {
1691         if (parent_class->property_get != NULL) {
1692             return parent_class->property_get(dself, id, val);
1693         } else {
1694             return FALSE;
1695         }
1696     }
1697
1698     ops = make_property_op_array(self, id, NULL);
1699     
1700     do_rait_child_ops(property_get_do_op, ops, NULL);
1701
1702     if (id == PROPERTY_CONCURRENCY) {
1703         success = property_get_concurrency(ops, val);
1704     } else if (id == PROPERTY_STREAMING) { 
1705         success = property_get_streaming(ops, val);
1706     } else if (id == PROPERTY_APPENDABLE ||
1707                id == PROPERTY_PARTIAL_DELETION) {
1708         success = property_get_boolean_and(ops, val);
1709     } else if (id == PROPERTY_MEDIUM_TYPE) {
1710         success = property_get_medium_type(ops, val);
1711     } else if (id == PROPERTY_FREE_SPACE) {
1712         success = property_get_free_space(ops, val);
1713     } else {
1714         /* Generic handling; if all results are the same, we succeed
1715            and return that result. If not, we fail. */
1716         success = TRUE;
1717         
1718         /* Set up comparison value. */
1719         bzero(&result, sizeof(result));
1720         first_value = &(((PropertyOp*)g_ptr_array_index(ops,0))->value);
1721         if (G_IS_VALUE(first_value)) {
1722             g_value_unset_copy(first_value, &result);
1723         } else {
1724             success = FALSE;
1725         }
1726         
1727         for (i = 0; i < ops->len; i ++) {
1728             PropertyOp * op = g_ptr_array_index(ops, i);
1729             if (!GPOINTER_TO_INT(op->base.result) ||
1730                 !G_IS_VALUE(first_value) ||
1731                 !g_value_compare(&result, &(op->value))) {
1732                 success = FALSE;
1733             }
1734             /* free the GValue if the child call succeeded */
1735             if (GPOINTER_TO_INT(op->base.result))
1736                 g_value_unset(&(op->value));
1737         }
1738
1739         if (success) {
1740             memcpy(val, &result, sizeof(result));
1741         } else if (G_IS_VALUE(&result)) {
1742             g_value_unset(&result);
1743         }
1744     }
1745
1746     g_ptr_array_free_full(ops);
1747
1748     return success;
1749 }
1750
1751 /* A GFunc. */
1752 static void property_set_do_op(gpointer data,
1753                                gpointer user_data G_GNUC_UNUSED) {
1754     PropertyOp * op = data;
1755     gboolean label_set = (op->base.child->volume_label != NULL);
1756     op->base.result =
1757         GINT_TO_POINTER(device_property_set(op->base.child, op->id,
1758                                             &(op->value)));
1759     op->label_changed = (label_set != (op->base.child->volume_label != NULL));
1760 }
1761
1762 /* A BooleanExtractor */
1763 static gboolean extract_label_changed_property_op(gpointer data) {
1764     PropertyOp * op = data;
1765     return op->label_changed;
1766 }
1767
1768 /* A GFunc. */
1769 static void clear_volume_details_do_op(gpointer data,
1770                                        gpointer user_data G_GNUC_UNUSED) {
1771     GenericOp * op = data;
1772     device_clear_volume_details(op->child);
1773 }
1774
1775 static gboolean 
1776 rait_device_property_set (Device * d_self, DevicePropertyId id, GValue * val) {
1777     RaitDevice * self;
1778     GPtrArray * ops;
1779     gboolean success;
1780     gboolean label_changed;
1781
1782     self = RAIT_DEVICE(d_self);
1783     g_return_val_if_fail(self != NULL, FALSE);
1784
1785     /* it doesn't make sense to hand these properties down to our child devices,
1786      * so we'll just pretend we set them.  This is a 2.6.0 hack -- the device gets
1787      * this right in 2.6.1.  */
1788     if (id == PROPERTY_BLOCK_SIZE
1789         || id == PROPERTY_MIN_BLOCK_SIZE
1790         || id == PROPERTY_MAX_BLOCK_SIZE) {
1791         return TRUE; /* lies! */
1792     }
1793
1794     ops = make_property_op_array(self, id, val);
1795     
1796     do_rait_child_ops(property_set_do_op, ops, NULL);
1797
1798     success = g_ptr_array_union_robust(self, ops, extract_boolean_generic_op);
1799     label_changed = g_ptr_array_or(ops, extract_label_changed_property_op);
1800     g_ptr_array_free_full(ops);
1801
1802     if (label_changed) {
1803         /* At least one device considered this property set a label-changing
1804          * operation, so now we clear labels on all devices. */
1805         ops = make_generic_boolean_op_array(self);
1806         do_rait_child_ops(clear_volume_details_do_op, ops, NULL);
1807         g_ptr_array_free_full(ops);
1808     }
1809
1810     return success;
1811 }
1812
1813 typedef struct {
1814     GenericOp base;
1815     guint filenum;
1816 } RecycleFileOp;
1817
1818 /* A GFunc */
1819 static void recycle_file_do_op(gpointer data,
1820                                gpointer user_data G_GNUC_UNUSED) {
1821     RecycleFileOp * op = data;
1822     op->base.result =
1823         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
1824 }
1825
1826 static gboolean 
1827 rait_device_recycle_file (Device * dself, guint filenum) {
1828     GPtrArray * ops;
1829     guint i;
1830     gboolean success;
1831
1832     RaitDevice * self = RAIT_DEVICE(dself);
1833     g_return_val_if_fail(self != NULL, FALSE);
1834
1835     ops = g_ptr_array_sized_new(self->private->children->len);
1836     for (i = 0; i < self->private->children->len; i ++) {
1837         RecycleFileOp * op;
1838         op = malloc(sizeof(*op));
1839         op->base.child = g_ptr_array_index(self->private->children, i);
1840         op->filenum = filenum;
1841         g_ptr_array_add(ops, op);
1842     }
1843     
1844     do_rait_child_ops(recycle_file_do_op, ops, NULL);
1845
1846     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1847
1848     g_ptr_array_free_full(ops);
1849
1850     if (!success) {
1851         return FALSE;
1852     } else if (parent_class->recycle_file) {
1853         return parent_class->recycle_file(dself, filenum);
1854     } else {
1855         return TRUE;
1856     }
1857 }
1858
1859 /* GFunc */
1860 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1861     GenericOp * op = data;
1862     op->result = GINT_TO_POINTER(device_finish(op->child));
1863 }
1864
1865 static gboolean 
1866 rait_device_finish (Device * self) {
1867     GPtrArray * ops;
1868     gboolean success;
1869
1870     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
1871     
1872     do_rait_child_ops(finish_do_op, ops, NULL);
1873
1874     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1875
1876     g_ptr_array_free_full(ops);
1877
1878     if (!success) {
1879         return FALSE;
1880     } else if (parent_class->finish) {
1881         return parent_class->finish(self);
1882     } else {
1883         return TRUE;
1884     }
1885 }
1886
1887 Device *
1888 rait_device_factory (char * type, char * name) {
1889     Device * rval;
1890     g_assert(0 == strcmp(type, "rait"));
1891     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
1892     if (!device_open_device(rval, name)) {
1893         g_object_unref(rval);
1894         return NULL;
1895     } else {
1896         return rval;
1897     }
1898 }
1899
1900 Device * rait_device_new_from_devices(Device ** devices) {
1901     RaitDevice * rval;
1902     int i;
1903     gboolean success = TRUE;
1904
1905     g_return_val_if_fail(devices != NULL && *devices != NULL, NULL);
1906
1907     rval = RAIT_DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
1908
1909     for (i = 0; devices[i] != NULL; i ++) {
1910         g_assert(IS_DEVICE(devices[i]));
1911         if (devices[i]->access_mode != ACCESS_NULL) {
1912             success = FALSE;
1913             break;
1914         }
1915         g_object_ref(devices[i]);
1916         g_ptr_array_add(PRIVATE(rval)->children, devices[i]);
1917     }
1918
1919     success = success && find_block_size(rval);
1920
1921     if (!success) {
1922         g_ptr_array_free(PRIVATE(rval)->children, TRUE);
1923         return NULL;
1924     } else {
1925         register_properties(rval);
1926
1927         return DEVICE(rval);
1928     }
1929 }
1930
1931 void 
1932 rait_device_register (void) {
1933     static const char * device_prefix_list[] = {"rait", NULL};
1934     register_device(rait_device_factory, device_prefix_list);
1935 }