Imported Upstream version 3.3.1
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2007, 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include "amanda.h"
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "amsemaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_RAIT_DEVICE        (rait_device_get_type())
57 #define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
58 #define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
59 #define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
60 #define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())
61
62 #define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
63 static GType    rait_device_get_type    (void);
64
65 /*
66  * Main object structure
67  */
68 typedef struct RaitDevice_s {
69     Device __parent__;
70
71     struct RaitDevicePrivate_s * private;
72 } RaitDevice;
73
74 /*
75  * Class definition
76  */
77 typedef struct _RaitDeviceClass RaitDeviceClass;
78 struct _RaitDeviceClass {
79     DeviceClass __parent__;
80 };
81
82 typedef enum {
83     RAIT_STATUS_COMPLETE, /* All subdevices OK. */
84     RAIT_STATUS_DEGRADED, /* One subdevice failed. */
85     RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
86 } RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
91  * This implementation assumes that threads are used for paralellizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
97 #if !(GLIB_CHECK_VERSION(2,10,0))
98 #define USE_INTERNAL_THREADPOOL
99 #endif
100
101 typedef struct RaitDevicePrivate_s {
102     GPtrArray * children;
103     /* These flags are only relevant for reading. */
104     RaitStatus status;
105     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
106        failed node. It holds a negative number otherwise. */
107     int failed;
108
109     /* the child block size */
110     gsize child_block_size;
111
112 #ifdef USE_INTERNAL_THREADPOOL
113     /* array of ThreadInfo for performing parallel operations */
114     GArray *threads;
115
116     /* value of this semaphore is the number of threaded operations
117      * in progress */
118     amsemaphore_t *threads_sem;
119 #endif
120 } RaitDevicePrivate;
121
122 #ifdef USE_INTERNAL_THREADPOOL
123 typedef struct ThreadInfo {
124     GThread *thread;
125
126     /* struct fields below are protected by this mutex and condition variable */
127     GMutex *mutex;
128     GCond *cond;
129
130     gboolean die;
131     GFunc func;
132     gpointer data;
133
134     /* give threads access to active_threads and its mutex/cond */
135     struct RaitDevicePrivate_s *private;
136 } ThreadInfo;
137 #endif
138
139 /* This device uses a special sentinel node to indicate that the child devices
140  * will be set later (in rait_device_open).  It contains a control character to
141  * make it difficult to enter accidentally in an Amanda config. */
142 #define DEFER_CHILDREN_SENTINEL "DEFER\1"
143
144 #define PRIVATE(o) (o->private)
145
146 #define rait_device_in_error(dev) \
147     (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
148
149 void rait_device_register (void);
150
151 /* here are local prototypes */
152 static void rait_device_init (RaitDevice * o);
153 static void rait_device_class_init (RaitDeviceClass * c);
154 static void rait_device_base_init (RaitDeviceClass * c);
155 static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
156 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
157                                    char * label, char * timestamp);
158 static gboolean rait_device_configure(Device * self, gboolean use_global_config);
159 static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
160 static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
161 static gboolean rait_device_finish_file (Device * self);
162 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
163 static gboolean rait_device_seek_block (Device * self, guint64 block);
164 static int      rait_device_read_block (Device * self, gpointer buf,
165                                         int * size);
166 static gboolean rait_device_recycle_file (Device * self, guint filenum);
167 static gboolean rait_device_finish (Device * self);
168 static DeviceStatusFlags rait_device_read_label(Device * dself);
169 static void find_simple_params(RaitDevice * self, guint * num_children,
170                                guint * data_children);
171
172 /* property handlers */
173
174 static gboolean property_get_block_size_fn(Device *self,
175     DevicePropertyBase *base, GValue *val,
176     PropertySurety *surety, PropertySource *source);
177
178 static gboolean property_set_block_size_fn(Device *self,
179     DevicePropertyBase *base, GValue *val,
180     PropertySurety surety, PropertySource source);
181
182 static gboolean property_get_canonical_name_fn(Device *self,
183     DevicePropertyBase *base, GValue *val,
184     PropertySurety *surety, PropertySource *source);
185
186 static gboolean property_get_concurrency_fn(Device *self,
187     DevicePropertyBase *base, GValue *val,
188     PropertySurety *surety, PropertySource *source);
189
190 static gboolean property_get_streaming_fn(Device *self,
191     DevicePropertyBase *base, GValue *val,
192     PropertySurety *surety, PropertySource *source);
193
194 static gboolean property_get_boolean_and_fn(Device *self,
195     DevicePropertyBase *base, GValue *val,
196     PropertySurety *surety, PropertySource *source);
197
198 static gboolean property_get_medium_access_type_fn(Device *self,
199     DevicePropertyBase *base, GValue *val,
200     PropertySurety *surety, PropertySource *source);
201
202 static gboolean property_get_max_volume_usage_fn(Device *self,
203     DevicePropertyBase *base, GValue *val,
204     PropertySurety *surety, PropertySource *source);
205
206 static gboolean property_set_max_volume_usage_fn(Device *self,
207     DevicePropertyBase *base, GValue *val,
208     PropertySurety surety, PropertySource source);
209
210
211 /* pointer to the class of our parent */
212 static DeviceClass *parent_class = NULL;
213
214 static GType
215 rait_device_get_type (void)
216 {
217     static GType type = 0;
218
219     if G_UNLIKELY(type == 0) {
220         static const GTypeInfo info = {
221             sizeof (RaitDeviceClass),
222             (GBaseInitFunc) rait_device_base_init,
223             (GBaseFinalizeFunc) NULL,
224             (GClassInitFunc) rait_device_class_init,
225             (GClassFinalizeFunc) NULL,
226             NULL /* class_data */,
227             sizeof (RaitDevice),
228             0 /* n_preallocs */,
229             (GInstanceInitFunc) rait_device_init,
230             NULL
231         };
232
233         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
234                                        (GTypeFlags)0);
235         }
236
237     return type;
238 }
239
240 static void g_object_unref_foreach(gpointer data,
241                                    gpointer user_data G_GNUC_UNUSED) {
242     if (data != NULL && G_IS_OBJECT(data)) {
243         g_object_unref(data);
244     }
245 }
246
247 static void
248 rait_device_finalize(GObject *obj_self)
249 {
250     RaitDevice *self = RAIT_DEVICE (obj_self);
251     if(G_OBJECT_CLASS(parent_class)->finalize) \
252            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
253     if(self->private->children) {
254         g_ptr_array_foreach(self->private->children,
255                             g_object_unref_foreach, NULL);
256         g_ptr_array_free (self->private->children, TRUE);
257         self->private->children = NULL;
258     }
259 #ifdef USE_INTERNAL_THREADPOOL
260     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
261
262     if (PRIVATE(self)->threads) {
263         guint i;
264
265         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
266             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
267             if (inf->thread) {
268                 /* NOTE: the thread is waiting on this condition right now, not
269                  * executing an operation. */
270
271                 /* ask the thread to die */
272                 g_mutex_lock(inf->mutex);
273                 inf->die = TRUE;
274                 g_cond_signal(inf->cond);
275                 g_mutex_unlock(inf->mutex);
276
277                 /* and wait for it to die, which should happen soon */
278                 g_thread_join(inf->thread);
279             }
280
281             if (inf->mutex)
282                 g_mutex_free(inf->mutex);
283             if (inf->cond)
284                 g_cond_free(inf->cond);
285         }
286     }
287
288     if (PRIVATE(self)->threads_sem)
289         amsemaphore_free(PRIVATE(self)->threads_sem);
290 #endif
291     amfree(self->private);
292 }
293
294 static void
295 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
296 {
297     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
298     PRIVATE(o)->children = g_ptr_array_new();
299     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
300     PRIVATE(o)->failed = -1;
301 #ifdef USE_INTERNAL_THREADPOOL
302     PRIVATE(o)->threads = NULL;
303     PRIVATE(o)->threads_sem = NULL;
304 #endif
305 }
306
307 static void
308 rait_device_class_init (RaitDeviceClass * c)
309 {
310     GObjectClass *g_object_class = (GObjectClass*) c;
311     DeviceClass *device_class = (DeviceClass *)c;
312
313     parent_class = g_type_class_ref (TYPE_DEVICE);
314
315     device_class->open_device = rait_device_open_device;
316     device_class->configure = rait_device_configure;
317     device_class->start = rait_device_start;
318     device_class->start_file = rait_device_start_file;
319     device_class->write_block = rait_device_write_block;
320     device_class->finish_file = rait_device_finish_file;
321     device_class->seek_file = rait_device_seek_file;
322     device_class->seek_block = rait_device_seek_block;
323     device_class->read_block = rait_device_read_block;
324     device_class->recycle_file = rait_device_recycle_file;
325     device_class->finish = rait_device_finish;
326     device_class->read_label = rait_device_read_label;
327
328     g_object_class->finalize = rait_device_finalize;
329
330 #ifndef USE_INTERNAL_THREADPOOL
331 #if !GLIB_CHECK_VERSION(2,10,2)
332     /* Versions of glib before 2.10.2 crash if
333      * g_thread_pool_set_max_unused_threads is called before the first
334      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
335      * start any threads in it, and free it */
336     {
337         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
338         g_thread_pool_free(pool, TRUE, FALSE);
339     }
340 #endif
341
342     g_thread_pool_set_max_unused_threads(-1);
343 #endif
344 }
345
346 static void
347 rait_device_base_init (RaitDeviceClass * c)
348 {
349     DeviceClass *device_class = (DeviceClass *)c;
350
351     /* the RAIT device overrides most of the standard properties, so that it
352      * can calculate them by querying the same property on the children */
353     device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
354             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
355             property_get_block_size_fn,
356             property_set_block_size_fn);
357
358     device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
359             PROPERTY_ACCESS_GET_MASK,
360             property_get_canonical_name_fn, NULL);
361
362     device_class_register_property(device_class, PROPERTY_CONCURRENCY,
363             PROPERTY_ACCESS_GET_MASK,
364             property_get_concurrency_fn, NULL);
365
366     device_class_register_property(device_class, PROPERTY_STREAMING,
367             PROPERTY_ACCESS_GET_MASK,
368             property_get_streaming_fn, NULL);
369
370     device_class_register_property(device_class, PROPERTY_APPENDABLE,
371             PROPERTY_ACCESS_GET_MASK,
372             property_get_boolean_and_fn, NULL);
373
374     device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
375             PROPERTY_ACCESS_GET_MASK,
376             property_get_boolean_and_fn, NULL);
377
378     device_class_register_property(device_class, PROPERTY_FULL_DELETION,
379             PROPERTY_ACCESS_GET_MASK,
380             property_get_boolean_and_fn, NULL);
381
382     device_class_register_property(device_class, PROPERTY_LEOM,
383             PROPERTY_ACCESS_GET_MASK,
384             property_get_boolean_and_fn, NULL);
385
386     device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
387             PROPERTY_ACCESS_GET_MASK,
388             property_get_medium_access_type_fn, NULL);
389
390     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
391             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
392             property_get_max_volume_usage_fn,
393             property_set_max_volume_usage_fn);
394 }
395
396 /* This function does something a little clever and a little
397  * complicated. It takes an array of operations and runs the given
398  * function on each element in the array. The trick is that it runs them
399  * all in parallel, in different threads. This is more efficient than it
400  * sounds because we use a GThreadPool, which means calling this function
401  * will probably not start any new threads at all, but rather use
402  * existing ones. The func is called with two gpointer arguments: The
403  * first from the array, the second is the data argument.
404  *
405  * When it returns, all the operations have been successfully
406  * executed. If you want results from your operations, do it yourself
407  * through the array.
408  */
409
410 #ifdef USE_INTERNAL_THREADPOOL
411 static gpointer rait_thread_pool_func(gpointer data) {
412     ThreadInfo *inf = data;
413
414     g_mutex_lock(inf->mutex);
415     while (TRUE) {
416         while (!inf->die && !inf->func)
417             g_cond_wait(inf->cond, inf->mutex);
418
419         if (inf->die)
420             break;
421
422         if (inf->func) {
423             /* invoke the function */
424             inf->func(inf->data, NULL);
425             inf->func = NULL;
426             inf->data = NULL;
427
428             /* indicate that we're finished; will not block */
429             amsemaphore_down(inf->private->threads_sem);
430         }
431     }
432     g_mutex_unlock(inf->mutex);
433     return NULL;
434 }
435
436 static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
437     guint i;
438
439     if (PRIVATE(self)->threads_sem == NULL)
440         PRIVATE(self)->threads_sem = amsemaphore_new_with_value(0);
441
442     if (PRIVATE(self)->threads == NULL)
443         PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
444                                             sizeof(ThreadInfo), ops->len);
445
446     g_assert(PRIVATE(self)->threads_sem->value == 0);
447
448     if (PRIVATE(self)->threads->len < ops->len)
449         g_array_set_size(PRIVATE(self)->threads, ops->len);
450
451     /* the semaphore will hit zero when each thread has decremented it */
452     amsemaphore_force_set(PRIVATE(self)->threads_sem, ops->len);
453
454     for (i = 0; i < ops->len; i++) {
455         ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
456         if (!inf->thread) {
457             inf->mutex = g_mutex_new();
458             inf->cond = g_cond_new();
459             inf->private = PRIVATE(self);
460             inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
461         }
462
463         /* set up the info the thread needs and trigger it to start */
464         g_mutex_lock(inf->mutex);
465         inf->data = g_ptr_array_index(ops, i);
466         inf->func = func;
467         g_cond_signal(inf->cond);
468         g_mutex_unlock(inf->mutex);
469     }
470
471     /* wait until semaphore hits zero */
472     amsemaphore_wait_empty(PRIVATE(self)->threads_sem);
473 }
474
475 #else /* USE_INTERNAL_THREADPOOL */
476
477 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
478     GThreadPool * pool;
479     guint i;
480
481     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
482     for (i = 0; i < ops->len; i ++) {
483         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
484     }
485
486     g_thread_pool_free(pool, FALSE, TRUE);
487 }
488
489 #endif /* USE_INTERNAL_THREADPOOL */
490
491 /* This does the above, in a serial fashion (and without using threads) */
492 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
493     guint i;
494
495     for (i = 0; i < ops->len; i ++) {
496         func(g_ptr_array_index(ops, i), NULL);
497     }
498 }
499
500 /* This is the one that code below should call. It switches
501    automatically between do_thread_pool_op and do_unthreaded_ops,
502    depending on g_thread_supported(). */
503 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
504     if (g_thread_supported()) {
505         do_thread_pool_op(self, func, ops);
506     } else {
507         do_unthreaded_ops(self, func, ops);
508     }
509 }
510
511 static char *
512 child_device_names_to_rait_name(RaitDevice * self) {
513     GPtrArray *kids;
514     char *braced, *result;
515     guint i;
516
517     kids = g_ptr_array_sized_new(self->private->children->len);
518     for (i = 0; i < self->private->children->len; i ++) {
519         Device *child = g_ptr_array_index(self->private->children, i);
520         const char *child_name = NULL;
521         GValue val;
522         gboolean got_prop = FALSE;
523
524         bzero(&val, sizeof(val));
525
526         if ((signed)i != self->private->failed) {
527             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
528                 child_name = g_value_get_string(&val);
529                 got_prop = TRUE;
530             }
531         }
532
533         if (!got_prop)
534             child_name = "MISSING";
535
536         g_ptr_array_add(kids, g_strdup(child_name));
537
538         if (got_prop)
539             g_value_unset(&val);
540     }
541
542     braced = collapse_braced_alternates(kids);
543     result = g_strdup_printf("rait:%s", braced);
544     g_free(braced);
545
546     return result;
547 }
548
549 /* Find a workable child block size, based on the block size ranges of our
550  * child devices.
551  *
552  * The algorithm is to construct the intersection of all child devices'
553  * [min,max] block size ranges, and then pick the block size closest to 32k
554  * that is in the resulting range.  This avoids picking ridiculously small (1
555  * byte) or large (INT_MAX) block sizes when using devices with wide-open block
556  * size ranges.
557
558  * This function returns the calculated child block size directly, and the RAIT
559  * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
560  * a single child device, but sets the device's error status and returns 0 if
561  * it cannot determine an agreeable block size.
562  */
563 static gsize
564 calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
565 {
566     gsize min = 0;
567     gsize max = SIZE_MAX;
568     gboolean found_one = FALSE;
569     gsize result;
570     guint i;
571
572     for (i = 0; i < self->private->children->len; i ++) {
573         gsize child_min = SIZE_MAX, child_max = 0;
574         Device *child;
575         GValue property_result;
576         PropertySource source;
577
578         bzero(&property_result, sizeof(property_result));
579
580         if ((signed)i == self->private->failed)
581             continue;
582
583         child = g_ptr_array_index(self->private->children, i);
584         if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
585                                  &property_result, NULL, &source)) {
586             g_warning("Error getting BLOCK_SIZE from %s: %s",
587                     child->device_name, device_error_or_status(child));
588             continue;
589         }
590
591         /* if the block size has been set explicitly, then we need to use that blocksize;
592          * otherwise (even if it was DETECTED), override it. */
593         if (source == PROPERTY_SOURCE_USER) {
594             child_min = child_max = g_value_get_int(&property_result);
595         } else {
596             if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
597                                      &property_result)) {
598                 g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
599                         child->device_name, device_error_or_status(child));
600                 continue;
601             }
602             child_min = g_value_get_uint(&property_result);
603
604             if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
605                                      &property_result)) {
606                 g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
607                         child->device_name, device_error_or_status(child));
608                 continue;
609             }
610             child_max = g_value_get_uint(&property_result);
611
612             if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
613                 g_warning("Invalid min, max block sizes from %s", child->device_name);
614                 continue;
615             }
616         }
617
618         found_one = TRUE;
619         min = MAX(min, child_min);
620         max = MIN(max, child_max);
621     }
622
623     if (!found_one) {
624         device_set_error((Device*)self,
625             stralloc(_("Could not find any child devices' block size ranges")),
626             DEVICE_STATUS_DEVICE_ERROR);
627         return 0;
628     }
629
630     if (min > max) {
631         device_set_error((Device*)self,
632             stralloc(_("No block size is acceptable to all child devices")),
633             DEVICE_STATUS_DEVICE_ERROR);
634         return 0;
635     }
636
637     /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
638      * the nearest acceptable size. */
639     result = CLAMP(32768, min, max);
640
641     if (rait_size) {
642         guint data_children;
643         find_simple_params(self, NULL, &data_children);
644         *rait_size = result * data_children;
645     }
646
647     return result;
648 }
649
650 /* Set BLOCK_SIZE on all children */
651 static gboolean
652 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
653 {
654     GValue val;
655     guint i;
656     PropertySource source;
657
658     bzero(&val, sizeof(val));
659
660     g_assert(child_block_size < INT_MAX);
661     g_value_init(&val, G_TYPE_INT);
662     g_value_set_int(&val, (gint)child_block_size);
663
664     for (i = 0; i < self->private->children->len; i ++) {
665         Device *child;
666         GValue property_result;
667
668         bzero(&property_result, sizeof(property_result));
669
670         if ((signed)i == self->private->failed)
671             continue;
672
673         child = g_ptr_array_index(self->private->children, i);
674
675         /* first, make sure the block size is at its default, or is already
676          * correct */
677         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
678                                  &property_result, NULL, &source)) {
679             gsize from_child = g_value_get_int(&property_result);
680             g_value_unset(&property_result);
681             if (source != PROPERTY_SOURCE_DEFAULT
682                     && from_child != child_block_size) {
683                 device_set_error((Device *)self,
684                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
685                                 child->device_name, from_child, child_block_size),
686                     DEVICE_STATUS_DEVICE_ERROR);
687                 return FALSE;
688             }
689         } else {
690             /* failing to get the block size isn't necessarily fatal.. */
691             g_warning("Error getting BLOCK_SIZE from %s: %s",
692                     child->device_name, device_error_or_status(child));
693         }
694
695         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
696             device_set_error((Device *)self,
697                 vstrallocf(_("Error setting block size on %s"), child->device_name),
698                 DEVICE_STATUS_DEVICE_ERROR);
699             return FALSE;
700         }
701     }
702
703     return TRUE;
704 }
705
706 /* The time for users to specify block sizes has ended; set this device's
707  * block-size attributes for easy access by other RAIT functions.  Returns
708  * FALSE on error, with the device's error status already set. */
709 static gboolean
710 fix_block_size(RaitDevice *self)
711 {
712     Device *dself = (Device *)self;
713     gsize my_block_size, child_block_size;
714
715     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
716         child_block_size = calculate_block_size_from_children(self, &my_block_size);
717         if (child_block_size == 0)
718             return FALSE;
719
720         self->private->child_block_size = child_block_size;
721         dself->block_size = my_block_size;
722         dself->block_size_surety = PROPERTY_SURETY_GOOD;
723         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
724     } else {
725         guint data_children;
726
727         find_simple_params(self, NULL, &data_children);
728         g_assert((dself->block_size % data_children) == 0);
729         child_block_size = dself->block_size / data_children;
730     }
731
732     /* now tell the children we mean it */
733     if (!set_block_size_on_children(self, child_block_size))
734         return FALSE;
735
736     return TRUE;
737 }
738
739 /* This structure contains common fields for many operations. Not all
740    operations use all fields, however. */
741 typedef struct {
742     gpointer result; /* May be a pointer; may be an integer or boolean
743                         stored with GINT_TO_POINTER. */
744     Device * child;  /* The device in question. Used by all
745                         operations. */
746     guint child_index; /* For recoverable operations (read-related
747                           operations), this field provides the number
748                           of this child in the self->private->children
749                           array. */
750 } GenericOp;
751
752 typedef gboolean (*BooleanExtractor)(gpointer data);
753
754 /* A BooleanExtractor */
755 static gboolean extract_boolean_generic_op(gpointer data) {
756     GenericOp * op = data;
757     return GPOINTER_TO_INT(op->result);
758 }
759
760 /* A BooleanExtractor */
761 static gboolean extract_boolean_pointer_op(gpointer data) {
762     GenericOp * op = data;
763     return op->result != NULL;
764 }
765
766 /* Does the equivalent of this perl command:
767      ! (first { !extractor($_) } @_
768    That is, calls extractor on each element of the array, and returns
769    TRUE if and only if all calls to extractor return TRUE. This function
770    stops as soon as an extractor returns false, so it's best if extractor
771    functions have no side effects.
772 */
773 static gboolean g_ptr_array_and(GPtrArray * array,
774                                 BooleanExtractor extractor) {
775     guint i;
776     if (array == NULL || array->len <= 0)
777         return FALSE;
778
779     for (i = 0; i < array->len; i ++) {
780         if (!extractor(g_ptr_array_index(array, i)))
781             return FALSE;
782     }
783
784     return TRUE;
785 }
786
787 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
788 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
789     GPtrArray * rval;
790     guint i;
791
792     rval = g_ptr_array_sized_new(self->private->children->len);
793     for (i = 0; i < self->private->children->len; i ++) {
794         GenericOp * op;
795
796         if ((signed)i == self->private->failed) {
797             continue;
798         }
799
800         op = g_new(GenericOp, 1);
801         op->child = g_ptr_array_index(self->private->children, i);
802         op->child_index = i;
803         g_ptr_array_add(rval, op);
804     }
805
806     return rval;
807 }
808
809 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
810    all the proper handling for the result of operations that allow
811    device isolation. Returns FALSE only if an unrecoverable error
812    occured. */
813 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
814                                          BooleanExtractor extractor) {
815     int nfailed = 0;
816     int lastfailed = 0;
817     guint i;
818
819     /* We found one or more failed elements.  See which elements failed, and
820      * isolate them*/
821     for (i = 0; i < ops->len; i ++) {
822         GenericOp * op = g_ptr_array_index(ops, i);
823         if (!extractor(op)) {
824             self->private->failed = op->child_index;
825             g_warning("RAIT array %s isolated device %s: %s",
826                     DEVICE(self)->device_name,
827                     op->child->device_name,
828                     device_error(op->child));
829             nfailed++;
830             lastfailed = i;
831         }
832     }
833
834     /* no failures? great! */
835     if (nfailed == 0)
836         return TRUE;
837
838     /* a single failure in COMPLETE just puts us in DEGRADED mode */
839     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
840         self->private->status = RAIT_STATUS_DEGRADED;
841         self->private->failed = lastfailed;
842         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
843         return TRUE;
844     } else {
845         self->private->status = RAIT_STATUS_FAILED;
846         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
847         return FALSE;
848     }
849 }
850
851 typedef struct {
852     RaitDevice * self;
853     char *rait_name;
854     char * device_name; /* IN */
855     Device * result;    /* OUT */
856 } OpenDeviceOp;
857
858 /* A GFunc. */
859 static void device_open_do_op(gpointer data,
860                               gpointer user_data G_GNUC_UNUSED) {
861     OpenDeviceOp * op = data;
862
863     if (strcmp(op->device_name, "ERROR") == 0 ||
864         strcmp(op->device_name, "MISSING") == 0 ||
865         strcmp(op->device_name, "DEGRADED") == 0) {
866         g_warning("RAIT device %s contains a missing element, attempting "
867                   "degraded mode.\n", op->rait_name);
868         op->result = NULL;
869     } else {
870         op->result = device_open(op->device_name);
871     }
872 }
873
874 /* Returns TRUE if and only if the volume label and time are equal. */
875 static gboolean compare_volume_results(Device * a, Device * b) {
876     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
877          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
878 }
879
880 /* Stickes new_message at the end of *old_message; frees new_message and
881  * may change *old_message. */
882 static void append_message(char ** old_message, char * new_message) {
883     char * rval;
884     if (*old_message == NULL || **old_message == '\0') {
885         rval = new_message;
886     } else {
887         rval = g_strdup_printf("%s; %s", *old_message, new_message);
888         amfree(new_message);
889     }
890     amfree(*old_message);
891     *old_message = rval;
892 }
893
894 static gboolean
895 open_child_devices (Device * dself, char * device_name,
896             char * device_node) {
897     GPtrArray *device_names;
898     GPtrArray * device_open_ops;
899     guint i;
900     gboolean failure;
901     char *failure_errmsgs;
902     DeviceStatusFlags failure_flags;
903     RaitDevice * self;
904
905     self = RAIT_DEVICE(dself);
906
907     device_names = expand_braced_alternates(device_node);
908
909     if (device_names == NULL) {
910         device_set_error(dself,
911             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
912             DEVICE_STATUS_DEVICE_ERROR);
913         return FALSE;
914     }
915
916     /* Open devices in a separate thread, in case they have to rewind etc. */
917     device_open_ops = g_ptr_array_new();
918
919     for (i = 0; i < device_names->len; i++) {
920         OpenDeviceOp *op;
921         char *name = g_ptr_array_index(device_names, i);
922
923         op = g_new(OpenDeviceOp, 1);
924         op->device_name = name;
925         op->result = NULL;
926         op->self = self;
927         op->rait_name = device_name;
928         g_ptr_array_add(device_open_ops, op);
929     }
930
931     g_ptr_array_free(device_names, TRUE);
932     do_rait_child_ops(self, device_open_do_op, device_open_ops);
933
934     failure = FALSE;
935     failure_errmsgs = NULL;
936     failure_flags = 0;
937
938     /* Check results of opening devices. */
939     for (i = 0; i < device_open_ops->len; i ++) {
940         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
941
942         if (op->result != NULL &&
943             op->result->status == DEVICE_STATUS_SUCCESS) {
944             g_ptr_array_add(self->private->children, op->result);
945         } else {
946             char * this_failure_errmsg =
947                 g_strdup_printf("%s: %s", op->device_name,
948                                 device_error_or_status(op->result));
949             DeviceStatusFlags status =
950                 op->result == NULL ?
951                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
952             append_message(&failure_errmsgs,
953                            strdup(this_failure_errmsg));
954             failure_flags |= status;
955             if (self->private->status == RAIT_STATUS_COMPLETE) {
956                 /* The first failure just puts us in degraded mode. */
957                 g_warning("%s: %s",
958                           device_name, this_failure_errmsg);
959                 g_warning("%s: %s failed, entering degraded mode.",
960                           device_name, op->device_name);
961                 g_ptr_array_add(self->private->children, op->result);
962                 self->private->status = RAIT_STATUS_DEGRADED;
963                 self->private->failed = i;
964             } else {
965                 /* The second and further failures are fatal. */
966                 failure = TRUE;
967             }
968         }
969         amfree(op->device_name);
970     }
971
972     g_ptr_array_free_full(device_open_ops);
973
974     if (failure) {
975         self->private->status = RAIT_STATUS_FAILED;
976         device_set_error(dself, failure_errmsgs, failure_flags);
977         return FALSE;
978     }
979
980     return TRUE;
981 }
982
983 static void
984 rait_device_open_device (Device * dself, char * device_name,
985             char * device_type G_GNUC_UNUSED, char * device_node) {
986
987     if (0 != strcmp(device_node, DEFER_CHILDREN_SENTINEL)) {
988         if (!open_child_devices(dself, device_name, device_node))
989             return;
990
991         /* Chain up. */
992         if (parent_class->open_device) {
993             parent_class->open_device(dself, device_name, device_type, device_node);
994         }
995     }
996 }
997
998 Device *
999 rait_device_open_from_children (GSList *child_devices) {
1000     Device *dself;
1001     RaitDevice *self;
1002     GSList *iter;
1003     char *device_name;
1004     int nfailures;
1005     int i;
1006
1007     /* first, open a RAIT device using the DEFER_CHILDREN_SENTINEL */
1008     dself = device_open("rait:" DEFER_CHILDREN_SENTINEL);
1009     if (!IS_RAIT_DEVICE(dself)) {
1010         return dself;
1011     }
1012
1013     /* set its children */
1014     self = RAIT_DEVICE(dself);
1015     nfailures = 0;
1016     for (i=0, iter = child_devices; iter; i++, iter = iter->next) {
1017         Device *kid = iter->data;
1018
1019         /* a NULL kid is OK -- it opens the device in degraded mode */
1020         if (!kid) {
1021             nfailures++;
1022             self->private->failed = i;
1023         } else {
1024             g_assert(IS_DEVICE(kid));
1025             g_object_ref((GObject *)kid);
1026         }
1027
1028         g_ptr_array_add(self->private->children, kid);
1029     }
1030
1031     /* and set the status based on the children */
1032     switch (nfailures) {
1033         case 0:
1034             self->private->status = RAIT_STATUS_COMPLETE;
1035             break;
1036
1037         case 1:
1038             self->private->status = RAIT_STATUS_DEGRADED;
1039             break;
1040
1041         default:
1042             self->private->status = RAIT_STATUS_FAILED;
1043             device_set_error(dself,
1044                     _("more than one child device is missing"),
1045                     DEVICE_STATUS_DEVICE_ERROR);
1046             break;
1047     }
1048
1049     /* create a name from the children's names and use it to chain up
1050      * to open_device (we skipped this step in rait_device_open_device) */
1051     device_name = child_device_names_to_rait_name(self);
1052
1053     if (parent_class->open_device) {
1054         parent_class->open_device(dself,
1055             device_name, "rait",
1056             device_name+5); /* (+5 skips "rait:") */
1057     }
1058
1059     return dself;
1060 }
1061
1062 /* A GFunc. */
1063 static void read_label_do_op(gpointer data,
1064                              gpointer user_data G_GNUC_UNUSED) {
1065     GenericOp * op = data;
1066     op->result = GINT_TO_POINTER(device_read_label(op->child));
1067 }
1068
1069 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1070     RaitDevice * self;
1071     GPtrArray * ops;
1072     DeviceStatusFlags failed_result = 0;
1073     char *failed_errmsg = NULL;
1074     unsigned int i;
1075     Device * first_success = NULL;
1076
1077     self = RAIT_DEVICE(dself);
1078
1079     amfree(dself->volume_time);
1080     amfree(dself->volume_label);
1081     dumpfile_free(dself->volume_header);
1082     dself->volume_header = NULL;
1083
1084     if (rait_device_in_error(self))
1085         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1086
1087     /* nail down our block size, if we haven't already */
1088     if (!fix_block_size(self))
1089         return FALSE;
1090
1091     ops = make_generic_boolean_op_array(self);
1092
1093     do_rait_child_ops(self, read_label_do_op, ops);
1094
1095     for (i = 0; i < ops->len; i ++) {
1096         GenericOp * op = g_ptr_array_index(ops, i);
1097         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1098         if (op->result == DEVICE_STATUS_SUCCESS) {
1099             if (first_success == NULL) {
1100                 /* This is the first successful device. */
1101                 first_success = op->child;
1102             } else if (!compare_volume_results(first_success, op->child)) {
1103                 /* Doesn't match. :-( */
1104                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1105                         "Got %s/%s on %s against %s/%s on %s.",
1106                         first_success->volume_label,
1107                         first_success->volume_time,
1108                         first_success->device_name,
1109                         op->child->volume_label,
1110                         op->child->volume_time,
1111                         op->child->device_name);
1112                 g_warning("%s", failed_errmsg);
1113                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1114             }
1115         } else {
1116             failed_result |= result;
1117         }
1118     }
1119
1120     if (failed_result != DEVICE_STATUS_SUCCESS) {
1121         /* We had multiple failures or an inconsistency. */
1122         device_set_error(dself, failed_errmsg, failed_result);
1123     } else {
1124         /* Everything peachy. */
1125         amfree(failed_errmsg);
1126
1127         g_assert(first_success != NULL);
1128         if (first_success->volume_label != NULL) {
1129             dself->volume_label = g_strdup(first_success->volume_label);
1130         }
1131         if (first_success->volume_time != NULL) {
1132             dself->volume_time = g_strdup(first_success->volume_time);
1133         }
1134         if (first_success->volume_header != NULL) {
1135             dself->volume_header = dumpfile_copy(first_success->volume_header);
1136         }
1137         dself->header_block_size = first_success->header_block_size;
1138     }
1139
1140     g_ptr_array_free_full(ops);
1141
1142     return dself->status;
1143 }
1144
1145 typedef struct {
1146     GenericOp base;
1147     DeviceAccessMode mode; /* IN */
1148     char * label;          /* IN */
1149     char * timestamp;      /* IN */
1150 } StartOp;
1151
1152 /* A GFunc. */
1153 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1154     DeviceClass *klass;
1155     StartOp * param = data;
1156
1157     klass = DEVICE_GET_CLASS(param->base.child);
1158     if (klass->start) {
1159         param->base.result =
1160             GINT_TO_POINTER((klass->start)(param->base.child,
1161                                             param->mode, param->label,
1162                                             param->timestamp));
1163     } else {
1164         param->base.result = FALSE;
1165     }
1166 }
1167
1168 static gboolean
1169 rait_device_configure(Device * dself, gboolean use_global_config)
1170 {
1171     RaitDevice *self = RAIT_DEVICE(dself);
1172     guint i;
1173
1174     for (i = 0; i < self->private->children->len; i ++) {
1175         Device *child;
1176
1177         if ((signed)i == self->private->failed)
1178             continue;
1179
1180         child = g_ptr_array_index(self->private->children, i);
1181         /* unconditionally configure the child without the global
1182          * configuration */
1183         if (!device_configure(child, FALSE))
1184             return FALSE;
1185     }
1186
1187     if (parent_class->configure) {
1188         return parent_class->configure(dself, use_global_config);
1189     }
1190
1191     return TRUE;
1192 }
1193
1194 static gboolean
1195 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1196                    char * timestamp) {
1197     GPtrArray * ops;
1198     guint i;
1199     gboolean success;
1200     RaitDevice * self;
1201     DeviceStatusFlags total_status;
1202     char *failure_errmsgs = NULL;
1203     char * label_from_device = NULL;
1204
1205     self = RAIT_DEVICE(dself);
1206
1207     if (rait_device_in_error(self)) return FALSE;
1208
1209     /* No starting in degraded mode. */
1210     if (self->private->status != RAIT_STATUS_COMPLETE &&
1211         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1212         device_set_error(dself,
1213                          g_strdup_printf(_("RAIT device %s is read-only "
1214                                            "because it is in degraded mode.\n"),
1215                                          dself->device_name),
1216                          DEVICE_STATUS_DEVICE_ERROR);
1217         return FALSE;
1218     }
1219
1220     /* nail down our block size, if we haven't already */
1221     if (!fix_block_size(self))
1222         return FALSE;
1223
1224     dself->access_mode = mode;
1225     dself->in_file = FALSE;
1226     amfree(dself->volume_label);
1227     amfree(dself->volume_time);
1228     dumpfile_free(dself->volume_header);
1229     dself->volume_header = NULL;
1230
1231     ops = g_ptr_array_sized_new(self->private->children->len);
1232     for (i = 0; i < self->private->children->len; i ++) {
1233         StartOp * op;
1234
1235         if ((signed)i == self->private->failed) {
1236             continue;
1237         }
1238
1239         op = g_new(StartOp, 1);
1240         op->base.child = g_ptr_array_index(self->private->children, i);
1241         op->mode = mode;
1242         op->label = g_strdup(label);
1243         op->timestamp = g_strdup(timestamp);
1244         g_ptr_array_add(ops, op);
1245     }
1246
1247     do_rait_child_ops(self, start_do_op, ops);
1248
1249     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1250
1251     /* Check results of starting devices; this is mostly about the
1252      * VOLUME_UNLABELED flag. */
1253     total_status = 0;
1254     for (i = 0; i < ops->len; i ++) {
1255         StartOp * op = g_ptr_array_index(ops, i);
1256         Device *child = op->base.child;
1257
1258         total_status |= child->status;
1259         if (child->status != DEVICE_STATUS_SUCCESS) {
1260             /* record the error message and move on. */
1261             append_message(&failure_errmsgs,
1262                            g_strdup_printf("%s: %s",
1263                                            child->device_name,
1264                                            device_error_or_status(child)));
1265         } else {
1266             if (child->volume_label != NULL && child->volume_time != NULL) {
1267                 if (label_from_device) {
1268                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1269                         strcmp(child->volume_time, dself->volume_time) != 0) {
1270                         /* Mismatch! (Two devices provided different labels) */
1271                         char * this_message =
1272                             g_strdup_printf("%s: Label (%s/%s) is different "
1273                                             "from label (%s/%s) found at "
1274                                             "device %s",
1275                                             child->device_name,
1276                                             child->volume_label,
1277                                             child->volume_time,
1278                                             dself->volume_label,
1279                                             dself->volume_time,
1280                                             label_from_device);
1281                         append_message(&failure_errmsgs, this_message);
1282                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1283                         g_warning("RAIT device children have different labels or timestamps");
1284                     }
1285                 } else {
1286                     /* First device with a volume. */
1287                     dself->volume_label = g_strdup(child->volume_label);
1288                     dself->volume_time = g_strdup(child->volume_time);
1289                     dself->volume_header = dumpfile_copy(child->volume_header);
1290                     label_from_device = g_strdup(child->device_name);
1291                 }
1292             } else {
1293                 /* Device problem, it says it succeeded but sets no label? */
1294                 char * this_message =
1295                     g_strdup_printf("%s: Says label read, but no volume "
1296                                      "label found.", child->device_name);
1297                 g_warning("RAIT device child has NULL volume or label");
1298                 append_message(&failure_errmsgs, this_message);
1299                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1300             }
1301         }
1302     }
1303
1304     if (total_status == DEVICE_STATUS_SUCCESS) {
1305         StartOp * op = g_ptr_array_index(ops, 0);
1306         Device *child = op->base.child;
1307         dself->header_block_size = child->header_block_size;
1308     }
1309
1310     amfree(label_from_device);
1311     g_ptr_array_free_full(ops);
1312
1313     dself->status = total_status;
1314
1315     if (total_status != DEVICE_STATUS_SUCCESS || !success) {
1316         device_set_error(dself, failure_errmsgs, total_status);
1317         return FALSE;
1318     }
1319     amfree(failure_errmsgs);
1320     return TRUE;
1321 }
1322
1323 typedef struct {
1324     GenericOp base;
1325     dumpfile_t * info; /* IN */
1326     int fileno;
1327 } StartFileOp;
1328
1329 /* a GFunc */
1330 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1331     StartFileOp * op = data;
1332     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1333                                                         op->info));
1334     op->fileno = op->base.child->file;
1335     if (op->fileno < 1) {
1336         op->base.result = FALSE;
1337     }
1338 }
1339
1340 static gboolean
1341 rait_device_start_file (Device * dself, dumpfile_t * info) {
1342     GPtrArray * ops;
1343     guint i;
1344     gboolean success;
1345     RaitDevice * self;
1346     int actual_file = -1;
1347
1348     self = RAIT_DEVICE(dself);
1349
1350     if (rait_device_in_error(self)) return FALSE;
1351     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1352
1353     ops = g_ptr_array_sized_new(self->private->children->len);
1354     for (i = 0; i < self->private->children->len; i ++) {
1355         StartFileOp * op;
1356         op = g_new(StartFileOp, 1);
1357         op->base.child = g_ptr_array_index(self->private->children, i);
1358         /* each child gets its own copy of the header, to munge as it
1359          * likes (setting blocksize, at least) */
1360         op->info = dumpfile_copy(info);
1361         g_ptr_array_add(ops, op);
1362     }
1363
1364     do_rait_child_ops(self, start_file_do_op, ops);
1365
1366     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1367
1368     for (i = 0; i < self->private->children->len && success; i ++) {
1369         StartFileOp * op = g_ptr_array_index(ops, i);
1370         if (!op->base.result)
1371             continue;
1372         g_assert(op->fileno >= 1);
1373         if (actual_file < 1) {
1374             actual_file = op->fileno;
1375         }
1376         if (actual_file != op->fileno) {
1377             /* File number mismatch! Aah, my hair is on fire! */
1378             device_set_error(dself,
1379                              g_strdup_printf("File number mismatch in "
1380                                              "rait_device_start_file(): "
1381                                              "Child %s reported file number "
1382                                              "%d, another child reported "
1383                                              "file number %d.",
1384                                              op->base.child->device_name,
1385                                              op->fileno, actual_file),
1386                              DEVICE_STATUS_DEVICE_ERROR);
1387             success = FALSE;
1388             op->base.result = FALSE;
1389         }
1390     }
1391
1392     for (i = 0; i < ops->len && success; i ++) {
1393         StartFileOp * op = g_ptr_array_index(ops, i);
1394         if (op->info) dumpfile_free(op->info);
1395     }
1396     g_ptr_array_free_full(ops);
1397
1398     if (!success) {
1399         if (!device_in_error(dself)) {
1400             device_set_error(dself, stralloc("One or more devices "
1401                                              "failed to start_file"),
1402                              DEVICE_STATUS_DEVICE_ERROR);
1403         }
1404         return FALSE;
1405     }
1406
1407     dself->in_file = TRUE;
1408     g_assert(actual_file >= 1);
1409     dself->file = actual_file;
1410
1411     return TRUE;
1412 }
1413
1414 static void find_simple_params(RaitDevice * self,
1415                                guint * num_children,
1416                                guint * data_children) {
1417     int num, data;
1418
1419     num = self->private->children->len;
1420     if (num > 1)
1421         data = num - 1;
1422     else
1423         data = num;
1424     if (num_children != NULL)
1425         *num_children = num;
1426     if (data_children != NULL)
1427         *data_children = data;
1428 }
1429
1430 typedef struct {
1431     GenericOp base;
1432     guint size;           /* IN */
1433     gpointer data;        /* IN */
1434     gboolean data_needs_free; /* bookkeeping */
1435 } WriteBlockOp;
1436
1437 /* a GFunc. */
1438 static void write_block_do_op(gpointer data,
1439                               gpointer user_data G_GNUC_UNUSED) {
1440     WriteBlockOp * op = data;
1441
1442     op->base.result =
1443         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1444 }
1445
1446 /* Parity block generation. Performance of this function can be improved
1447    considerably by using larger-sized integers or
1448    assembly-coded vector instructions. Parameters are:
1449    % data       - All data chunks in series (chunk_size * num_chunks bytes)
1450    % parity     - Allocated space for parity block (chunk_size bytes)
1451  */
1452 static void make_parity_block(char * data, char * parity,
1453                               guint chunk_size, guint num_chunks) {
1454     guint i;
1455     bzero(parity, chunk_size);
1456     for (i = 0; i < num_chunks - 1; i ++) {
1457         guint j;
1458         for (j = 0; j < chunk_size; j ++) {
1459             parity[j] ^= data[chunk_size*i + j];
1460         }
1461     }
1462 }
1463
1464 /* Does the same thing as make_parity_block, but instead of using a
1465    single memory chunk holding all chunks, it takes a GPtrArray of
1466    chunks. */
1467 static void make_parity_block_extents(GPtrArray * data, char * parity,
1468                                       guint chunk_size) {
1469     guint i;
1470     bzero(parity, chunk_size);
1471     for (i = 0; i < data->len; i ++) {
1472         guint j;
1473         char * data_chunk;
1474         data_chunk = g_ptr_array_index(data, i);
1475         for (j = 0; j < chunk_size; j ++) {
1476             parity[j] ^= data_chunk[j];
1477         }
1478     }
1479 }
1480
1481 /* Does the parity creation algorithm. Allocates and returns a single
1482    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1483 static char * extract_data_block(char * data, guint size,
1484                                  guint chunks, guint chunk) {
1485     char * rval;
1486     guint chunk_size;
1487
1488     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1489     g_assert(data != NULL);
1490     g_assert(size > 0 && size % (chunks - 1) == 0);
1491
1492     chunk_size = size / (chunks - 1);
1493     rval = g_malloc(chunk_size);
1494     if (chunks != chunk) {
1495         /* data block. */
1496         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1497     } else {
1498         make_parity_block(data, rval, chunk_size, chunks);
1499     }
1500
1501     return rval;
1502 }
1503
1504 static gboolean
1505 rait_device_write_block (Device * dself, guint size, gpointer data) {
1506     GPtrArray * ops;
1507     guint i;
1508     gboolean success;
1509     guint data_children, num_children;
1510     gsize blocksize = dself->block_size;
1511     RaitDevice * self;
1512     gboolean last_block = (size < blocksize);
1513
1514     self = RAIT_DEVICE(dself);
1515
1516     if (rait_device_in_error(self)) return FALSE;
1517     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1518
1519     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1520     num_children = self->private->children->len;
1521     if (num_children != 1)
1522         data_children = num_children - 1;
1523     else
1524         data_children = num_children;
1525
1526     g_assert(size % data_children == 0 || last_block);
1527
1528     /* zero out to the end of a short block -- tape devices only write
1529      * whole blocks. */
1530     if (last_block) {
1531         char *new_data;
1532
1533         new_data = g_malloc(blocksize);
1534         memcpy(new_data, data, size);
1535         bzero(new_data + size, blocksize - size);
1536
1537         data = new_data;
1538         size = blocksize;
1539     }
1540
1541     ops = g_ptr_array_sized_new(num_children);
1542     for (i = 0; i < self->private->children->len; i ++) {
1543         WriteBlockOp * op;
1544         op = g_malloc(sizeof(*op));
1545         op->base.child = g_ptr_array_index(self->private->children, i);
1546         op->size = size / data_children;
1547         if (num_children <= 2) {
1548             op->data = data;
1549             op->data_needs_free = FALSE;
1550         } else {
1551             op->data_needs_free = TRUE;
1552             op->data = extract_data_block(data, size, num_children, i + 1);
1553         }
1554         g_ptr_array_add(ops, op);
1555     }
1556
1557     do_rait_child_ops(self, write_block_do_op, ops);
1558
1559     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1560
1561     for (i = 0; i < self->private->children->len; i ++) {
1562         WriteBlockOp * op = g_ptr_array_index(ops, i);
1563         if (op->data_needs_free)
1564             free(op->data);
1565     }
1566
1567     if (last_block) {
1568         amfree(data);
1569     }
1570
1571     g_ptr_array_free_full(ops);
1572
1573     if (!success) {
1574         /* TODO be more specific here */
1575         /* TODO: handle EOM here -- if one or more (or two or more??)
1576          * children have is_eom set, then reflect that in our error
1577          * status. What's more fun is when one device fails and must be isolated at
1578          * the same time another hits EOF. */
1579         device_set_error(dself,
1580             stralloc("One or more devices failed to write_block"),
1581             DEVICE_STATUS_DEVICE_ERROR);
1582         /* this is EOM or an error, so call it EOM */
1583         dself->is_eom = TRUE;
1584         return FALSE;
1585     } else {
1586         dself->block ++;
1587
1588         return TRUE;
1589     }
1590 }
1591
1592 /* A GFunc */
1593 static void finish_file_do_op(gpointer data,
1594                               gpointer user_data G_GNUC_UNUSED) {
1595     GenericOp * op = data;
1596     if (op->child) {
1597         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1598     } else {
1599         op->result = FALSE;
1600     }
1601 }
1602
1603 static gboolean
1604 rait_device_finish_file (Device * dself) {
1605     GPtrArray * ops;
1606     gboolean success;
1607     RaitDevice * self = RAIT_DEVICE(dself);
1608
1609     g_assert(self != NULL);
1610     if (rait_device_in_error(dself)) return FALSE;
1611     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1612
1613     ops = make_generic_boolean_op_array(self);
1614
1615     do_rait_child_ops(self, finish_file_do_op, ops);
1616
1617     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1618
1619     g_ptr_array_free_full(ops);
1620
1621     if (!success) {
1622         /* TODO: be more specific here */
1623         device_set_error(dself,
1624                          g_strdup("One or more devices failed to finish_file"),
1625             DEVICE_STATUS_DEVICE_ERROR);
1626         return FALSE;
1627     }
1628
1629     dself->in_file = FALSE;
1630     return TRUE;
1631 }
1632
1633 typedef struct {
1634     GenericOp base;
1635     guint requested_file;                /* IN */
1636     guint actual_file;                   /* OUT */
1637 } SeekFileOp;
1638
1639 /* a GFunc. */
1640 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1641     SeekFileOp * op = data;
1642     op->base.result = device_seek_file(op->base.child, op->requested_file);
1643     op->actual_file = op->base.child->file;
1644 }
1645
1646 static dumpfile_t *
1647 rait_device_seek_file (Device * dself, guint file) {
1648     GPtrArray * ops;
1649     guint i;
1650     gboolean success;
1651     dumpfile_t * rval;
1652     RaitDevice * self = RAIT_DEVICE(dself);
1653     guint actual_file = 0;
1654     gboolean in_file = FALSE;
1655
1656     if (rait_device_in_error(self)) return NULL;
1657
1658     dself->in_file = FALSE;
1659     dself->is_eof = FALSE;
1660     dself->block = 0;
1661
1662     ops = g_ptr_array_sized_new(self->private->children->len);
1663     for (i = 0; i < self->private->children->len; i ++) {
1664         SeekFileOp * op;
1665         if ((int)i == self->private->failed)
1666             continue; /* This device is broken. */
1667         op = g_new(SeekFileOp, 1);
1668         op->base.child = g_ptr_array_index(self->private->children, i);
1669         op->base.child_index = i;
1670         op->requested_file = file;
1671         g_ptr_array_add(ops, op);
1672     }
1673
1674     do_rait_child_ops(self, seek_file_do_op, ops);
1675
1676     /* This checks for NULL values, but we still have to check for
1677        consistant headers. */
1678     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1679                                        ops, extract_boolean_pointer_op);
1680
1681     rval = NULL;
1682     for (i = 0; i < ops->len; i ++) {
1683         SeekFileOp * this_op;
1684         dumpfile_t * this_result;
1685         guint this_actual_file;
1686         gboolean this_in_file;
1687
1688         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1689
1690         if ((signed)this_op->base.child_index == self->private->failed)
1691             continue;
1692
1693         this_result = this_op->base.result;
1694         this_actual_file = this_op->actual_file;
1695         this_in_file = this_op->base.child->in_file;
1696
1697         if (rval == NULL) {
1698             rval = this_result;
1699             actual_file = this_actual_file;
1700             in_file = this_in_file;
1701         } else {
1702             if (headers_are_equal(rval, this_result) &&
1703                 actual_file == this_actual_file &&
1704                 in_file == this_in_file) {
1705                 /* Do nothing. */
1706             } else {
1707                 success = FALSE;
1708             }
1709             free(this_result);
1710         }
1711     }
1712
1713     g_ptr_array_free_full(ops);
1714
1715     if (!success) {
1716         amfree(rval);
1717         /* TODO: be more specific here */
1718         device_set_error(dself,
1719                          g_strdup("One or more devices failed to seek_file"),
1720             DEVICE_STATUS_DEVICE_ERROR);
1721         return NULL;
1722     }
1723
1724     /* update our state */
1725     dself->in_file = in_file;
1726     dself->file = actual_file;
1727
1728     return rval;
1729 }
1730
1731 typedef struct {
1732     GenericOp base;
1733     guint64 block; /* IN */
1734 } SeekBlockOp;
1735
1736 /* a GFunc. */
1737 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1738     SeekBlockOp * op = data;
1739     op->base.result =
1740         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1741 }
1742
1743 static gboolean
1744 rait_device_seek_block (Device * dself, guint64 block) {
1745     GPtrArray * ops;
1746     guint i;
1747     gboolean success;
1748
1749     RaitDevice * self = RAIT_DEVICE(dself);
1750
1751     if (rait_device_in_error(self)) return FALSE;
1752
1753     ops = g_ptr_array_sized_new(self->private->children->len);
1754     for (i = 0; i < self->private->children->len; i ++) {
1755         SeekBlockOp * op;
1756         if ((int)i == self->private->failed)
1757             continue; /* This device is broken. */
1758         op = g_new(SeekBlockOp, 1);
1759         op->base.child = g_ptr_array_index(self->private->children, i);
1760         op->base.child_index = i;
1761         op->block = block;
1762         g_ptr_array_add(ops, op);
1763     }
1764
1765     do_rait_child_ops(self, seek_block_do_op, ops);
1766
1767     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1768                                        ops, extract_boolean_generic_op);
1769
1770     g_ptr_array_free_full(ops);
1771
1772     if (!success) {
1773         /* TODO: be more specific here */
1774         device_set_error(dself,
1775             stralloc("One or more devices failed to seek_block"),
1776             DEVICE_STATUS_DEVICE_ERROR);
1777         return FALSE;
1778     }
1779
1780     dself->block = block;
1781     return TRUE;
1782 }
1783
1784 typedef struct {
1785     GenericOp base;
1786     gpointer buffer; /* IN */
1787     int read_size;      /* IN/OUT -- note not a pointer */
1788     int desired_read_size; /* bookkeeping */
1789 } ReadBlockOp;
1790
1791 /* a GFunc. */
1792 static void read_block_do_op(gpointer data,
1793                              gpointer user_data G_GNUC_UNUSED) {
1794     ReadBlockOp * op = data;
1795     op->base.result =
1796         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1797                                           &(op->read_size)));
1798     if (op->read_size > op->desired_read_size) {
1799         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1800                   op->base.child->device_name);
1801     }
1802 }
1803
1804 /* A BooleanExtractor. This one checks for a successful read. */
1805 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1806     ReadBlockOp * op = data;
1807     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1808 }
1809
1810 /* A BooleanExtractor. This one checks for EOF. */
1811 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1812     ReadBlockOp * op = data;
1813     return op->base.child->is_eof;
1814 }
1815
1816 /* Counts the number of elements in an array matching a given proposition. */
1817 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1818     int rval;
1819     unsigned int i;
1820     rval = 0;
1821     for (i = 0; i < array->len ; i++) {
1822         if (filter(g_ptr_array_index(array, i)))
1823             rval ++;
1824     }
1825     return rval;
1826 }
1827
1828 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1829                                       gpointer buf, size_t bufsize) {
1830     guint num_children, data_children;
1831     gsize blocksize;
1832     gsize child_blocksize;
1833     guint i;
1834     int parity_child;
1835     gpointer parity_block = NULL;
1836     gboolean success;
1837
1838     success = TRUE;
1839
1840     blocksize = DEVICE(self)->block_size;
1841     find_simple_params(self, &num_children, &data_children);
1842
1843     if (num_children > 1)
1844         parity_child = num_children - 1;
1845     else
1846         parity_child = -1;
1847
1848     child_blocksize = blocksize / data_children;
1849
1850     for (i = 0; i < ops->len; i ++) {
1851         ReadBlockOp * op = g_ptr_array_index(ops, i);
1852         if (!extract_boolean_read_block_op_data(op))
1853             continue;
1854         if ((int)(op->base.child_index) == parity_child) {
1855             parity_block = op->buffer;
1856         } else {
1857             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1858             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1859                    child_blocksize);
1860         }
1861     }
1862     if (self->private->status == RAIT_STATUS_COMPLETE) {
1863         g_assert(parity_block != NULL); /* should have found parity_child */
1864
1865         if (num_children >= 2) {
1866             /* Verify the parity block. This code is inefficient but
1867                does the job for the 2-device case, too. */
1868             gpointer constructed_parity;
1869             GPtrArray * data_extents;
1870
1871             constructed_parity = g_malloc(child_blocksize);
1872             data_extents = g_ptr_array_sized_new(data_children);
1873             for (i = 0; i < data_children; i ++) {
1874                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1875                 g_assert(extract_boolean_read_block_op_data(op));
1876                 if ((int)op->base.child_index == parity_child)
1877                     continue;
1878                 g_ptr_array_add(data_extents, op->buffer);
1879             }
1880             make_parity_block_extents(data_extents, constructed_parity,
1881                                       child_blocksize);
1882
1883             if (0 != memcmp(parity_block, constructed_parity,
1884                             child_blocksize)) {
1885                 device_set_error(DEVICE(self),
1886                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1887                     DEVICE_STATUS_DEVICE_ERROR);
1888                 /* TODO: can't we just isolate the device in this case? */
1889                 success = FALSE;
1890             }
1891             g_ptr_array_free(data_extents, TRUE);
1892             amfree(constructed_parity);
1893         } else { /* do nothing. */ }
1894     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1895         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1896         /* We are in degraded mode. What's missing? */
1897         if (self->private->failed == parity_child) {
1898             /* do nothing. */
1899         } else if (num_children >= 2) {
1900             /* Reconstruct failed block from parity block. */
1901             GPtrArray * data_extents = g_ptr_array_new();
1902
1903             for (i = 0; i < data_children; i ++) {
1904                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1905                 if (!extract_boolean_read_block_op_data(op))
1906                     continue;
1907                 g_ptr_array_add(data_extents, op->buffer);
1908             }
1909
1910             /* Conveniently, the reconstruction is the same procedure
1911                as the parity generation. This even works if there is
1912                only one remaining device! */
1913             make_parity_block_extents(data_extents,
1914                                       (char *)buf + (child_blocksize *
1915                                              self->private->failed),
1916                                       child_blocksize);
1917
1918             /* The array members belong to our ops argument. */
1919             g_ptr_array_free(data_extents, TRUE);
1920         } else {
1921             g_assert_not_reached();
1922         }
1923     } else {
1924         /* device is already in FAILED state -- we shouldn't even be here */
1925         success = FALSE;
1926     }
1927     return success;
1928 }
1929
1930 static int
1931 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1932     GPtrArray * ops;
1933     guint i;
1934     gboolean success;
1935     guint num_children, data_children;
1936     gsize blocksize = dself->block_size;
1937     gsize child_blocksize;
1938
1939     RaitDevice * self = RAIT_DEVICE(dself);
1940
1941     if (rait_device_in_error(self)) return -1;
1942
1943     find_simple_params(self, &num_children, &data_children);
1944
1945     /* tell caller they haven't given us a big enough buffer */
1946     if (blocksize > (gsize)*size) {
1947         g_assert(blocksize < INT_MAX);
1948         *size = (int)blocksize;
1949         return 0;
1950     }
1951
1952     g_assert(blocksize % data_children == 0); /* see find_block_size */
1953     child_blocksize = blocksize / data_children;
1954
1955     ops = g_ptr_array_sized_new(num_children);
1956     for (i = 0; i < num_children; i ++) {
1957         ReadBlockOp * op;
1958         if ((int)i == self->private->failed)
1959             continue; /* This device is broken. */
1960         op = g_new(ReadBlockOp, 1);
1961         op->base.child = g_ptr_array_index(self->private->children, i);
1962         op->base.child_index = i;
1963         op->buffer = g_malloc(child_blocksize);
1964         op->desired_read_size = op->read_size = child_blocksize;
1965         g_ptr_array_add(ops, op);
1966     }
1967
1968     do_rait_child_ops(self, read_block_do_op, ops);
1969
1970     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1971         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1972                                      ops,
1973                                      extract_boolean_read_block_op_data)) {
1974             /* TODO: be more specific */
1975             device_set_error(dself,
1976                 stralloc(_("Error occurred combining blocks from child devices")),
1977                 DEVICE_STATUS_DEVICE_ERROR);
1978             success = FALSE;
1979         } else {
1980             /* raid_block_reconstruction sets the error status if necessary */
1981             success = raid_block_reconstruction(RAIT_DEVICE(self),
1982                                                 ops, buf, (size_t)*size);
1983         }
1984     } else {
1985         success = FALSE;
1986         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
1987                                      ops,
1988                                      extract_boolean_read_block_op_eof)) {
1989             device_set_error(dself,
1990                 stralloc(_("EOF")),
1991                 DEVICE_STATUS_SUCCESS);
1992             dself->is_eof = TRUE;
1993             dself->in_file = FALSE;
1994         } else {
1995             device_set_error(dself,
1996                 stralloc(_("All child devices failed to read, but not all are at eof")),
1997                 DEVICE_STATUS_DEVICE_ERROR);
1998         }
1999     }
2000
2001     for (i = 0; i < ops->len; i ++) {
2002         ReadBlockOp * op = g_ptr_array_index(ops, i);
2003         amfree(op->buffer);
2004     }
2005     g_ptr_array_free_full(ops);
2006
2007     if (success) {
2008         dself->block++;
2009         *size = blocksize;
2010         return blocksize;
2011     } else {
2012         return -1;
2013     }
2014 }
2015
2016 /* property utility functions */
2017
2018 typedef struct {
2019     GenericOp base;
2020     DevicePropertyId id;   /* IN */
2021     GValue value;          /* IN/OUT */
2022     PropertySurety surety; /* IN (for set) */
2023     PropertySource source; /* IN (for set) */
2024 } PropertyOp;
2025
2026 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2027 static GPtrArray * make_property_op_array(RaitDevice * self,
2028                                           DevicePropertyId id,
2029                                           GValue * value,
2030                                           PropertySurety surety,
2031                                           PropertySource source) {
2032     guint i;
2033     GPtrArray * ops;
2034     ops = g_ptr_array_sized_new(self->private->children->len);
2035     for (i = 0; i < self->private->children->len; i ++) {
2036         PropertyOp * op;
2037
2038         if ((signed)i == self->private->failed) {
2039             continue;
2040         }
2041
2042         op = g_new(PropertyOp, 1);
2043         op->base.child = g_ptr_array_index(self->private->children, i);
2044         op->id = id;
2045         bzero(&(op->value), sizeof(op->value));
2046         if (value != NULL) {
2047             g_value_unset_copy(value, &(op->value));
2048         }
2049         op->surety = surety;
2050         op->source = source;
2051         g_ptr_array_add(ops, op);
2052     }
2053
2054     return ops;
2055 }
2056
2057 /* A GFunc. */
2058 static void property_get_do_op(gpointer data,
2059                                gpointer user_data G_GNUC_UNUSED) {
2060     PropertyOp * op = data;
2061
2062     bzero(&(op->value), sizeof(op->value));
2063     op->base.result =
2064         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2065                                             &(op->value)));
2066 }
2067
2068 /* A GFunc. */
2069 static void property_set_do_op(gpointer data,
2070                                gpointer user_data G_GNUC_UNUSED) {
2071     PropertyOp * op = data;
2072
2073     op->base.result =
2074         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2075                                                &(op->value), op->surety,
2076                                                op->source));
2077     g_value_unset(&(op->value));
2078 }
2079
2080 /* PropertyGetFns and PropertySetFns */
2081
2082 static gboolean
2083 property_get_block_size_fn(Device *dself,
2084     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2085     PropertySurety *surety, PropertySource *source)
2086 {
2087     RaitDevice *self = RAIT_DEVICE(dself);
2088     gsize my_block_size;
2089
2090     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2091         my_block_size = dself->block_size;
2092
2093         if (surety)
2094             *surety = dself->block_size_surety;
2095     } else {
2096         gsize child_block_size;
2097         child_block_size = calculate_block_size_from_children(self,
2098                                                     &my_block_size);
2099         if (child_block_size == 0)
2100             return FALSE;
2101
2102         if (surety)
2103             *surety = PROPERTY_SURETY_BAD; /* may still change */
2104     }
2105
2106     if (val) {
2107         g_value_unset_init(val, G_TYPE_INT);
2108         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2109         g_value_set_int(val, (gint)my_block_size);
2110     }
2111
2112     if (source)
2113         *source = dself->block_size_source;
2114
2115     return TRUE;
2116 }
2117
2118 static gboolean
2119 property_set_block_size_fn(Device *dself,
2120     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2121     PropertySurety surety, PropertySource source)
2122 {
2123     RaitDevice *self = RAIT_DEVICE(dself);
2124     gint my_block_size = g_value_get_int(val);
2125     guint data_children;
2126
2127     find_simple_params(self, NULL, &data_children);
2128     if ((my_block_size % data_children) != 0) {
2129         device_set_error(dself,
2130             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2131             DEVICE_STATUS_DEVICE_ERROR);
2132         return FALSE;
2133     }
2134
2135     dself->block_size = my_block_size;
2136     dself->block_size_source = source;
2137     dself->block_size_surety = surety;
2138
2139     if (!fix_block_size(self))
2140         return FALSE;
2141
2142     return TRUE;
2143 }
2144
2145 static gboolean
2146 property_get_canonical_name_fn(Device *dself,
2147     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2148     PropertySurety *surety, PropertySource *source)
2149 {
2150     RaitDevice *self = RAIT_DEVICE(dself);
2151     char *canonical = child_device_names_to_rait_name(self);
2152
2153     if (val) {
2154         g_value_unset_init(val, G_TYPE_STRING);
2155         g_value_set_string(val, canonical);
2156         g_free(canonical);
2157     }
2158
2159     if (surety)
2160         *surety = PROPERTY_SURETY_GOOD;
2161
2162     if (source)
2163         *source = PROPERTY_SOURCE_DETECTED;
2164
2165     return TRUE;
2166 }
2167
2168 static gboolean
2169 property_get_concurrency_fn(Device *dself,
2170     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2171     PropertySurety *surety, PropertySource *source)
2172 {
2173     RaitDevice *self = RAIT_DEVICE(dself);
2174     ConcurrencyParadigm result;
2175     guint i;
2176     GPtrArray * ops;
2177     gboolean success;
2178
2179     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2180     do_rait_child_ops(self, property_get_do_op, ops);
2181
2182     /* find the most restrictive paradigm acceptable to all
2183      * child devices */
2184     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2185     success = TRUE;
2186     for (i = 0; i < ops->len; i ++) {
2187         ConcurrencyParadigm cur;
2188         PropertyOp * op = g_ptr_array_index(ops, i);
2189
2190         if (!op->base.result
2191             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2192             success = FALSE;
2193             break;
2194         }
2195
2196         cur = g_value_get_enum(&(op->value));
2197         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2198             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2199             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2200         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2201                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2202             result = CONCURRENCY_PARADIGM_SHARED_READ;
2203         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2204                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2205             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2206         } else {
2207             success = FALSE;
2208             break;
2209         }
2210     }
2211
2212     g_ptr_array_free_full(ops);
2213
2214     if (success) {
2215         if (val) {
2216             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2217             g_value_set_enum(val, result);
2218         }
2219
2220         if (surety)
2221             *surety = PROPERTY_SURETY_GOOD;
2222
2223         if (source)
2224             *source = PROPERTY_SOURCE_DETECTED;
2225     }
2226
2227     return success;
2228 }
2229
2230 static gboolean
2231 property_get_streaming_fn(Device *dself,
2232     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2233     PropertySurety *surety, PropertySource *source)
2234 {
2235     RaitDevice *self = RAIT_DEVICE(dself);
2236     StreamingRequirement result;
2237     guint i;
2238     GPtrArray * ops;
2239     gboolean success;
2240
2241     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2242     do_rait_child_ops(self, property_get_do_op, ops);
2243
2244     /* combine the child streaming requirements, selecting the strongest
2245      * requirement of the bunch. */
2246     result = STREAMING_REQUIREMENT_NONE;
2247     success = TRUE;
2248     for (i = 0; i < ops->len; i ++) {
2249         StreamingRequirement cur;
2250         PropertyOp * op = g_ptr_array_index(ops, i);
2251
2252         if (!op->base.result
2253             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2254             success = FALSE;
2255             break;
2256         }
2257
2258         cur = g_value_get_enum(&(op->value));
2259         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2260             cur == STREAMING_REQUIREMENT_REQUIRED) {
2261             result = STREAMING_REQUIREMENT_REQUIRED;
2262         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2263                    cur == STREAMING_REQUIREMENT_DESIRED) {
2264             result = STREAMING_REQUIREMENT_DESIRED;
2265         } else if (result == STREAMING_REQUIREMENT_NONE &&
2266                    cur == STREAMING_REQUIREMENT_NONE) {
2267             result = STREAMING_REQUIREMENT_NONE;
2268         } else {
2269             success = FALSE;
2270             break;
2271         }
2272     }
2273
2274     g_ptr_array_free_full(ops);
2275
2276     if (success) {
2277         if (val) {
2278             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2279             g_value_set_enum(val, result);
2280         }
2281
2282         if (surety)
2283             *surety = PROPERTY_SURETY_GOOD;
2284
2285         if (source)
2286             *source = PROPERTY_SOURCE_DETECTED;
2287     }
2288
2289     return success;
2290 }
2291
2292 static gboolean
2293 property_get_boolean_and_fn(Device *dself,
2294     DevicePropertyBase *base, GValue *val,
2295     PropertySurety *surety, PropertySource *source)
2296 {
2297     RaitDevice *self = RAIT_DEVICE(dself);
2298     gboolean result;
2299     guint i;
2300     GPtrArray * ops;
2301     gboolean success;
2302
2303     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2304     do_rait_child_ops(self, property_get_do_op, ops);
2305
2306     /* combine the child values, applying a simple AND */
2307     result = TRUE;
2308     success = TRUE;
2309     for (i = 0; i < ops->len; i ++) {
2310         PropertyOp * op = g_ptr_array_index(ops, i);
2311
2312         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2313             success = FALSE;
2314             break;
2315         }
2316
2317         if (!g_value_get_boolean(&(op->value))) {
2318             result = FALSE;
2319             break;
2320         }
2321     }
2322
2323     g_ptr_array_free_full(ops);
2324
2325     if (success) {
2326         if (val) {
2327             g_value_unset_init(val, G_TYPE_BOOLEAN);
2328             g_value_set_boolean(val, result);
2329         }
2330
2331         if (surety)
2332             *surety = PROPERTY_SURETY_GOOD;
2333
2334         if (source)
2335             *source = PROPERTY_SOURCE_DETECTED;
2336     }
2337
2338     return success;
2339 }
2340
2341 static gboolean
2342 property_get_medium_access_type_fn(Device *dself,
2343     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2344     PropertySurety *surety, PropertySource *source)
2345 {
2346     RaitDevice *self = RAIT_DEVICE(dself);
2347     MediaAccessMode result;
2348     guint i;
2349     GPtrArray * ops;
2350     gboolean success;
2351
2352     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2353     do_rait_child_ops(self, property_get_do_op, ops);
2354
2355     /* combine the modes as best we can */
2356     result = 0;
2357     success = TRUE;
2358     for (i = 0; i < ops->len; i ++) {
2359         MediaAccessMode cur;
2360         PropertyOp * op = g_ptr_array_index(ops, i);
2361
2362         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2363             success = FALSE;
2364             break;
2365         }
2366
2367         cur = g_value_get_enum(&(op->value));
2368
2369         if (i == 0) {
2370             result = cur;
2371         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2372                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2373                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2374                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2375             /* Invalid combination; one device can only read, other
2376                can only write. */
2377             success = FALSE;
2378             break;
2379         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2380                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2381             result = MEDIA_ACCESS_MODE_READ_ONLY;
2382         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2383                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2384             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2385         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2386                    cur == MEDIA_ACCESS_MODE_WORM) {
2387             result = MEDIA_ACCESS_MODE_WORM;
2388         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2389                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2390             result = MEDIA_ACCESS_MODE_READ_WRITE;
2391         } else {
2392             success = FALSE;
2393             break;
2394         }
2395     }
2396
2397     g_ptr_array_free_full(ops);
2398
2399     if (success) {
2400         if (val) {
2401             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2402             g_value_set_enum(val, result);
2403         }
2404
2405         if (surety)
2406             *surety = PROPERTY_SURETY_GOOD;
2407
2408         if (source)
2409             *source = PROPERTY_SOURCE_DETECTED;
2410     }
2411
2412     return success;
2413 }
2414
2415 static gboolean
2416 property_get_max_volume_usage_fn(Device *dself,
2417     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2418     PropertySurety *surety, PropertySource *source)
2419 {
2420     RaitDevice *self = RAIT_DEVICE(dself);
2421     guint64 result;
2422     guint i;
2423     GPtrArray * ops;
2424     guint data_children;
2425
2426     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2427     do_rait_child_ops(self, property_get_do_op, ops);
2428
2429     /* look for the smallest value that is set */
2430     result = 0;
2431     for (i = 0; i < ops->len; i ++) {
2432         guint64 cur;
2433         PropertyOp * op = g_ptr_array_index(ops, i);
2434
2435         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2436             continue; /* ignore children without this property */
2437         }
2438
2439         cur = g_value_get_uint64(&(op->value));
2440
2441         if (!result || (cur && cur < result)) {
2442             result = cur;
2443         }
2444     }
2445
2446     g_ptr_array_free_full(ops);
2447
2448     if (result) {
2449         /* result contains the minimum usage on any child.  We can use that space
2450          * on each of our data children, so the total is larger */
2451         find_simple_params(self, NULL, &data_children);
2452         result *= data_children;
2453
2454         if (val) {
2455             g_value_unset_init(val, G_TYPE_UINT64);
2456             g_value_set_uint64(val, result);
2457         }
2458
2459         if (surety)
2460             *surety = PROPERTY_SURETY_GOOD;
2461
2462         if (source)
2463             *source = PROPERTY_SOURCE_DETECTED;
2464
2465         return TRUE;
2466     } else {
2467         /* no result from any children, so we effectively don't have this property */
2468         return FALSE;
2469     }
2470 }
2471
2472 static gboolean
2473 property_set_max_volume_usage_fn(Device *dself,
2474     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2475     PropertySurety surety, PropertySource source)
2476 {
2477     RaitDevice *self = RAIT_DEVICE(dself);
2478     guint64 parent_usage;
2479     guint64 child_usage;
2480     GValue child_val;
2481     guint i;
2482     gboolean success;
2483     GPtrArray * ops;
2484     guint data_children;
2485
2486     parent_usage = g_value_get_uint64(val);
2487     find_simple_params(self, NULL, &data_children);
2488
2489     child_usage = parent_usage / data_children;
2490
2491     bzero(&child_val, sizeof(child_val));
2492     g_value_init(&child_val, G_TYPE_UINT64);
2493     g_value_set_uint64(&child_val, child_usage);
2494
2495     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2496                                 &child_val, surety, source);
2497     do_rait_child_ops(self, property_set_do_op, ops);
2498
2499     /* if any of the kids succeeded, then we did too */
2500     success = FALSE;
2501     for (i = 0; i < ops->len; i ++) {
2502         PropertyOp * op = g_ptr_array_index(ops, i);
2503
2504         if (op->base.result) {
2505             success = TRUE;
2506             break;
2507         }
2508     }
2509
2510     g_ptr_array_free_full(ops);
2511
2512     return success;
2513 }
2514
2515 typedef struct {
2516     GenericOp base;
2517     guint filenum;
2518 } RecycleFileOp;
2519
2520 /* A GFunc */
2521 static void recycle_file_do_op(gpointer data,
2522                                gpointer user_data G_GNUC_UNUSED) {
2523     RecycleFileOp * op = data;
2524     op->base.result =
2525         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2526 }
2527
2528 static gboolean
2529 rait_device_recycle_file (Device * dself, guint filenum) {
2530     GPtrArray * ops;
2531     guint i;
2532     gboolean success;
2533
2534     RaitDevice * self = RAIT_DEVICE(dself);
2535
2536     if (rait_device_in_error(self)) return FALSE;
2537
2538     ops = g_ptr_array_sized_new(self->private->children->len);
2539     for (i = 0; i < self->private->children->len; i ++) {
2540         RecycleFileOp * op;
2541         op = g_new(RecycleFileOp, 1);
2542         op->base.child = g_ptr_array_index(self->private->children, i);
2543         op->filenum = filenum;
2544         g_ptr_array_add(ops, op);
2545     }
2546
2547     do_rait_child_ops(self, recycle_file_do_op, ops);
2548
2549     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2550
2551     g_ptr_array_free_full(ops);
2552
2553     if (!success) {
2554         /* TODO: be more specific here */
2555         device_set_error(dself,
2556             stralloc(_("One or more devices failed to recycle_file")),
2557             DEVICE_STATUS_DEVICE_ERROR);
2558         return FALSE;
2559     }
2560     return TRUE;
2561 }
2562
2563 /* GFunc */
2564 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2565     GenericOp * op = data;
2566     op->result = GINT_TO_POINTER(device_finish(op->child));
2567 }
2568
2569 static gboolean
2570 rait_device_finish (Device * self) {
2571     GPtrArray * ops;
2572     gboolean success;
2573     gboolean rval = TRUE;
2574
2575     rval = !rait_device_in_error(self);
2576
2577     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2578
2579     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2580
2581     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2582     if (!success)
2583         rval = FALSE;
2584
2585     g_ptr_array_free_full(ops);
2586
2587     self->access_mode = ACCESS_NULL;
2588
2589     return rval;
2590 }
2591
2592 static Device *
2593 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2594     Device * rval;
2595     g_assert(0 == strcmp(device_type, "rait"));
2596     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2597     device_open_device(rval, device_name, device_type, device_node);
2598     return rval;
2599 }
2600
2601 void
2602 rait_device_register (void) {
2603     static const char * device_prefix_list[] = {"rait", NULL};
2604     register_device(rait_device_factory, device_prefix_list);
2605 }