b38bf8ded93b25302e96e3f8bfaf50e55f57938e
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2005 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as 
6  * published by the Free Software Foundation.
7  * 
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  * 
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  * 
17  * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include "rait-device.h"
25 #include <amanda.h>
26 #include "property.h"
27 #include <util.h>
28
29 typedef enum {
30     RAIT_STATUS_COMPLETE, /* All subdevices OK. */
31     RAIT_STATUS_DEGRADED, /* One subdevice failed. */
32     RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
33 } RaitStatus;
34
35 struct RaitDevicePrivate_s {
36     GPtrArray * children;
37     /* These flags are only relevant for reading. */
38     RaitStatus status;
39     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
40        failed node. It holds a negative number otherwise. */
41     int failed;
42     guint block_size;
43 };
44
45 #define PRIVATE(o) (o->private)
46
47 /* here are local prototypes */
48 static void rait_device_init (RaitDevice * o);
49 static void rait_device_class_init (RaitDeviceClass * c);
50 static gboolean rait_device_open_device (Device * self, char * device_name);
51 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
52                                    char * label, char * timestamp);
53 static gboolean rait_device_start_file(Device * self, const dumpfile_t * info);
54 static gboolean rait_device_write_block (Device * self, guint size,
55                                          gpointer data, gboolean last_block);
56 static gboolean rait_device_finish_file (Device * self);
57 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
58 static gboolean rait_device_seek_block (Device * self, guint64 block);
59 static int      rait_device_read_block (Device * self, gpointer buf,
60                                         int * size);
61 static gboolean rait_device_property_get (Device * self, DevicePropertyId id,
62                                           GValue * val);
63 static gboolean rait_device_property_set (Device * self, DevicePropertyId id,
64                                           GValue * val);
65 static gboolean rait_device_recycle_file (Device * self, guint filenum);
66 static gboolean rait_device_finish (Device * self);
67 static ReadLabelStatusFlags rait_device_read_label(Device * dself);
68 static void find_simple_params(RaitDevice * self, guint * num_children,
69                                guint * data_children, int * blocksize);
70
71 /* pointer to the class of our parent */
72 static DeviceClass *parent_class = NULL;
73
74 /* This function is replicated here in case we have GLib from before 2.4.
75  * It should probably go eventually. */
76 #if !GLIB_CHECK_VERSION(2,4,0)
77 static void
78 g_ptr_array_foreach (GPtrArray *array,
79                      GFunc      func,
80                      gpointer   user_data)
81 {
82   guint i;
83
84   g_return_if_fail (array);
85
86   for (i = 0; i < array->len; i++)
87     (*func) (array->pdata[i], user_data);
88 }
89 #endif
90
91 GType
92 rait_device_get_type (void)
93 {
94     static GType type = 0;
95
96     if G_UNLIKELY(type == 0) {
97         static const GTypeInfo info = {
98             sizeof (RaitDeviceClass),
99             (GBaseInitFunc) NULL,
100             (GBaseFinalizeFunc) NULL,
101             (GClassInitFunc) rait_device_class_init,
102             (GClassFinalizeFunc) NULL,
103             NULL /* class_data */,
104             sizeof (RaitDevice),
105             0 /* n_preallocs */,
106             (GInstanceInitFunc) rait_device_init,
107             NULL
108         };
109         
110         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
111                                        (GTypeFlags)0);
112         }
113     
114     return type;
115 }
116
117 static void g_object_unref_foreach(gpointer data,
118                                    gpointer user_data G_GNUC_UNUSED) {
119     g_return_if_fail(G_IS_OBJECT(data));
120     g_object_unref(data);
121 }
122
123 static void
124 rait_device_finalize(GObject *obj_self)
125 {
126     RaitDevice *self G_GNUC_UNUSED = RAIT_DEVICE (obj_self);
127     if(G_OBJECT_CLASS(parent_class)->finalize) \
128            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
129     if(self->private->children) {
130         g_ptr_array_foreach(self->private->children,
131                             g_object_unref_foreach, NULL);
132         g_ptr_array_free (self->private->children, TRUE);
133         self->private->children = NULL;
134     }
135     amfree(self->private);
136 }
137
138 static void 
139 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
140 {
141     PRIVATE(o) = malloc(sizeof(RaitDevicePrivate));
142     PRIVATE(o)->children = g_ptr_array_new();
143     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
144     PRIVATE(o)->failed = -1;
145 }
146
147 static void 
148 rait_device_class_init (RaitDeviceClass * c G_GNUC_UNUSED)
149 {
150     GObjectClass *g_object_class G_GNUC_UNUSED = (GObjectClass*) c;
151     DeviceClass *device_class = (DeviceClass *)c;
152
153     parent_class = g_type_class_ref (TYPE_DEVICE);
154
155     device_class->open_device = rait_device_open_device;
156     device_class->start = rait_device_start;
157     device_class->start_file = rait_device_start_file;
158     device_class->write_block = rait_device_write_block;
159     device_class->finish_file = rait_device_finish_file;
160     device_class->seek_file = rait_device_seek_file;
161     device_class->seek_block = rait_device_seek_block;
162     device_class->read_block = rait_device_read_block;
163     device_class->property_get = rait_device_property_get;
164     device_class->property_set = rait_device_property_set;
165     device_class->recycle_file = rait_device_recycle_file; 
166     device_class->finish = rait_device_finish;
167     device_class->read_label = rait_device_read_label;
168
169     g_object_class->finalize = rait_device_finalize;
170
171     g_thread_pool_set_max_unused_threads(-1);
172 }
173
174 /* This function does something a little clever and a little
175  * complicated. It takes an array of operations and runs the given
176  * function on each element in the array. The trick is that it runs them
177  * all in parallel, in different threads. This is more efficient than it
178  * sounds because we use a GThreadPool, which means calling this function
179  * will probably not start any new threads at all, but rather use
180  * existing ones. The func is called with two gpointer arguments: The
181  * first from the array, the second is the data argument.
182  * 
183  * When it returns, all the operations have been successfully
184  * executed. If you want results from your operations, do it yourself
185  * through the array. */
186 static void do_thread_pool_op(GFunc func, GPtrArray * ops, gpointer data) {
187     GThreadPool * pool;
188     guint i;
189
190     pool = g_thread_pool_new(func, data, -1, FALSE, NULL);
191     for (i = 0; i < ops->len; i ++) {
192         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
193     }
194
195     g_thread_pool_free(pool, FALSE, TRUE);
196 }
197
198 /* This does the above, in a serial fashion (and without using threads) */
199 static void do_unthreaded_ops(GFunc func, GPtrArray * ops,
200                               gpointer data G_GNUC_UNUSED) {
201     guint i;
202
203     for (i = 0; i < ops->len; i ++) {
204         func(g_ptr_array_index(ops, i), NULL);
205     }
206 }
207
208 /* This is the one that code below should call. It switches
209    automatically between do_thread_pool_op and do_unthreaded_ops,
210    depending on g_thread_supported(). */
211 static void do_rait_child_ops(GFunc func, GPtrArray * ops, gpointer data) {
212     if (g_thread_supported()) {
213         do_thread_pool_op(func, ops, data);
214     } else {
215         do_unthreaded_ops(func, ops, data);
216     }
217 }
218
219 /* Take a text string user_name, and break it out into an argv-style
220    array of strings. For example, {foo,{bar,baz},bat} would return the
221    strings "foo", "{bar,baz}", "bat", and NULL. Returns NULL on
222    error. */
223 static char ** parse_device_name(char * user_name) {
224     GPtrArray * rval;
225     char * cur_end = user_name;
226     char * cur_begin = user_name;
227     
228     rval = g_ptr_array_new();
229     
230     /* Check opening brace. */
231     if (*cur_begin != '{')
232         return NULL;
233     cur_begin ++;
234     
235     cur_end = cur_begin;
236     for (;;) {
237         switch (*cur_end) {
238         case ',': {
239             g_ptr_array_add(rval, g_strndup(cur_begin, cur_end - cur_begin));
240             cur_end ++;
241             cur_begin = cur_end;
242             continue;
243         }
244
245         case '{':
246             /* We read until the matching closing brace. */
247             while (*cur_end != '}' && *cur_end != '\0')
248                 cur_end ++;
249             if (*cur_end == '}')
250                 cur_end ++;
251             continue;
252             
253         case '}':
254             g_ptr_array_add(rval, g_strndup(cur_begin, cur_end - cur_begin));
255             goto OUTER_END; /* break loop, not switch */
256
257         case '\0':
258             /* Unexpected NULL; abort. */
259             g_fprintf(stderr, "Invalid RAIT device name %s\n", user_name);
260             g_ptr_array_free_full(rval);
261             return NULL;
262
263         default:
264             cur_end ++;
265             continue;
266         }
267         g_assert_not_reached();
268     }
269  OUTER_END:
270     
271     if (cur_end[1] != '\0') {
272         g_fprintf(stderr, "Invalid RAIT device name %s\n", user_name);
273         g_ptr_array_free_full(rval);
274         return NULL;
275     }
276
277     g_ptr_array_add(rval, NULL);
278
279     return (char**) g_ptr_array_free(rval, FALSE);
280 }
281
282 /* Find a workable block size. */
283 static gboolean find_block_size(RaitDevice * self) {
284     uint min = 0;
285     uint max = G_MAXUINT;
286     uint result;
287     GValue val;
288     gboolean rval;
289     guint i;
290     guint data_children;
291     
292     for (i = 0; i < self->private->children->len; i ++) {
293         uint child_min, child_max;
294         GValue property_result;
295         bzero(&property_result, sizeof(property_result));
296
297         if (!device_property_get(g_ptr_array_index(self->private->children, i),
298                                  PROPERTY_MIN_BLOCK_SIZE, &property_result))
299             return FALSE;
300         child_min = g_value_get_uint(&property_result);
301         g_return_val_if_fail(child_min > 0, FALSE);
302         if (!device_property_get(g_ptr_array_index(self->private->children, i),
303                                  PROPERTY_MAX_BLOCK_SIZE, &property_result))
304             return FALSE;
305         child_max = g_value_get_uint(&property_result);
306         g_return_val_if_fail(child_max > 0, FALSE);
307         
308         if (child_min > max || child_max < min || child_min == 0) {
309             return FALSE;
310         } else {
311             min = MAX(min, child_min);
312             max = MIN(max, child_max);
313         }
314     }
315
316     /* Now pick a number. */
317     g_assert(min <= max);
318     if (max < MAX_TAPE_BLOCK_BYTES)
319         result = max;
320     else if (min > MAX_TAPE_BLOCK_BYTES)
321         result = min;
322     else
323         result = MAX_TAPE_BLOCK_BYTES;
324
325     /* User reads and writes bigger blocks. */
326     find_simple_params(self, NULL, &data_children, NULL);
327     self->private->block_size = result * data_children;
328
329     bzero(&val, sizeof(val));
330     g_value_init(&val, G_TYPE_INT);
331     g_value_set_int(&val, result);
332     /* We can't do device_property_set because it's disallowed
333        according to the registered property base. */
334     rval = rait_device_property_set(DEVICE(self), PROPERTY_BLOCK_SIZE, &val);
335     g_value_unset(&val);
336     return rval;
337 }
338
339 /* Register properties that belong to the RAIT device proper, and not
340    to subdevices. */
341 static void register_rait_properties(RaitDevice * self) {
342     Device * o = DEVICE(self);
343     DeviceProperty prop;
344
345     prop.access = PROPERTY_ACCESS_GET_MASK;
346
347     prop.base = &device_property_min_block_size;
348     device_add_property(o, &prop, NULL);
349
350     prop.base = &device_property_max_block_size;
351     device_add_property(o, &prop, NULL);
352   
353     prop.base = &device_property_block_size;
354     device_add_property(o, &prop, NULL);
355
356     prop.base = &device_property_canonical_name;
357     device_add_property(o, &prop, NULL);
358 }
359
360 static void property_hash_union(GHashTable * properties,
361                                 DeviceProperty * prop) {
362     PropertyAccessFlags before, after;
363     gpointer tmp;
364     gboolean found;
365     
366     found = g_hash_table_lookup_extended(properties,
367                                          GUINT_TO_POINTER(prop->base->ID),
368                                          NULL, &tmp);
369     before = GPOINTER_TO_UINT(tmp);
370     
371     if (!found) {
372         after = prop->access;
373     } else {
374         after = before & prop->access;
375     }
376     
377     g_hash_table_insert(properties, GUINT_TO_POINTER(prop->base->ID),
378                         GUINT_TO_POINTER(after));
379 }
380
381 /* A GHRFunc. */
382 static gboolean zero_value(gpointer key G_GNUC_UNUSED, gpointer value,
383                            gpointer user_data G_GNUC_UNUSED) {
384     return (0 == GPOINTER_TO_UINT(value));
385 }
386
387 /* A GHFunc */
388 static void register_property_hash(gpointer key, gpointer value,
389                                    gpointer user_data) {
390     DevicePropertyId id = GPOINTER_TO_UINT(key);
391     DeviceProperty prop;
392     Device * device = (Device*)user_data;
393
394     g_assert(IS_DEVICE(device));
395
396     prop.access = GPOINTER_TO_UINT(value);
397     prop.base = device_property_get_by_id(id);
398
399     device_add_property(device, &prop, NULL);
400 }
401
402 /* This function figures out which properties exist for all children, and 
403  * exports the unioned access mask. */
404 static void register_properties(RaitDevice * self) {
405     GHashTable * properties; /* PropertyID => PropertyAccessFlags */
406     guint j;
407     
408     properties = g_hash_table_new(g_direct_hash, g_direct_equal);
409
410     /* Iterate the device list, find all properties. */
411     for (j = 0; j < self->private->children->len; j ++) {
412         int i;
413         Device * child = g_ptr_array_index(self->private->children, j);
414         const DeviceProperty* device_property_list;
415
416         device_property_list = device_property_get_list(child);
417         for (i = 0; device_property_list[i].base != NULL; i ++) {
418             property_hash_union(properties, (gpointer)&(device_property_list[i]));
419         }
420     }
421
422     /* Then toss properties that can't be accessed. */
423     g_hash_table_foreach_remove(properties, zero_value, NULL);
424     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_BLOCK_SIZE));
425     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_MIN_BLOCK_SIZE));
426     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_MAX_BLOCK_SIZE));
427     g_hash_table_remove(properties, GINT_TO_POINTER(PROPERTY_CANONICAL_NAME));
428
429     /* Finally, register the lot. */
430     g_hash_table_foreach(properties, register_property_hash, self);
431
432     g_hash_table_destroy(properties);
433
434     /* Then we have some of our own properties to register. */
435     register_rait_properties(self);
436 }
437
438 /* This structure contains common fields for many operations. Not all
439    operations use all fields, however. */
440 typedef struct {
441     gpointer result; /* May be a pointer; may be an integer or boolean
442                         stored with GINT_TO_POINTER. */
443     Device * child;  /* The device in question. Used by all
444                         operations. */
445     guint child_index; /* For recoverable operations (read-related
446                           operations), this field provides the number
447                           of this child in the self->private->children
448                           array. */
449 } GenericOp;
450
451 typedef gboolean (*BooleanExtractor)(gpointer data);
452
453 /* A BooleanExtractor */
454 static gboolean extract_boolean_generic_op(gpointer data) {
455     GenericOp * op = data;
456     return GPOINTER_TO_INT(op->result);
457 }
458
459 /* A BooleanExtractor */
460 static gboolean extract_boolean_pointer_op(gpointer data) {
461     GenericOp * op = data;
462     return op->result != NULL;
463 }
464
465 /* Does the equivalent of this perl command:
466      ! (first { !extractor($_) } @_
467    That is, calls extractor on each element of the array, and returns
468    TRUE if and only if all calls to extractor return TRUE.
469 */
470 static gboolean g_ptr_array_and(GPtrArray * array,
471                                 BooleanExtractor extractor) {
472     guint i;
473     if (array == NULL || array->len <= 0)
474         return FALSE;
475
476     for (i = 0; i < array->len; i ++) {
477         if (!extractor(g_ptr_array_index(array, i)))
478             return FALSE;
479     }
480
481     return TRUE;
482 }
483
484 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
485 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
486     GPtrArray * rval;
487     guint i;
488
489     rval = g_ptr_array_sized_new(self->private->children->len);
490     for (i = 0; i < self->private->children->len; i ++) {
491         GenericOp * op;
492         op = malloc(sizeof(*op));
493         op->child = g_ptr_array_index(self->private->children, i);
494         op->child_index = i;
495         g_ptr_array_add(rval, op);
496     }
497
498     return rval;
499 }
500
501 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
502    all the proper handling for the result of operations that allow
503    device isolation. Returns FALSE only if an unrecoverable error
504    occured. */
505 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
506                                          BooleanExtractor extractor) {
507     gboolean success;
508     gpointer isolated_op = NULL;
509
510     for (;;) {
511         success = g_ptr_array_and(ops, extractor);
512         
513         if (success || self->private->status != RAIT_STATUS_COMPLETE) {
514             break;
515         } else {
516             guint i;
517             /* First device failure, note the device and march on. */
518             self->private->status = RAIT_STATUS_DEGRADED;
519             for (i = 0; i < ops->len; i ++) {
520                 GenericOp * op = g_ptr_array_index(ops, i);
521                 if (!(op->result)) {
522                     isolated_op = g_ptr_array_remove_index_fast(ops, i);
523                     self->private->failed = op->child_index;
524                     g_fprintf(stderr, "RAIT array %s Isolated device %s.\n",
525                             DEVICE(self)->device_name,
526                             op->child->device_name);
527                     break;
528                 }
529             }
530         }
531     }
532
533     /* Return isolated op so any data members can be freed. */
534     if (isolated_op != NULL) {
535         g_ptr_array_add(ops, isolated_op);
536     }
537     return success;
538 }
539
540 typedef struct {
541     Device * result;    /* IN */
542     char * device_name; /* OUT */
543 } OpenDeviceOp;
544
545 /* A GFunc. */
546 static void open_device_do_op(gpointer data,
547                               gpointer user_data G_GNUC_UNUSED) {
548     OpenDeviceOp * op = data;
549
550     op->result = device_open(op->device_name);
551     amfree(op->device_name);
552 }
553
554 /* Returns TRUE if and only if the volume label and time are equal. */
555 static gboolean compare_volume_results(Device * a, Device * b) {
556     if (a->volume_time != b->volume_time)
557         return FALSE;
558     if (a->volume_label == NULL && b->volume_label == NULL)
559         return TRUE;
560     if (a->volume_label == NULL || b->volume_label == NULL)
561         return FALSE;
562     return 0 == strcmp(a->volume_label, b->volume_label);
563 }
564
565 static gboolean 
566 rait_device_open_device (Device * dself, char * device_name) {
567     char ** device_names;
568     GPtrArray * open_device_ops;
569     guint i;
570     gboolean failure;
571     RaitDevice * self;
572
573     self = RAIT_DEVICE(dself);
574     g_return_val_if_fail(self != NULL, FALSE);
575     g_return_val_if_fail (device_name != NULL, FALSE);
576
577     device_names = parse_device_name(device_name);
578     
579     if (device_names == NULL)
580         return FALSE;
581
582     /* Open devices in a separate thread, in case they have to rewind etc. */
583     open_device_ops = g_ptr_array_new();
584
585     for (i = 0; device_names[i] != NULL; i ++) {
586         OpenDeviceOp *op;
587
588         op = malloc(sizeof(*op));
589         op->device_name = device_names[i];
590         op->result = NULL;
591         g_ptr_array_add(open_device_ops, op);
592     }
593
594     free(device_names);
595     do_rait_child_ops(open_device_do_op, open_device_ops, NULL);
596
597     failure = FALSE;
598     /* Check results of opening devices. */
599     for (i = 0; i < open_device_ops->len; i ++) {
600         OpenDeviceOp *op = g_ptr_array_index(open_device_ops, i);
601         
602         if (op->result != NULL) {
603             g_ptr_array_add(self->private->children, op->result);
604         } else {
605             failure = TRUE;
606         }
607     }
608
609     g_ptr_array_free_full(open_device_ops);
610
611     failure = failure || !find_block_size(self);
612     if (failure)
613         return FALSE; /* This will clean up any created children. */
614
615     register_properties(self);
616
617     /* Chain up. */
618     if (parent_class->open_device) {
619         return parent_class->open_device(dself, device_name);
620     } else {
621         return TRUE;
622     }
623 }
624
625 /* A GFunc. */
626 static void read_label_do_op(gpointer data,
627                              gpointer user_data G_GNUC_UNUSED) {
628     GenericOp * op = data;
629     op->result = GINT_TO_POINTER(device_read_label(op->child));
630 }
631
632 static ReadLabelStatusFlags rait_device_read_label(Device * dself) {
633     RaitDevice * self;
634     GPtrArray * ops;
635     ReadLabelStatusFlags failed_result = 0;
636     ReadLabelStatusFlags rval;
637     GenericOp * failed_op = NULL; /* If this is non-null, we will isolate. */
638     unsigned int i;
639     Device * first_success = NULL;
640
641     self = RAIT_DEVICE(dself);
642     g_return_val_if_fail(self != NULL, FALSE);
643
644     amfree(dself->volume_label);
645
646     ops = make_generic_boolean_op_array(self);
647     
648     do_rait_child_ops(read_label_do_op, ops, NULL);
649     
650     for (i = 0; i < ops->len; i ++) {
651         GenericOp * op = g_ptr_array_index(ops, i);
652         ReadLabelStatusFlags result = GPOINTER_TO_INT(op->result);
653         if (op->result == READ_LABEL_STATUS_SUCCESS) {
654             if (first_success == NULL) {
655                 /* This is the first successful device. */
656                 first_success = op->child;
657             } else if (!compare_volume_results(first_success, op->child)) {
658                 /* Doesn't match. :-( */
659                 g_fprintf(stderr, "Inconsistant volume labels: "
660                         "Got %s/%s against %s/%s.\n",
661                         first_success->volume_label,
662                         first_success->volume_time, 
663                         op->child->volume_label,
664                         op->child->volume_time);
665                 failed_result |= READ_LABEL_STATUS_VOLUME_ERROR;
666                 failed_op = NULL;
667             }
668         } else {
669             if (failed_result == 0 &&
670                 self->private->status == RAIT_STATUS_COMPLETE) {
671                 /* This is the first failed device; note it and we'll isolate
672                    later. */
673                 failed_op = op;
674                 failed_result = result;
675             } else {
676                 /* We've encountered multiple failures. OR them together. */
677                 failed_result |= result;
678                 failed_op = NULL;
679             }
680         }
681     }
682
683     if (failed_op != NULL) {
684         /* We have a single device to isolate. */
685         failed_result = READ_LABEL_STATUS_SUCCESS; /* Recover later */
686         self->private->failed = failed_op->child_index;
687         g_fprintf(stderr, "RAIT array %s Isolated device %s.\n",
688                 dself->device_name,
689                 failed_op->child->device_name);
690     }
691
692     if (failed_result != READ_LABEL_STATUS_SUCCESS) {
693         /* We had multiple failures or an inconsistency. */
694         rval = failed_result;
695     } else {
696         /* Everything peachy. */
697         rval = READ_LABEL_STATUS_SUCCESS;
698         g_assert(first_success != NULL);
699         if (first_success->volume_label != NULL) {
700             dself->volume_label = g_strdup(first_success->volume_label);
701         }
702         if (first_success->volume_time != NULL) {
703             dself->volume_time = g_strdup(first_success->volume_time);
704         }
705     }
706     
707     g_ptr_array_free_full(ops);
708
709     return rval;
710 }
711
712 typedef struct {
713     GenericOp base;
714     DeviceAccessMode mode; /* IN */
715     char * label;          /* IN */
716     char * timestamp;      /* IN */
717 } StartOp;
718
719 /* A GFunc. */
720 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
721     DeviceClass *klass;
722     StartOp * param = data;
723     
724     klass = DEVICE_GET_CLASS(param->base.child);
725     if (klass->start) {
726         param->base.result =
727             GINT_TO_POINTER((klass->start)(param->base.child,
728                                             param->mode, param->label,
729                                             param->timestamp));
730     } else {
731         param->base.result = FALSE;
732     }
733 }
734
735 static gboolean 
736 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
737                    char * timestamp) {
738     GPtrArray * ops;
739     guint i;
740     gboolean success;
741     RaitDevice * self;
742
743     self = RAIT_DEVICE(dself);
744     g_return_val_if_fail(self != NULL, FALSE);
745
746     ops = g_ptr_array_sized_new(self->private->children->len);
747     for (i = 0; i < self->private->children->len; i ++) {
748         StartOp * op;
749         op = malloc(sizeof(*op));
750         op->base.child = g_ptr_array_index(self->private->children, i);
751         op->mode = mode;
752         op->label = g_strdup(label);
753         op->timestamp = g_strdup(timestamp);
754         g_ptr_array_add(ops, op);
755     }
756     
757     do_rait_child_ops(start_do_op, ops, NULL);
758
759     success = g_ptr_array_and(ops, extract_boolean_generic_op);
760
761     g_ptr_array_free_full(ops);
762
763     if (!success) {
764         return FALSE;
765     } else if (parent_class->start) {
766         return parent_class->start(dself, mode, label, timestamp);
767     } else {
768         return TRUE;
769     }
770 }
771
772 typedef struct {
773     GenericOp base;
774     const dumpfile_t * info; /* IN */
775 } StartFileOp;
776
777 /* a GFunc */
778 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
779     StartFileOp * op = data;
780     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
781                                                         op->info));
782 }
783
784 static gboolean
785 rait_device_start_file (Device * dself, const dumpfile_t * info) {
786     GPtrArray * ops;
787     guint i;
788     gboolean success;
789     RaitDevice * self;
790
791     self = RAIT_DEVICE(dself);
792     g_return_val_if_fail(self != NULL, FALSE);
793
794     ops = g_ptr_array_sized_new(self->private->children->len);
795     for (i = 0; i < self->private->children->len; i ++) {
796         StartFileOp * op;
797         op = malloc(sizeof(*op));
798         op->base.child = g_ptr_array_index(self->private->children, i);
799         op->info = info;
800         g_ptr_array_add(ops, op);
801     }
802     
803     do_rait_child_ops(start_file_do_op, ops, NULL);
804
805     success = g_ptr_array_and(ops, extract_boolean_generic_op);
806
807     g_ptr_array_free_full(ops);
808
809     if (!success) {
810         return FALSE;
811     } else if (parent_class->start_file) {
812         return parent_class->start_file(dself, info);
813     } else {
814         return TRUE;
815     }
816 }
817
818 static void find_simple_params(RaitDevice * self,
819                                guint * num_children,
820                                guint * data_children,
821                                int * blocksize) {
822     int num, data;
823     
824     num = self->private->children->len;
825     if (num > 1)
826         data = num - 1;
827     else
828         data = num;
829     if (num_children != NULL)
830         *num_children = num;
831     if (data_children != NULL)
832         *data_children = data;
833
834     if (blocksize != NULL) {
835         *blocksize = device_write_min_size(DEVICE(self));
836     }
837 }
838
839 typedef struct {
840     GenericOp base;
841     guint size;           /* IN */
842     gpointer data;        /* IN */
843     gboolean short_block; /* IN */
844     gboolean data_needs_free; /* bookkeeping */
845 } WriteBlockOp;
846
847 /* a GFunc. */
848 static void write_block_do_op(gpointer data,
849                               gpointer user_data G_GNUC_UNUSED) {
850     WriteBlockOp * op = data;
851
852     op->base.result =
853         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data,
854                                            op->short_block));
855 }
856
857 /* Parity block generation. Performance of this function can be improved
858    considerably by using larger-sized integers or
859    assembly-coded vector instructions. Parameters are:
860    % data       - All data chunks in series (chunk_size * num_chunks bytes)
861    % parity     - Allocated space for parity block (chunk_size bytes)
862  */
863 static void make_parity_block(char * data, char * parity,
864                               guint chunk_size, guint num_chunks) {
865     guint i;
866     bzero(parity, chunk_size);
867     for (i = 0; i < num_chunks - 1; i ++) {
868         guint j;
869         for (j = 0; j < chunk_size; j ++) {
870             parity[j] ^= data[chunk_size*i + j];
871         }
872     }
873 }
874
875 /* Does the same thing as make_parity_block, but instead of using a
876    single memory chunk holding all chunks, it takes a GPtrArray of
877    chunks. */
878 static void make_parity_block_extents(GPtrArray * data, char * parity,
879                                       guint chunk_size) {
880     guint i;
881     bzero(parity, chunk_size);
882     for (i = 0; i < data->len; i ++) {
883         guint j;
884         char * data_chunk;
885         data_chunk = g_ptr_array_index(data, i);
886         for (j = 0; j < chunk_size; j ++) {
887             parity[j] ^= data_chunk[j];
888         }
889     }
890 }
891
892 /* Does the parity creation algorithm. Allocates and returns a single
893    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
894 static char * extract_data_block(char * data, guint size,
895                                  guint chunks, guint chunk) {
896     char * rval;
897     guint chunk_size;
898
899     g_return_val_if_fail(chunks > 0 && chunk > 0 && chunk <= chunks, NULL);
900     g_return_val_if_fail(data != NULL, NULL);
901     g_return_val_if_fail(size > 0 && size % (chunks - 1) == 0, NULL);
902
903     chunk_size = size / (chunks - 1);
904     rval = malloc(chunk_size);
905     if (chunks != chunk) {
906         /* data block. */
907         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
908     } else {
909         make_parity_block(data, rval, chunk_size, chunks);
910     }
911     
912     return rval;
913 }
914
915 static gboolean 
916 rait_device_write_block (Device * dself, guint size, gpointer data,
917                          gboolean last_block) {
918     GPtrArray * ops;
919     guint i;
920     gboolean success;
921     guint data_children, num_children;
922     int blocksize;
923     RaitDevice * self;
924
925     self = RAIT_DEVICE(dself);
926     g_return_val_if_fail(self != NULL, FALSE);
927
928     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children,
929                        &blocksize);
930     num_children = self->private->children->len;
931     if (num_children != 1)
932         data_children = num_children - 1;
933     else
934         data_children = num_children;
935     
936     g_return_val_if_fail(size % data_children == 0 || last_block, FALSE);
937
938     if (last_block) {
939         char *new_data;
940
941         new_data = malloc(blocksize);
942         memcpy(new_data, data, size);
943         bzero(new_data + size, blocksize - size);
944
945         data = new_data;
946         size = blocksize;
947     }
948
949     ops = g_ptr_array_sized_new(num_children);
950     for (i = 0; i < self->private->children->len; i ++) {
951         WriteBlockOp * op;
952         op = malloc(sizeof(*op));
953         op->base.child = g_ptr_array_index(self->private->children, i);
954         op->short_block = last_block;
955         op->size = size / data_children;
956         if (num_children <= 2) {
957             op->data = data;
958             op->data_needs_free = FALSE;
959         } else {
960             op->data_needs_free = TRUE;
961             op->data = extract_data_block(data, size, num_children, i + 1);
962         }
963         g_ptr_array_add(ops, op);
964     }
965
966     if (last_block) {
967         amfree(data);
968     }
969     
970     do_rait_child_ops(write_block_do_op, ops, NULL);
971
972     success = g_ptr_array_and(ops, extract_boolean_generic_op);
973
974     for (i = 0; i < self->private->children->len; i ++) {
975         WriteBlockOp * op = g_ptr_array_index(ops, i);
976         if (op->data_needs_free)
977             free(op->data);
978     }
979
980     g_ptr_array_free_full(ops);
981
982     if (!success) {
983         return FALSE;
984     } else {
985         /* We don't chain up here because we must handle finish_file
986            differently. If we were called with last_block, then the
987            children have already called finish_file themselves. So we
988            update the device block numbers manually. */
989         dself->block ++;
990         if (last_block)
991             dself->in_file = FALSE;
992
993         return TRUE;
994     }
995 }
996
997 /* A GFunc */
998 static void finish_file_do_op(gpointer data,
999                               gpointer user_data G_GNUC_UNUSED) {
1000     GenericOp * op = data;
1001     op->result = GINT_TO_POINTER(device_finish_file(op->child));
1002 }
1003
1004 static gboolean 
1005 rait_device_finish_file (Device * self) {
1006     GPtrArray * ops;
1007     gboolean success;
1008
1009     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
1010     
1011     do_rait_child_ops(finish_file_do_op, ops, NULL);
1012
1013     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1014
1015     g_ptr_array_free_full(ops);
1016
1017     if (!success) {
1018         return FALSE;
1019     } else if (parent_class->finish_file) {
1020         return parent_class->finish_file(self);
1021     } else {
1022         return TRUE;
1023     }
1024 }
1025
1026 typedef struct {
1027     GenericOp base;
1028     guint requested_file;                /* IN */
1029     guint actual_file;                   /* OUT */
1030 } SeekFileOp;
1031
1032 /* a GFunc. */
1033 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1034     SeekFileOp * op = data;
1035     op->base.result = device_seek_file(op->base.child, op->requested_file);
1036     op->actual_file = op->base.child->file;
1037 }
1038
1039 static dumpfile_t * 
1040 rait_device_seek_file (Device * dself, guint file) {
1041     GPtrArray * ops;
1042     guint i;
1043     gboolean success;
1044     dumpfile_t * rval;
1045     RaitDevice * self = RAIT_DEVICE(dself);
1046     guint actual_file = 0;
1047     g_return_val_if_fail(self != NULL, FALSE);
1048
1049     ops = g_ptr_array_sized_new(self->private->children->len);
1050     for (i = 0; i < self->private->children->len; i ++) {
1051         SeekFileOp * op;
1052         if ((int)i == self->private->failed)
1053             continue; /* This device is broken. */
1054         op = malloc(sizeof(*op));
1055         op->base.child = g_ptr_array_index(self->private->children, i);
1056         op->base.child_index = i;
1057         op->requested_file = file;
1058         g_ptr_array_add(ops, op);
1059     }
1060     
1061     do_rait_child_ops(seek_file_do_op, ops, NULL);
1062
1063     /* This checks for NULL values, but we still have to check for
1064        consistant headers. */
1065     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1066                                        ops, extract_boolean_pointer_op);
1067
1068     rval = NULL;
1069     for (i = 0; i < self->private->children->len; i ++) {
1070         SeekFileOp * this_op;
1071         dumpfile_t * this_result;
1072         guint this_actual_file;
1073         if ((int)i == self->private->failed)
1074             continue;
1075         
1076         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1077         this_result = this_op->base.result;
1078         this_actual_file = this_op->actual_file;
1079
1080         if (rval == NULL) {
1081             rval = this_result;
1082             actual_file = this_actual_file;
1083         } else {
1084             if (headers_are_equal(rval, this_result) &&
1085                 actual_file == this_actual_file) {
1086                 /* Do nothing. */
1087             } else {
1088                 success = FALSE;
1089             }
1090             free(this_result);
1091         }
1092     }
1093
1094     g_ptr_array_free_full(ops);
1095
1096     if (!success) {
1097         amfree(rval);
1098         return NULL;
1099     } else if (parent_class->seek_file) {
1100         parent_class->seek_file(dself, file);
1101     }
1102
1103     return rval;
1104 }
1105
1106 typedef struct {
1107     GenericOp base;
1108     guint64 block; /* IN */
1109 } SeekBlockOp;
1110
1111 /* a GFunc. */
1112 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1113     SeekBlockOp * op = data;
1114     op->base.result =
1115         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1116 }
1117
1118 static gboolean 
1119 rait_device_seek_block (Device * dself, guint64 block) {
1120     GPtrArray * ops;
1121     guint i;
1122     gboolean success;
1123
1124     RaitDevice * self = RAIT_DEVICE(dself);
1125     g_return_val_if_fail(self != NULL, FALSE);
1126
1127     ops = g_ptr_array_sized_new(self->private->children->len);
1128     for (i = 0; i < self->private->children->len; i ++) {
1129         SeekBlockOp * op;
1130         if ((int)i == self->private->failed)
1131             continue; /* This device is broken. */
1132         op = malloc(sizeof(*op));
1133         op->base.child = g_ptr_array_index(self->private->children, i);
1134         op->base.child_index = i;
1135         op->block = block;
1136         g_ptr_array_add(ops, op);
1137     }
1138     
1139     do_rait_child_ops(seek_block_do_op, ops, NULL);
1140
1141     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1142                                        ops, extract_boolean_generic_op);
1143
1144     g_ptr_array_free_full(ops);
1145
1146     if (!success) {
1147         return FALSE;
1148     } else if (parent_class->seek_block) {
1149         return parent_class->seek_block(dself, block);
1150     } else {
1151         return success;
1152     }
1153 }
1154
1155 typedef struct {
1156     GenericOp base;
1157     gpointer buffer; /* IN */
1158     int read_size;      /* IN/OUT -- note not a pointer */
1159     int desired_read_size; /* bookkeeping */
1160 } ReadBlockOp;
1161
1162 /* a GFunc. */
1163 static void read_block_do_op(gpointer data,
1164                              gpointer user_data G_GNUC_UNUSED) {
1165     ReadBlockOp * op = data;
1166     op->base.result =
1167         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1168                                           &(op->read_size)));
1169 }
1170
1171 /* A BooleanExtractor. This one checks for a successful read. */
1172 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1173     ReadBlockOp * op = data;
1174     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1175 }
1176
1177 /* A BooleanExtractor. This one checks for EOF. */
1178 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1179     ReadBlockOp * op = data;
1180     return op->base.child->is_eof;
1181 }
1182
1183 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1184     int rval;
1185     unsigned int i;
1186     rval = 0;
1187     for (i = 0; i < array->len ; i++) {
1188         if (filter(g_ptr_array_index(array, i)))
1189             rval ++;
1190     }
1191     return rval;
1192 }
1193
1194 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1195                                       gpointer buf) {
1196     guint num_children, data_children;
1197     int blocksize, child_blocksize;
1198     guint i;
1199     int parity_child;
1200     gpointer parity_block = NULL;
1201     gboolean success;
1202
1203     success = TRUE;
1204     find_simple_params(self, &num_children, &data_children, &blocksize);
1205     if (num_children > 1)
1206         parity_child = num_children - 1;
1207     else
1208         parity_child = -1;
1209
1210     child_blocksize = blocksize / data_children;
1211
1212     for (i = 0; i < ops->len; i ++) {
1213         ReadBlockOp * op = g_ptr_array_index(ops, i);
1214         if (!extract_boolean_read_block_op_data(op))
1215             continue;
1216         if ((int)(op->base.child_index) == parity_child) {
1217             parity_block = op->buffer;
1218         } else {
1219             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1220                    child_blocksize);
1221         }
1222     }
1223
1224     if (self->private->status == RAIT_STATUS_COMPLETE) {
1225         if (num_children >= 2) {
1226             /* Verify the parity block. This code is inefficient but
1227                does the job for the 2-device case, too. */
1228             gpointer constructed_parity;
1229             GPtrArray * data_extents;
1230             
1231             constructed_parity = malloc(child_blocksize);
1232             data_extents = g_ptr_array_sized_new(data_children);
1233             for (i = 0; i < data_children; i ++) {
1234                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1235                 g_assert(extract_boolean_read_block_op_data(op));
1236                 if ((int)op->base.child_index == parity_child)
1237                     continue;
1238                 g_ptr_array_add(data_extents, op->buffer);
1239             }
1240             make_parity_block_extents(data_extents, constructed_parity,
1241                                       child_blocksize);
1242             
1243             if (0 != memcmp(parity_block, constructed_parity,
1244                             child_blocksize)) {
1245                 g_fprintf(stderr, "RAIT is inconsistant: "
1246                         "Parity block did not match data blocks.\n");
1247                 success = FALSE;
1248             }
1249             g_ptr_array_free(data_extents, TRUE);
1250             amfree(constructed_parity);
1251         } else { /* do nothing. */ }
1252     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1253         /* We are in degraded mode. What's missing? */
1254         if (self->private->failed == parity_child) {
1255             /* do nothing. */
1256         } else if (num_children >= 2) {
1257             /* Reconstruct failed block from parity block. */
1258             GPtrArray * data_extents = g_ptr_array_new();            
1259
1260             for (i = 0; i < data_children; i ++) {
1261                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1262                 if (!extract_boolean_read_block_op_data(op))
1263                     continue;
1264                 g_ptr_array_add(data_extents, op->buffer);
1265             }
1266
1267             /* Conveniently, the reconstruction is the same procedure
1268                as the parity generation. This even works if there is
1269                only one remaining device! */
1270             make_parity_block_extents(data_extents,
1271                                       (char *)buf + (child_blocksize *
1272                                              self->private->failed),
1273                                       child_blocksize);
1274
1275             /* The array members belong to our ops argument. */
1276             g_ptr_array_free(data_extents, TRUE);
1277         } else {
1278             g_assert_not_reached();
1279         }
1280     } else {
1281         success = FALSE;
1282     }
1283     return success;
1284 }
1285
1286 static int
1287 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1288     GPtrArray * ops;
1289     guint i;
1290     gboolean success;
1291     guint num_children, data_children;
1292     int blocksize;
1293
1294     RaitDevice * self = RAIT_DEVICE(dself);
1295     g_return_val_if_fail(self != NULL, -1);
1296
1297     find_simple_params(self, &num_children, &data_children,
1298                        &blocksize);
1299
1300     g_return_val_if_fail(*size >= (int)device_write_min_size(dself), -1);
1301     g_assert(blocksize % data_children == 0); /* If not we are screwed */
1302
1303
1304     ops = g_ptr_array_sized_new(num_children);
1305     for (i = 0; i < num_children; i ++) {
1306         ReadBlockOp * op;
1307         if ((int)i == self->private->failed)
1308             continue; /* This device is broken. */
1309         op = malloc(sizeof(*op));
1310         op->base.child = g_ptr_array_index(self->private->children, i);
1311         op->base.child_index = i;
1312         op->buffer = malloc(blocksize / data_children);
1313         op->desired_read_size = op->read_size = blocksize / data_children;
1314         g_ptr_array_add(ops, op);
1315     }
1316     
1317     do_rait_child_ops(read_block_do_op, ops, NULL);
1318
1319     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1320         success =
1321             g_ptr_array_union_robust(RAIT_DEVICE(self),
1322                                      ops,
1323                                      extract_boolean_read_block_op_data) &&
1324             raid_block_reconstruction(RAIT_DEVICE(self),
1325                                       ops, buf);
1326     } else {
1327         success = FALSE;
1328         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
1329                                      ops,
1330                                      extract_boolean_read_block_op_eof)) {
1331             /* We hit EOF. */
1332             dself->is_eof = TRUE;
1333         }
1334     }
1335
1336     for (i = 0; i < ops->len; i ++) {
1337         ReadBlockOp * op = g_ptr_array_index(ops, i);
1338         amfree(op->buffer);
1339     }
1340     g_ptr_array_free_full(ops);
1341
1342     if (success) {
1343         if (parent_class->read_block)
1344             parent_class->read_block(dself, buf, size);
1345         return blocksize;
1346     } else {
1347         return -1;
1348     }
1349 }
1350
1351 typedef struct {
1352     GenericOp base;
1353     DevicePropertyId id;   /* IN */
1354     GValue value;          /* IN/OUT */
1355     gboolean label_changed; /* Did the device label change? OUT; _set only*/
1356 } PropertyOp;
1357
1358 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
1359 static GPtrArray * make_property_op_array(RaitDevice * self,
1360                                           DevicePropertyId id,
1361                                           GValue * value) {
1362     guint i;
1363     GPtrArray * ops;
1364     ops = g_ptr_array_sized_new(self->private->children->len);
1365     for (i = 0; i < self->private->children->len; i ++) {
1366         PropertyOp * op;
1367         op = malloc(sizeof(*op));
1368         op->base.child = g_ptr_array_index(self->private->children, i);
1369         op->id = id;
1370         bzero(&(op->value), sizeof(op->value));
1371         if (value != NULL) {
1372             g_value_unset_copy(value, &(op->value));
1373         }
1374         g_ptr_array_add(ops, op);
1375     }
1376
1377     return ops;
1378 }
1379
1380 /* A GFunc. */
1381 static void property_get_do_op(gpointer data,
1382                                gpointer user_data G_GNUC_UNUSED) {
1383     PropertyOp * op = data;
1384
1385     bzero(&(op->value), sizeof(op->value));
1386     op->base.result =
1387         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
1388                                             &(op->value)));
1389 }
1390
1391 /* Merge ConcurrencyParadigm results. */
1392 static gboolean property_get_concurrency(GPtrArray * ops, GValue * val) {
1393     ConcurrencyParadigm result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
1394     guint i = 0;
1395     
1396     for (i = 0; i < ops->len; i ++) {
1397         ConcurrencyParadigm cur;
1398         PropertyOp * op = g_ptr_array_index(ops, i);
1399         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1400                              CONCURRENCY_PARADIGM_TYPE, FALSE);
1401         cur = g_value_get_enum(&(op->value));
1402         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
1403             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
1404             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
1405         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
1406                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
1407             result = CONCURRENCY_PARADIGM_SHARED_READ;
1408         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
1409                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
1410             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
1411         } else {
1412             g_return_val_if_fail(FALSE, FALSE);
1413         }
1414     }
1415
1416     g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
1417     g_value_set_enum(val, result);
1418     return TRUE;
1419 }
1420
1421 /* Merge StreamingRequirement results. */
1422 static gboolean property_get_streaming(GPtrArray * ops, GValue * val) {
1423     StreamingRequirement result = STREAMING_REQUIREMENT_NONE;
1424     guint i = 0;
1425     
1426     for (i = 0; i < ops->len; i ++) {
1427         StreamingRequirement cur;
1428         PropertyOp * op = g_ptr_array_index(ops, i);
1429         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1430                              STREAMING_REQUIREMENT_TYPE, FALSE);
1431         cur = g_value_get_enum(&(op->value));
1432         if (result == STREAMING_REQUIREMENT_REQUIRED ||
1433             cur == STREAMING_REQUIREMENT_REQUIRED) {
1434             result = STREAMING_REQUIREMENT_REQUIRED;
1435         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
1436                    cur == STREAMING_REQUIREMENT_DESIRED) {
1437             result = STREAMING_REQUIREMENT_DESIRED;
1438         } else if (result == STREAMING_REQUIREMENT_NONE &&
1439                    cur == STREAMING_REQUIREMENT_NONE) {
1440             result = STREAMING_REQUIREMENT_NONE;
1441         } else {
1442             g_return_val_if_fail(FALSE, FALSE);
1443         }
1444     }
1445
1446     g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
1447     g_value_set_enum(val, result);
1448     return TRUE;
1449 }
1450     
1451 /* Merge MediaAccessMode results. */
1452 static gboolean property_get_medium_type(GPtrArray * ops, GValue * val) {
1453     MediaAccessMode result = 0;
1454     guint i = 0;
1455
1456     for (i = 0; i < ops->len; i ++) {
1457         MediaAccessMode cur;
1458         PropertyOp * op = g_ptr_array_index(ops, i);
1459         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1460                              MEDIA_ACCESS_MODE_TYPE, FALSE);
1461         cur = g_value_get_enum(&(op->value));
1462         
1463         if (i == 0) {
1464             result = cur;
1465         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
1466                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
1467                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
1468                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
1469             /* Invalid combination; one device can only read, other
1470                can only write. */
1471             return FALSE;
1472         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
1473                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
1474             result = MEDIA_ACCESS_MODE_READ_ONLY;
1475         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
1476                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
1477             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
1478         } else if (result == MEDIA_ACCESS_MODE_WORM ||
1479                    cur == MEDIA_ACCESS_MODE_WORM) {
1480             result = MEDIA_ACCESS_MODE_WORM;
1481         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
1482                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
1483             result = MEDIA_ACCESS_MODE_READ_WRITE;
1484         } else {
1485             g_return_val_if_fail(FALSE, FALSE);
1486         }
1487     }
1488     
1489     g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
1490     g_value_set_enum(val, result);
1491     return TRUE;
1492 }
1493     
1494 /* Merge QualifiedSize results. */
1495 static gboolean property_get_free_space(GPtrArray * ops, GValue * val) {
1496     QualifiedSize result;
1497     guint i = 0;
1498
1499     for (i = 0; i < ops->len; i ++) {
1500         QualifiedSize cur;
1501         PropertyOp * op = g_ptr_array_index(ops, i);
1502         g_return_val_if_fail(G_VALUE_TYPE(&(op->value)) ==
1503                              QUALIFIED_SIZE_TYPE, FALSE);
1504         cur = *(QualifiedSize*)(g_value_get_boxed(&(op->value)));
1505
1506         if (result.accuracy != cur.accuracy) {
1507             result.accuracy = SIZE_ACCURACY_ESTIMATE;
1508         }
1509
1510         if (result.accuracy == SIZE_ACCURACY_UNKNOWN &&
1511             cur.accuracy != SIZE_ACCURACY_UNKNOWN) {
1512             result.bytes = cur.bytes;
1513         } else if (result.accuracy != SIZE_ACCURACY_UNKNOWN &&
1514                    cur.accuracy == SIZE_ACCURACY_UNKNOWN) {
1515             /* result.bytes unchanged. */
1516         } else {
1517             result.bytes = MIN(result.bytes, cur.bytes);
1518         }
1519     }
1520
1521     g_value_unset_init(val, QUALIFIED_SIZE_TYPE);
1522     g_value_set_boxed(val, &result);
1523     return TRUE;
1524 }
1525     
1526 /* Merge boolean results by ANDing them together. */
1527 static gboolean property_get_boolean_and(GPtrArray * ops, GValue * val) {
1528     gboolean result = FALSE;
1529     guint i = 0;
1530
1531     for (i = 0; i < ops->len; i ++) {
1532         gboolean cur;
1533         PropertyOp * op = g_ptr_array_index(ops, i);
1534         g_return_val_if_fail(G_VALUE_HOLDS_BOOLEAN(&(op->value)), FALSE);
1535         cur = g_value_get_boolean(&(op->value));
1536
1537         result = result && cur;
1538     }
1539
1540     g_value_unset_init(val, G_TYPE_BOOLEAN);
1541     g_value_set_boolean(val, result);
1542     return TRUE;
1543 }
1544     
1545
1546 static gboolean 
1547 rait_device_property_get (Device * dself, DevicePropertyId id, GValue * val) {
1548     GPtrArray * ops;
1549     guint i;
1550     gboolean success;
1551     GValue result;
1552     GValue * first_value;
1553     RaitDevice * self = RAIT_DEVICE(dself);
1554     g_return_val_if_fail(self != NULL, FALSE);
1555
1556     /* Some properties are handled completely differently. */
1557     if (id == PROPERTY_BLOCK_SIZE) {
1558         g_value_unset_init(val, G_TYPE_INT);
1559         g_value_set_int(val, self->private->block_size);
1560         return TRUE;
1561     } else if (id == PROPERTY_MIN_BLOCK_SIZE ||
1562         id == PROPERTY_MAX_BLOCK_SIZE) {
1563         g_value_unset_init(val, G_TYPE_UINT);
1564         g_value_set_uint(val, self->private->block_size);
1565         return TRUE;
1566     } else if (id == PROPERTY_CANONICAL_NAME) {
1567         if (parent_class->property_get != NULL) {
1568             return parent_class->property_get(dself, id, val);
1569         } else {
1570             return FALSE;
1571         }
1572     }
1573
1574     ops = make_property_op_array(self, id, NULL);
1575     
1576     do_rait_child_ops(property_get_do_op, ops, NULL);
1577
1578     if (id == PROPERTY_CONCURRENCY) {
1579         success = property_get_concurrency(ops, val);
1580     } else if (id == PROPERTY_STREAMING) { 
1581         success = property_get_streaming(ops, val);
1582     } else if (id == PROPERTY_APPENDABLE ||
1583                id == PROPERTY_PARTIAL_DELETION) {
1584         success = property_get_boolean_and(ops, val);
1585     } else if (id == PROPERTY_MEDIUM_TYPE) {
1586         success = property_get_medium_type(ops, val);
1587     } else if (id == PROPERTY_FREE_SPACE) {
1588         success = property_get_free_space(ops, val);
1589     } else {
1590         /* Generic handling; if all results are the same, we succeed
1591            and return that result. If not, we fail. */
1592         success = TRUE;
1593         
1594         /* Set up comparison value. */
1595         bzero(&result, sizeof(result));
1596         first_value = &(((PropertyOp*)g_ptr_array_index(ops,0))->value);
1597         if (G_IS_VALUE(first_value)) {
1598             g_value_unset_copy(first_value, &result);
1599         } else {
1600             success = FALSE;
1601         }
1602         
1603         for (i = 0; i < ops->len; i ++) {
1604             PropertyOp * op = g_ptr_array_index(ops, i);
1605             if (!GPOINTER_TO_INT(op->base.result) ||
1606                 !G_IS_VALUE(first_value) ||
1607                 !g_value_compare(&result, &(op->value))) {
1608                 success = FALSE;
1609             }
1610             g_value_unset(&(op->value));
1611         }
1612
1613         if (success) {
1614             memcpy(val, &result, sizeof(result));
1615         } else if (G_IS_VALUE(&result)) {
1616             g_value_unset(&result);
1617         }
1618     }
1619
1620     g_ptr_array_free_full(ops);
1621
1622     return success;
1623 }
1624
1625 /* A GFunc. */
1626 static void property_set_do_op(gpointer data,
1627                                gpointer user_data G_GNUC_UNUSED) {
1628     PropertyOp * op = data;
1629     gboolean label_set = (op->base.child->volume_label != NULL);
1630     op->base.result =
1631         GINT_TO_POINTER(device_property_set(op->base.child, op->id,
1632                                             &(op->value)));
1633     op->label_changed = (label_set != (op->base.child->volume_label != NULL));
1634 }
1635
1636 /* A BooleanExtractor */
1637 static gboolean extract_label_changed_property_op(gpointer data) {
1638     PropertyOp * op = data;
1639     return op->label_changed;
1640 }
1641
1642 /* A GFunc. */
1643 static void clear_volume_details_do_op(gpointer data,
1644                                        gpointer user_data G_GNUC_UNUSED) {
1645     GenericOp * op = data;
1646     device_clear_volume_details(op->child);
1647 }
1648
1649 static gboolean 
1650 rait_device_property_set (Device * d_self, DevicePropertyId id, GValue * val) {
1651     RaitDevice * self;
1652     GPtrArray * ops;
1653     gboolean success;
1654     gboolean label_changed;
1655
1656     self = RAIT_DEVICE(d_self);
1657     g_return_val_if_fail(self != NULL, FALSE);
1658
1659     ops = make_property_op_array(self, id, val);
1660     
1661     do_rait_child_ops(property_set_do_op, ops, NULL);
1662
1663     success = g_ptr_array_union_robust(self, ops, extract_boolean_generic_op);
1664     label_changed =
1665         g_ptr_array_union_robust(self, ops,
1666                                  extract_label_changed_property_op);
1667     g_ptr_array_free_full(ops);
1668
1669     if (label_changed) {
1670         /* At least one device considered this property set a label-changing
1671          * operation, so now we clear labels on all devices. */
1672         ops = make_generic_boolean_op_array(self);
1673         do_rait_child_ops(clear_volume_details_do_op, ops, NULL);
1674         g_ptr_array_free_full(ops);
1675     }
1676
1677     return success;
1678 }
1679
1680 typedef struct {
1681     GenericOp base;
1682     guint filenum;
1683 } RecycleFileOp;
1684
1685 /* A GFunc */
1686 static void recycle_file_do_op(gpointer data,
1687                                gpointer user_data G_GNUC_UNUSED) {
1688     RecycleFileOp * op = data;
1689     op->base.result =
1690         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
1691 }
1692
1693 static gboolean 
1694 rait_device_recycle_file (Device * dself, guint filenum) {
1695     GPtrArray * ops;
1696     guint i;
1697     gboolean success;
1698
1699     RaitDevice * self = RAIT_DEVICE(dself);
1700     g_return_val_if_fail(self != NULL, FALSE);
1701
1702     ops = g_ptr_array_sized_new(self->private->children->len);
1703     for (i = 0; i < self->private->children->len; i ++) {
1704         RecycleFileOp * op;
1705         op = malloc(sizeof(*op));
1706         op->base.child = g_ptr_array_index(self->private->children, i);
1707         op->filenum = filenum;
1708         g_ptr_array_add(ops, op);
1709     }
1710     
1711     do_rait_child_ops(recycle_file_do_op, ops, NULL);
1712
1713     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1714
1715     g_ptr_array_free_full(ops);
1716
1717     if (!success) {
1718         return FALSE;
1719     } else if (parent_class->recycle_file) {
1720         return parent_class->recycle_file(dself, filenum);
1721     } else {
1722         return TRUE;
1723     }
1724 }
1725
1726 /* GFunc */
1727 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1728     GenericOp * op = data;
1729     op->result = GINT_TO_POINTER(device_finish(op->child));
1730 }
1731
1732 static gboolean 
1733 rait_device_finish (Device * self) {
1734     GPtrArray * ops;
1735     gboolean success;
1736
1737     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
1738     
1739     do_rait_child_ops(finish_do_op, ops, NULL);
1740
1741     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1742
1743     g_ptr_array_free_full(ops);
1744
1745     if (!success) {
1746         return FALSE;
1747     } else if (parent_class->finish) {
1748         return parent_class->finish(self);
1749     } else {
1750         return TRUE;
1751     }
1752 }
1753
1754 Device *
1755 rait_device_factory (char * type, char * name) {
1756     Device * rval;
1757     g_assert(0 == strcmp(type, "rait"));
1758     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
1759     if (!device_open_device(rval, name)) {
1760         g_object_unref(rval);
1761         return NULL;
1762     } else {
1763         return rval;
1764     }
1765 }
1766
1767 Device * rait_device_new_from_devices(Device ** devices) {
1768     RaitDevice * rval;
1769     int i;
1770     gboolean success = TRUE;
1771
1772     g_return_val_if_fail(devices != NULL && *devices != NULL, NULL);
1773
1774     rval = RAIT_DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
1775
1776     for (i = 0; devices[i] != NULL; i ++) {
1777         g_assert(IS_DEVICE(devices[i]));
1778         if (devices[i]->access_mode != ACCESS_NULL) {
1779             success = FALSE;
1780             break;
1781         }
1782         g_object_ref(devices[i]);
1783         g_ptr_array_add(PRIVATE(rval)->children, devices[i]);
1784     }
1785
1786     success = success && find_block_size(rval);
1787
1788     if (!success) {
1789         g_ptr_array_free(PRIVATE(rval)->children, TRUE);
1790         return NULL;
1791     } else {
1792         register_properties(rval);
1793
1794         return DEVICE(rval);
1795     }
1796 }
1797
1798 void 
1799 rait_device_register (void) {
1800     static const char * device_prefix_list[] = {"rait", NULL};
1801     register_device(rait_device_factory, device_prefix_list);
1802 }