Imported Upstream version 3.2.0
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2007, 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include "amanda.h"
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "semaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_RAIT_DEVICE        (rait_device_get_type())
57 #define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
58 #define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
59 #define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
60 #define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())
61
62 #define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
63 static GType    rait_device_get_type    (void);
64
65 /*
66  * Main object structure
67  */
68 typedef struct RaitDevice_s {
69     Device __parent__;
70
71     struct RaitDevicePrivate_s * private;
72 } RaitDevice;
73
74 /*
75  * Class definition
76  */
77 typedef struct _RaitDeviceClass RaitDeviceClass;
78 struct _RaitDeviceClass {
79     DeviceClass __parent__;
80 };
81
82 typedef enum {
83     RAIT_STATUS_COMPLETE, /* All subdevices OK. */
84     RAIT_STATUS_DEGRADED, /* One subdevice failed. */
85     RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
86 } RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
91  * This implementation assumes that threads are used for paralellizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
97 #if !(GLIB_CHECK_VERSION(2,10,0))
98 #define USE_INTERNAL_THREADPOOL
99 #endif
100
101 typedef struct RaitDevicePrivate_s {
102     GPtrArray * children;
103     /* These flags are only relevant for reading. */
104     RaitStatus status;
105     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
106        failed node. It holds a negative number otherwise. */
107     int failed;
108
109     /* the child block size */
110     gsize child_block_size;
111
112 #ifdef USE_INTERNAL_THREADPOOL
113     /* array of ThreadInfo for performing parallel operations */
114     GArray *threads;
115
116     /* value of this semaphore is the number of threaded operations
117      * in progress */
118     semaphore_t *threads_sem;
119 #endif
120 } RaitDevicePrivate;
121
122 #ifdef USE_INTERNAL_THREADPOOL
123 typedef struct ThreadInfo {
124     GThread *thread;
125
126     /* struct fields below are protected by this mutex and condition variable */
127     GMutex *mutex;
128     GCond *cond;
129
130     gboolean die;
131     GFunc func;
132     gpointer data;
133
134     /* give threads access to active_threads and its mutex/cond */
135     struct RaitDevicePrivate_s *private;
136 } ThreadInfo;
137 #endif
138
139 /* This device uses a special sentinel node to indicate that the child devices
140  * will be set later (in rait_device_open).  It contains a control character to
141  * make it difficult to enter accidentally in an Amanda config. */
142 #define DEFER_CHILDREN_SENTINEL "DEFER\1"
143
144 #define PRIVATE(o) (o->private)
145
146 #define rait_device_in_error(dev) \
147     (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
148
149 void rait_device_register (void);
150
151 /* here are local prototypes */
152 static void rait_device_init (RaitDevice * o);
153 static void rait_device_class_init (RaitDeviceClass * c);
154 static void rait_device_base_init (RaitDeviceClass * c);
155 static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
156 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
157                                    char * label, char * timestamp);
158 static gboolean rait_device_configure(Device * self, gboolean use_global_config);
159 static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
160 static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
161 static gboolean rait_device_finish_file (Device * self);
162 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
163 static gboolean rait_device_seek_block (Device * self, guint64 block);
164 static int      rait_device_read_block (Device * self, gpointer buf,
165                                         int * size);
166 static gboolean rait_device_recycle_file (Device * self, guint filenum);
167 static gboolean rait_device_finish (Device * self);
168 static DeviceStatusFlags rait_device_read_label(Device * dself);
169 static void find_simple_params(RaitDevice * self, guint * num_children,
170                                guint * data_children);
171
172 /* property handlers */
173
174 static gboolean property_get_block_size_fn(Device *self,
175     DevicePropertyBase *base, GValue *val,
176     PropertySurety *surety, PropertySource *source);
177
178 static gboolean property_set_block_size_fn(Device *self,
179     DevicePropertyBase *base, GValue *val,
180     PropertySurety surety, PropertySource source);
181
182 static gboolean property_get_canonical_name_fn(Device *self,
183     DevicePropertyBase *base, GValue *val,
184     PropertySurety *surety, PropertySource *source);
185
186 static gboolean property_get_concurrency_fn(Device *self,
187     DevicePropertyBase *base, GValue *val,
188     PropertySurety *surety, PropertySource *source);
189
190 static gboolean property_get_streaming_fn(Device *self,
191     DevicePropertyBase *base, GValue *val,
192     PropertySurety *surety, PropertySource *source);
193
194 static gboolean property_get_boolean_and_fn(Device *self,
195     DevicePropertyBase *base, GValue *val,
196     PropertySurety *surety, PropertySource *source);
197
198 static gboolean property_get_medium_access_type_fn(Device *self,
199     DevicePropertyBase *base, GValue *val,
200     PropertySurety *surety, PropertySource *source);
201
202 static gboolean property_get_max_volume_usage_fn(Device *self,
203     DevicePropertyBase *base, GValue *val,
204     PropertySurety *surety, PropertySource *source);
205
206 static gboolean property_set_max_volume_usage_fn(Device *self,
207     DevicePropertyBase *base, GValue *val,
208     PropertySurety surety, PropertySource source);
209
210
211 /* pointer to the class of our parent */
212 static DeviceClass *parent_class = NULL;
213
214 static GType
215 rait_device_get_type (void)
216 {
217     static GType type = 0;
218
219     if G_UNLIKELY(type == 0) {
220         static const GTypeInfo info = {
221             sizeof (RaitDeviceClass),
222             (GBaseInitFunc) rait_device_base_init,
223             (GBaseFinalizeFunc) NULL,
224             (GClassInitFunc) rait_device_class_init,
225             (GClassFinalizeFunc) NULL,
226             NULL /* class_data */,
227             sizeof (RaitDevice),
228             0 /* n_preallocs */,
229             (GInstanceInitFunc) rait_device_init,
230             NULL
231         };
232
233         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
234                                        (GTypeFlags)0);
235         }
236
237     return type;
238 }
239
240 static void g_object_unref_foreach(gpointer data,
241                                    gpointer user_data G_GNUC_UNUSED) {
242     if (data != NULL && G_IS_OBJECT(data)) {
243         g_object_unref(data);
244     }
245 }
246
247 static void
248 rait_device_finalize(GObject *obj_self)
249 {
250     RaitDevice *self = RAIT_DEVICE (obj_self);
251     if(G_OBJECT_CLASS(parent_class)->finalize) \
252            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
253     if(self->private->children) {
254         g_ptr_array_foreach(self->private->children,
255                             g_object_unref_foreach, NULL);
256         g_ptr_array_free (self->private->children, TRUE);
257         self->private->children = NULL;
258     }
259 #ifdef USE_INTERNAL_THREADPOOL
260     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
261
262     if (PRIVATE(self)->threads) {
263         guint i;
264
265         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
266             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
267             if (inf->thread) {
268                 /* NOTE: the thread is waiting on this condition right now, not
269                  * executing an operation. */
270
271                 /* ask the thread to die */
272                 g_mutex_lock(inf->mutex);
273                 inf->die = TRUE;
274                 g_cond_signal(inf->cond);
275                 g_mutex_unlock(inf->mutex);
276
277                 /* and wait for it to die, which should happen soon */
278                 g_thread_join(inf->thread);
279             }
280
281             if (inf->mutex)
282                 g_mutex_free(inf->mutex);
283             if (inf->cond)
284                 g_cond_free(inf->cond);
285         }
286     }
287
288     if (PRIVATE(self)->threads_sem)
289         semaphore_free(PRIVATE(self)->threads_sem);
290 #endif
291     amfree(self->private);
292 }
293
294 static void
295 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
296 {
297     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
298     PRIVATE(o)->children = g_ptr_array_new();
299     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
300     PRIVATE(o)->failed = -1;
301 #ifdef USE_INTERNAL_THREADPOOL
302     PRIVATE(o)->threads = NULL;
303     PRIVATE(o)->threads_sem = NULL;
304 #endif
305 }
306
307 static void
308 rait_device_class_init (RaitDeviceClass * c)
309 {
310     GObjectClass *g_object_class = (GObjectClass*) c;
311     DeviceClass *device_class = (DeviceClass *)c;
312
313     parent_class = g_type_class_ref (TYPE_DEVICE);
314
315     device_class->open_device = rait_device_open_device;
316     device_class->configure = rait_device_configure;
317     device_class->start = rait_device_start;
318     device_class->start_file = rait_device_start_file;
319     device_class->write_block = rait_device_write_block;
320     device_class->finish_file = rait_device_finish_file;
321     device_class->seek_file = rait_device_seek_file;
322     device_class->seek_block = rait_device_seek_block;
323     device_class->read_block = rait_device_read_block;
324     device_class->recycle_file = rait_device_recycle_file;
325     device_class->finish = rait_device_finish;
326     device_class->read_label = rait_device_read_label;
327
328     g_object_class->finalize = rait_device_finalize;
329
330 #ifndef USE_INTERNAL_THREADPOOL
331 #if !GLIB_CHECK_VERSION(2,10,2)
332     /* Versions of glib before 2.10.2 crash if
333      * g_thread_pool_set_max_unused_threads is called before the first
334      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
335      * start any threads in it, and free it */
336     {
337         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
338         g_thread_pool_free(pool, TRUE, FALSE);
339     }
340 #endif
341
342     g_thread_pool_set_max_unused_threads(-1);
343 #endif
344 }
345
346 static void
347 rait_device_base_init (RaitDeviceClass * c)
348 {
349     DeviceClass *device_class = (DeviceClass *)c;
350
351     /* the RAIT device overrides most of the standard properties, so that it
352      * can calculate them by querying the same property on the children */
353     device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
354             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
355             property_get_block_size_fn,
356             property_set_block_size_fn);
357
358     device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
359             PROPERTY_ACCESS_GET_MASK,
360             property_get_canonical_name_fn, NULL);
361
362     device_class_register_property(device_class, PROPERTY_CONCURRENCY,
363             PROPERTY_ACCESS_GET_MASK,
364             property_get_concurrency_fn, NULL);
365
366     device_class_register_property(device_class, PROPERTY_STREAMING,
367             PROPERTY_ACCESS_GET_MASK,
368             property_get_streaming_fn, NULL);
369
370     device_class_register_property(device_class, PROPERTY_APPENDABLE,
371             PROPERTY_ACCESS_GET_MASK,
372             property_get_boolean_and_fn, NULL);
373
374     device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
375             PROPERTY_ACCESS_GET_MASK,
376             property_get_boolean_and_fn, NULL);
377
378     device_class_register_property(device_class, PROPERTY_FULL_DELETION,
379             PROPERTY_ACCESS_GET_MASK,
380             property_get_boolean_and_fn, NULL);
381
382     device_class_register_property(device_class, PROPERTY_LEOM,
383             PROPERTY_ACCESS_GET_MASK,
384             property_get_boolean_and_fn, NULL);
385
386     device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
387             PROPERTY_ACCESS_GET_MASK,
388             property_get_medium_access_type_fn, NULL);
389
390     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
391             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
392             property_get_max_volume_usage_fn,
393             property_set_max_volume_usage_fn);
394 }
395
396 /* This function does something a little clever and a little
397  * complicated. It takes an array of operations and runs the given
398  * function on each element in the array. The trick is that it runs them
399  * all in parallel, in different threads. This is more efficient than it
400  * sounds because we use a GThreadPool, which means calling this function
401  * will probably not start any new threads at all, but rather use
402  * existing ones. The func is called with two gpointer arguments: The
403  * first from the array, the second is the data argument.
404  *
405  * When it returns, all the operations have been successfully
406  * executed. If you want results from your operations, do it yourself
407  * through the array.
408  */
409
410 #ifdef USE_INTERNAL_THREADPOOL
411 static gpointer rait_thread_pool_func(gpointer data) {
412     ThreadInfo *inf = data;
413
414     g_mutex_lock(inf->mutex);
415     while (TRUE) {
416         while (!inf->die && !inf->func)
417             g_cond_wait(inf->cond, inf->mutex);
418
419         if (inf->die)
420             break;
421
422         if (inf->func) {
423             /* invoke the function */
424             inf->func(inf->data, NULL);
425             inf->func = NULL;
426             inf->data = NULL;
427
428             /* indicate that we're finished; will not block */
429             semaphore_down(inf->private->threads_sem);
430         }
431     }
432     g_mutex_unlock(inf->mutex);
433     return NULL;
434 }
435
436 static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
437     guint i;
438
439     if (PRIVATE(self)->threads_sem == NULL)
440         PRIVATE(self)->threads_sem = semaphore_new_with_value(0);
441
442     if (PRIVATE(self)->threads == NULL)
443         PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
444                                             sizeof(ThreadInfo), ops->len);
445
446     g_assert(PRIVATE(self)->threads_sem->value == 0);
447
448     if (PRIVATE(self)->threads->len < ops->len)
449         g_array_set_size(PRIVATE(self)->threads, ops->len);
450
451     /* the semaphore will hit zero when each thread has decremented it */
452     semaphore_force_set(PRIVATE(self)->threads_sem, ops->len);
453
454     for (i = 0; i < ops->len; i++) {
455         ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
456         if (!inf->thread) {
457             inf->mutex = g_mutex_new();
458             inf->cond = g_cond_new();
459             inf->private = PRIVATE(self);
460             inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
461         }
462
463         /* set up the info the thread needs and trigger it to start */
464         g_mutex_lock(inf->mutex);
465         inf->data = g_ptr_array_index(ops, i);
466         inf->func = func;
467         g_cond_signal(inf->cond);
468         g_mutex_unlock(inf->mutex);
469     }
470
471     /* wait until semaphore hits zero */
472     semaphore_wait_empty(PRIVATE(self)->threads_sem);
473 }
474
475 #else /* USE_INTERNAL_THREADPOOL */
476
477 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
478     GThreadPool * pool;
479     guint i;
480
481     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
482     for (i = 0; i < ops->len; i ++) {
483         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
484     }
485
486     g_thread_pool_free(pool, FALSE, TRUE);
487 }
488
489 #endif /* USE_INTERNAL_THREADPOOL */
490
491 /* This does the above, in a serial fashion (and without using threads) */
492 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
493     guint i;
494
495     for (i = 0; i < ops->len; i ++) {
496         func(g_ptr_array_index(ops, i), NULL);
497     }
498 }
499
500 /* This is the one that code below should call. It switches
501    automatically between do_thread_pool_op and do_unthreaded_ops,
502    depending on g_thread_supported(). */
503 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
504     if (g_thread_supported()) {
505         do_thread_pool_op(self, func, ops);
506     } else {
507         do_unthreaded_ops(self, func, ops);
508     }
509 }
510
511 static char *
512 child_device_names_to_rait_name(RaitDevice * self) {
513     GPtrArray *kids;
514     char *braced, *result;
515     guint i;
516
517     kids = g_ptr_array_sized_new(self->private->children->len);
518     for (i = 0; i < self->private->children->len; i ++) {
519         Device *child = g_ptr_array_index(self->private->children, i);
520         const char *child_name = NULL;
521         GValue val;
522         gboolean got_prop = FALSE;
523
524         bzero(&val, sizeof(val));
525
526         if ((signed)i != self->private->failed) {
527             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
528                 child_name = g_value_get_string(&val);
529                 got_prop = TRUE;
530             }
531         }
532
533         if (!got_prop)
534             child_name = "MISSING";
535
536         g_ptr_array_add(kids, g_strdup(child_name));
537
538         if (got_prop)
539             g_value_unset(&val);
540     }
541
542     braced = collapse_braced_alternates(kids);
543     result = g_strdup_printf("rait:%s", braced);
544     g_free(braced);
545
546     return result;
547 }
548
549 /* Find a workable child block size, based on the block size ranges of our
550  * child devices.
551  *
552  * The algorithm is to construct the intersection of all child devices'
553  * [min,max] block size ranges, and then pick the block size closest to 32k
554  * that is in the resulting range.  This avoids picking ridiculously small (1
555  * byte) or large (INT_MAX) block sizes when using devices with wide-open block
556  * size ranges.
557
558  * This function returns the calculated child block size directly, and the RAIT
559  * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
560  * a single child device, but sets the device's error status and returns 0 if
561  * it cannot determine an agreeable block size.
562  */
563 static gsize
564 calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
565 {
566     gsize min = 0;
567     gsize max = SIZE_MAX;
568     gboolean found_one = FALSE;
569     gsize result;
570     guint i;
571
572     for (i = 0; i < self->private->children->len; i ++) {
573         gsize child_min = SIZE_MAX, child_max = 0;
574         Device *child;
575         GValue property_result;
576         PropertySource source;
577
578         bzero(&property_result, sizeof(property_result));
579
580         if ((signed)i == self->private->failed)
581             continue;
582
583         child = g_ptr_array_index(self->private->children, i);
584         if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
585                                  &property_result, NULL, &source)) {
586             g_warning("Error getting BLOCK_SIZE from %s: %s",
587                     child->device_name, device_error_or_status(child));
588             continue;
589         }
590
591         /* if the block size has been set explicitly, then we need to use that blocksize;
592          * otherwise (even if it was DETECTED), override it. */
593         if (source == PROPERTY_SOURCE_USER) {
594             child_min = child_max = g_value_get_int(&property_result);
595         } else {
596             if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
597                                      &property_result)) {
598                 g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
599                         child->device_name, device_error_or_status(child));
600                 continue;
601             }
602             child_min = g_value_get_uint(&property_result);
603
604             if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
605                                      &property_result)) {
606                 g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
607                         child->device_name, device_error_or_status(child));
608                 continue;
609             }
610             child_max = g_value_get_uint(&property_result);
611
612             if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
613                 g_warning("Invalid min, max block sizes from %s", child->device_name);
614                 continue;
615             }
616         }
617
618         found_one = TRUE;
619         min = MAX(min, child_min);
620         max = MIN(max, child_max);
621     }
622
623     if (!found_one) {
624         device_set_error((Device*)self,
625             stralloc(_("Could not find any child devices' block size ranges")),
626             DEVICE_STATUS_DEVICE_ERROR);
627         return 0;
628     }
629
630     if (min > max) {
631         device_set_error((Device*)self,
632             stralloc(_("No block size is acceptable to all child devices")),
633             DEVICE_STATUS_DEVICE_ERROR);
634         return 0;
635     }
636
637     /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
638      * the nearest acceptable size. */
639     result = CLAMP(32768, min, max);
640
641     if (rait_size) {
642         guint data_children;
643         find_simple_params(self, NULL, &data_children);
644         *rait_size = result * data_children;
645     }
646
647     return result;
648 }
649
650 /* Set BLOCK_SIZE on all children */
651 static gboolean
652 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
653 {
654     GValue val;
655     guint i;
656     PropertySource source;
657
658     bzero(&val, sizeof(val));
659
660     g_assert(child_block_size < INT_MAX);
661     g_value_init(&val, G_TYPE_INT);
662     g_value_set_int(&val, (gint)child_block_size);
663
664     for (i = 0; i < self->private->children->len; i ++) {
665         Device *child;
666         GValue property_result;
667
668         bzero(&property_result, sizeof(property_result));
669
670         if ((signed)i == self->private->failed)
671             continue;
672
673         child = g_ptr_array_index(self->private->children, i);
674
675         /* first, make sure the block size is at its default, or is already
676          * correct */
677         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
678                                  &property_result, NULL, &source)) {
679             gsize from_child = g_value_get_int(&property_result);
680             g_value_unset(&property_result);
681             if (source != PROPERTY_SOURCE_DEFAULT
682                     && from_child != child_block_size) {
683                 device_set_error((Device *)self,
684                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
685                                 child->device_name, from_child, child_block_size),
686                     DEVICE_STATUS_DEVICE_ERROR);
687                 return FALSE;
688             }
689         } else {
690             /* failing to get the block size isn't necessarily fatal.. */
691             g_warning("Error getting BLOCK_SIZE from %s: %s",
692                     child->device_name, device_error_or_status(child));
693         }
694
695         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
696             device_set_error((Device *)self,
697                 vstrallocf(_("Error setting block size on %s"), child->device_name),
698                 DEVICE_STATUS_DEVICE_ERROR);
699             return FALSE;
700         }
701     }
702
703     return TRUE;
704 }
705
706 /* The time for users to specify block sizes has ended; set this device's
707  * block-size attributes for easy access by other RAIT functions.  Returns
708  * FALSE on error, with the device's error status already set. */
709 static gboolean
710 fix_block_size(RaitDevice *self)
711 {
712     Device *dself = (Device *)self;
713     gsize my_block_size, child_block_size;
714
715     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
716         child_block_size = calculate_block_size_from_children(self, &my_block_size);
717         if (child_block_size == 0)
718             return FALSE;
719
720         self->private->child_block_size = child_block_size;
721         dself->block_size = my_block_size;
722         dself->block_size_surety = PROPERTY_SURETY_GOOD;
723         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
724     } else {
725         guint data_children;
726
727         find_simple_params(self, NULL, &data_children);
728         g_assert((dself->block_size % data_children) == 0);
729         child_block_size = dself->block_size / data_children;
730     }
731
732     /* now tell the children we mean it */
733     if (!set_block_size_on_children(self, child_block_size))
734         return FALSE;
735
736     return TRUE;
737 }
738
739 /* This structure contains common fields for many operations. Not all
740    operations use all fields, however. */
741 typedef struct {
742     gpointer result; /* May be a pointer; may be an integer or boolean
743                         stored with GINT_TO_POINTER. */
744     Device * child;  /* The device in question. Used by all
745                         operations. */
746     guint child_index; /* For recoverable operations (read-related
747                           operations), this field provides the number
748                           of this child in the self->private->children
749                           array. */
750 } GenericOp;
751
752 typedef gboolean (*BooleanExtractor)(gpointer data);
753
754 /* A BooleanExtractor */
755 static gboolean extract_boolean_generic_op(gpointer data) {
756     GenericOp * op = data;
757     return GPOINTER_TO_INT(op->result);
758 }
759
760 /* A BooleanExtractor */
761 static gboolean extract_boolean_pointer_op(gpointer data) {
762     GenericOp * op = data;
763     return op->result != NULL;
764 }
765
766 /* Does the equivalent of this perl command:
767      ! (first { !extractor($_) } @_
768    That is, calls extractor on each element of the array, and returns
769    TRUE if and only if all calls to extractor return TRUE. This function
770    stops as soon as an extractor returns false, so it's best if extractor
771    functions have no side effects.
772 */
773 static gboolean g_ptr_array_and(GPtrArray * array,
774                                 BooleanExtractor extractor) {
775     guint i;
776     if (array == NULL || array->len <= 0)
777         return FALSE;
778
779     for (i = 0; i < array->len; i ++) {
780         if (!extractor(g_ptr_array_index(array, i)))
781             return FALSE;
782     }
783
784     return TRUE;
785 }
786
787 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
788 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
789     GPtrArray * rval;
790     guint i;
791
792     rval = g_ptr_array_sized_new(self->private->children->len);
793     for (i = 0; i < self->private->children->len; i ++) {
794         GenericOp * op;
795
796         if ((signed)i == self->private->failed) {
797             continue;
798         }
799
800         op = g_new(GenericOp, 1);
801         op->child = g_ptr_array_index(self->private->children, i);
802         op->child_index = i;
803         g_ptr_array_add(rval, op);
804     }
805
806     return rval;
807 }
808
809 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
810    all the proper handling for the result of operations that allow
811    device isolation. Returns FALSE only if an unrecoverable error
812    occured. */
813 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
814                                          BooleanExtractor extractor) {
815     int nfailed = 0;
816     int lastfailed = 0;
817     guint i;
818
819     /* We found one or more failed elements.  See which elements failed, and
820      * isolate them*/
821     for (i = 0; i < ops->len; i ++) {
822         GenericOp * op = g_ptr_array_index(ops, i);
823         if (!extractor(op)) {
824             self->private->failed = op->child_index;
825             g_warning("RAIT array %s isolated device %s: %s",
826                     DEVICE(self)->device_name,
827                     op->child->device_name,
828                     device_error(op->child));
829             nfailed++;
830             lastfailed = i;
831         }
832     }
833
834     /* no failures? great! */
835     if (nfailed == 0)
836         return TRUE;
837
838     /* a single failure in COMPLETE just puts us in DEGRADED mode */
839     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
840         self->private->status = RAIT_STATUS_DEGRADED;
841         self->private->failed = lastfailed;
842         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
843         return TRUE;
844     } else {
845         self->private->status = RAIT_STATUS_FAILED;
846         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
847         return FALSE;
848     }
849 }
850
851 typedef struct {
852     RaitDevice * self;
853     char *rait_name;
854     char * device_name; /* IN */
855     Device * result;    /* OUT */
856 } OpenDeviceOp;
857
858 /* A GFunc. */
859 static void device_open_do_op(gpointer data,
860                               gpointer user_data G_GNUC_UNUSED) {
861     OpenDeviceOp * op = data;
862
863     if (strcmp(op->device_name, "ERROR") == 0 ||
864         strcmp(op->device_name, "MISSING") == 0 ||
865         strcmp(op->device_name, "DEGRADED") == 0) {
866         g_warning("RAIT device %s contains a missing element, attempting "
867                   "degraded mode.\n", op->rait_name);
868         op->result = NULL;
869     } else {
870         op->result = device_open(op->device_name);
871     }
872 }
873
874 /* Returns TRUE if and only if the volume label and time are equal. */
875 static gboolean compare_volume_results(Device * a, Device * b) {
876     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
877          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
878 }
879
880 /* Stickes new_message at the end of *old_message; frees new_message and
881  * may change *old_message. */
882 static void append_message(char ** old_message, char * new_message) {
883     char * rval;
884     if (*old_message == NULL || **old_message == '\0') {
885         rval = new_message;
886     } else {
887         rval = g_strdup_printf("%s; %s", *old_message, new_message);
888         amfree(new_message);
889     }
890     amfree(*old_message);
891     *old_message = rval;
892 }
893
894 static gboolean
895 open_child_devices (Device * dself, char * device_name,
896             char * device_node) {
897     GPtrArray *device_names;
898     GPtrArray * device_open_ops;
899     guint i;
900     gboolean failure;
901     char *failure_errmsgs;
902     DeviceStatusFlags failure_flags;
903     RaitDevice * self;
904
905     self = RAIT_DEVICE(dself);
906
907     device_names = expand_braced_alternates(device_node);
908
909     if (device_names == NULL) {
910         device_set_error(dself,
911             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
912             DEVICE_STATUS_DEVICE_ERROR);
913         return FALSE;
914     }
915
916     /* Open devices in a separate thread, in case they have to rewind etc. */
917     device_open_ops = g_ptr_array_new();
918
919     for (i = 0; i < device_names->len; i++) {
920         OpenDeviceOp *op;
921         char *name = g_ptr_array_index(device_names, i);
922
923         op = g_new(OpenDeviceOp, 1);
924         op->device_name = name;
925         op->result = NULL;
926         op->self = self;
927         op->rait_name = device_name;
928         g_ptr_array_add(device_open_ops, op);
929     }
930
931     g_ptr_array_free(device_names, TRUE);
932     do_rait_child_ops(self, device_open_do_op, device_open_ops);
933
934     failure = FALSE;
935     failure_errmsgs = NULL;
936     failure_flags = 0;
937
938     /* Check results of opening devices. */
939     for (i = 0; i < device_open_ops->len; i ++) {
940         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
941
942         if (op->result != NULL &&
943             op->result->status == DEVICE_STATUS_SUCCESS) {
944             g_ptr_array_add(self->private->children, op->result);
945         } else {
946             char * this_failure_errmsg =
947                 g_strdup_printf("%s: %s", op->device_name,
948                                 device_error_or_status(op->result));
949             DeviceStatusFlags status =
950                 op->result == NULL ?
951                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
952             append_message(&failure_errmsgs,
953                            strdup(this_failure_errmsg));
954             failure_flags |= status;
955             if (self->private->status == RAIT_STATUS_COMPLETE) {
956                 /* The first failure just puts us in degraded mode. */
957                 g_warning("%s: %s",
958                           device_name, this_failure_errmsg);
959                 g_warning("%s: %s failed, entering degraded mode.",
960                           device_name, op->device_name);
961                 g_ptr_array_add(self->private->children, op->result);
962                 self->private->status = RAIT_STATUS_DEGRADED;
963                 self->private->failed = i;
964             } else {
965                 /* The second and further failures are fatal. */
966                 failure = TRUE;
967             }
968         }
969         amfree(op->device_name);
970     }
971
972     g_ptr_array_free_full(device_open_ops);
973
974     if (failure) {
975         self->private->status = RAIT_STATUS_FAILED;
976         device_set_error(dself, failure_errmsgs, failure_flags);
977         return FALSE;
978     }
979
980     return TRUE;
981 }
982
983 static void
984 rait_device_open_device (Device * dself, char * device_name,
985             char * device_type G_GNUC_UNUSED, char * device_node) {
986
987     if (0 != strcmp(device_node, DEFER_CHILDREN_SENTINEL)) {
988         if (!open_child_devices(dself, device_name, device_node))
989             return;
990
991         /* Chain up. */
992         if (parent_class->open_device) {
993             parent_class->open_device(dself, device_name, device_type, device_node);
994         }
995     }
996 }
997
998 Device *
999 rait_device_open_from_children (GSList *child_devices) {
1000     Device *dself;
1001     RaitDevice *self;
1002     GSList *iter;
1003     char *device_name;
1004     int nfailures;
1005     int i;
1006
1007     /* first, open a RAIT device using the DEFER_CHILDREN_SENTINEL */
1008     dself = device_open("rait:" DEFER_CHILDREN_SENTINEL);
1009     if (!IS_RAIT_DEVICE(dself)) {
1010         return dself;
1011     }
1012
1013     /* set its children */
1014     self = RAIT_DEVICE(dself);
1015     nfailures = 0;
1016     for (i=0, iter = child_devices; iter; i++, iter = iter->next) {
1017         Device *kid = iter->data;
1018
1019         /* a NULL kid is OK -- it opens the device in degraded mode */
1020         if (!kid) {
1021             nfailures++;
1022             self->private->failed = i;
1023         } else {
1024             g_assert(IS_DEVICE(kid));
1025             g_object_ref((GObject *)kid);
1026         }
1027
1028         g_ptr_array_add(self->private->children, kid);
1029     }
1030
1031     /* and set the status based on the children */
1032     switch (nfailures) {
1033         case 0:
1034             self->private->status = RAIT_STATUS_COMPLETE;
1035             break;
1036
1037         case 1:
1038             self->private->status = RAIT_STATUS_DEGRADED;
1039             break;
1040
1041         default:
1042             self->private->status = RAIT_STATUS_FAILED;
1043             device_set_error(dself,
1044                     _("more than one child device is missing"),
1045                     DEVICE_STATUS_DEVICE_ERROR);
1046             break;
1047     }
1048
1049     /* create a name from the children's names and use it to chain up
1050      * to open_device (we skipped this step in rait_device_open_device) */
1051     device_name = child_device_names_to_rait_name(self);
1052
1053     if (parent_class->open_device) {
1054         parent_class->open_device(dself,
1055             device_name, "rait",
1056             device_name+5); /* (+5 skips "rait:") */
1057     }
1058
1059     return dself;
1060 }
1061
1062 /* A GFunc. */
1063 static void read_label_do_op(gpointer data,
1064                              gpointer user_data G_GNUC_UNUSED) {
1065     GenericOp * op = data;
1066     op->result = GINT_TO_POINTER(device_read_label(op->child));
1067 }
1068
1069 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1070     RaitDevice * self;
1071     GPtrArray * ops;
1072     DeviceStatusFlags failed_result = 0;
1073     char *failed_errmsg = NULL;
1074     unsigned int i;
1075     Device * first_success = NULL;
1076
1077     self = RAIT_DEVICE(dself);
1078
1079     amfree(dself->volume_time);
1080     amfree(dself->volume_label);
1081     dumpfile_free(dself->volume_header);
1082     dself->volume_header = NULL;
1083
1084     if (rait_device_in_error(self))
1085         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1086
1087     /* nail down our block size, if we haven't already */
1088     if (!fix_block_size(self))
1089         return FALSE;
1090
1091     ops = make_generic_boolean_op_array(self);
1092
1093     do_rait_child_ops(self, read_label_do_op, ops);
1094
1095     for (i = 0; i < ops->len; i ++) {
1096         GenericOp * op = g_ptr_array_index(ops, i);
1097         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1098         if (op->result == DEVICE_STATUS_SUCCESS) {
1099             if (first_success == NULL) {
1100                 /* This is the first successful device. */
1101                 first_success = op->child;
1102             } else if (!compare_volume_results(first_success, op->child)) {
1103                 /* Doesn't match. :-( */
1104                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1105                         "Got %s/%s on %s against %s/%s on %s.",
1106                         first_success->volume_label,
1107                         first_success->volume_time,
1108                         first_success->device_name,
1109                         op->child->volume_label,
1110                         op->child->volume_time,
1111                         op->child->device_name);
1112                 g_warning("%s", failed_errmsg);
1113                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1114             }
1115         } else {
1116             failed_result |= result;
1117         }
1118     }
1119
1120     if (failed_result != DEVICE_STATUS_SUCCESS) {
1121         /* We had multiple failures or an inconsistency. */
1122         device_set_error(dself, failed_errmsg, failed_result);
1123     } else {
1124         /* Everything peachy. */
1125         amfree(failed_errmsg);
1126
1127         g_assert(first_success != NULL);
1128         if (first_success->volume_label != NULL) {
1129             dself->volume_label = g_strdup(first_success->volume_label);
1130         }
1131         if (first_success->volume_time != NULL) {
1132             dself->volume_time = g_strdup(first_success->volume_time);
1133         }
1134         if (first_success->volume_header != NULL) {
1135             dself->volume_header = dumpfile_copy(first_success->volume_header);
1136         }
1137     }
1138
1139     g_ptr_array_free_full(ops);
1140
1141     return dself->status;
1142 }
1143
1144 typedef struct {
1145     GenericOp base;
1146     DeviceAccessMode mode; /* IN */
1147     char * label;          /* IN */
1148     char * timestamp;      /* IN */
1149 } StartOp;
1150
1151 /* A GFunc. */
1152 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1153     DeviceClass *klass;
1154     StartOp * param = data;
1155
1156     klass = DEVICE_GET_CLASS(param->base.child);
1157     if (klass->start) {
1158         param->base.result =
1159             GINT_TO_POINTER((klass->start)(param->base.child,
1160                                             param->mode, param->label,
1161                                             param->timestamp));
1162     } else {
1163         param->base.result = FALSE;
1164     }
1165 }
1166
1167 static gboolean
1168 rait_device_configure(Device * dself, gboolean use_global_config)
1169 {
1170     RaitDevice *self = RAIT_DEVICE(dself);
1171     guint i;
1172
1173     for (i = 0; i < self->private->children->len; i ++) {
1174         Device *child;
1175
1176         if ((signed)i == self->private->failed)
1177             continue;
1178
1179         child = g_ptr_array_index(self->private->children, i);
1180         /* unconditionally configure the child without the global
1181          * configuration */
1182         if (!device_configure(child, FALSE))
1183             return FALSE;
1184     }
1185
1186     if (parent_class->configure) {
1187         return parent_class->configure(dself, use_global_config);
1188     }
1189
1190     return TRUE;
1191 }
1192
1193 static gboolean
1194 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1195                    char * timestamp) {
1196     GPtrArray * ops;
1197     guint i;
1198     gboolean success;
1199     RaitDevice * self;
1200     DeviceStatusFlags total_status;
1201     char *failure_errmsgs = NULL;
1202     char * label_from_device = NULL;
1203
1204     self = RAIT_DEVICE(dself);
1205
1206     if (rait_device_in_error(self)) return FALSE;
1207
1208     /* No starting in degraded mode. */
1209     if (self->private->status != RAIT_STATUS_COMPLETE &&
1210         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1211         device_set_error(dself,
1212                          g_strdup_printf(_("RAIT device %s is read-only "
1213                                            "because it is in degraded mode.\n"),
1214                                          dself->device_name),
1215                          DEVICE_STATUS_DEVICE_ERROR);
1216         return FALSE;
1217     }
1218
1219     /* nail down our block size, if we haven't already */
1220     if (!fix_block_size(self))
1221         return FALSE;
1222
1223     dself->access_mode = mode;
1224     dself->in_file = FALSE;
1225     amfree(dself->volume_label);
1226     amfree(dself->volume_time);
1227     dumpfile_free(dself->volume_header);
1228     dself->volume_header = NULL;
1229
1230     ops = g_ptr_array_sized_new(self->private->children->len);
1231     for (i = 0; i < self->private->children->len; i ++) {
1232         StartOp * op;
1233
1234         if ((signed)i == self->private->failed) {
1235             continue;
1236         }
1237
1238         op = g_new(StartOp, 1);
1239         op->base.child = g_ptr_array_index(self->private->children, i);
1240         op->mode = mode;
1241         op->label = g_strdup(label);
1242         op->timestamp = g_strdup(timestamp);
1243         g_ptr_array_add(ops, op);
1244     }
1245
1246     do_rait_child_ops(self, start_do_op, ops);
1247
1248     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1249
1250     /* Check results of starting devices; this is mostly about the
1251      * VOLUME_UNLABELED flag. */
1252     total_status = 0;
1253     for (i = 0; i < ops->len; i ++) {
1254         StartOp * op = g_ptr_array_index(ops, i);
1255         Device *child = op->base.child;
1256
1257         total_status |= child->status;
1258         if (child->status != DEVICE_STATUS_SUCCESS) {
1259             /* record the error message and move on. */
1260             append_message(&failure_errmsgs,
1261                            g_strdup_printf("%s: %s",
1262                                            child->device_name,
1263                                            device_error_or_status(child)));
1264         } else {
1265             if (child->volume_label != NULL && child->volume_time != NULL) {
1266                 if (label_from_device) {
1267                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1268                         strcmp(child->volume_time, dself->volume_time) != 0) {
1269                         /* Mismatch! (Two devices provided different labels) */
1270                         char * this_message =
1271                             g_strdup_printf("%s: Label (%s/%s) is different "
1272                                             "from label (%s/%s) found at "
1273                                             "device %s",
1274                                             child->device_name,
1275                                             child->volume_label,
1276                                             child->volume_time,
1277                                             dself->volume_label,
1278                                             dself->volume_time,
1279                                             label_from_device);
1280                         append_message(&failure_errmsgs, this_message);
1281                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1282                         g_warning("RAIT device children have different labels or timestamps");
1283                     }
1284                 } else {
1285                     /* First device with a volume. */
1286                     dself->volume_label = g_strdup(child->volume_label);
1287                     dself->volume_time = g_strdup(child->volume_time);
1288                     dself->volume_header = dumpfile_copy(child->volume_header);
1289                     label_from_device = g_strdup(child->device_name);
1290                 }
1291             } else {
1292                 /* Device problem, it says it succeeded but sets no label? */
1293                 char * this_message =
1294                     g_strdup_printf("%s: Says label read, but no volume "
1295                                      "label found.", child->device_name);
1296                 g_warning("RAIT device child has NULL volume or label");
1297                 append_message(&failure_errmsgs, this_message);
1298                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1299             }
1300         }
1301     }
1302
1303     amfree(label_from_device);
1304     g_ptr_array_free_full(ops);
1305
1306     dself->status = total_status;
1307
1308     if (total_status != DEVICE_STATUS_SUCCESS || !success) {
1309         device_set_error(dself, failure_errmsgs, total_status);
1310         return FALSE;
1311     }
1312
1313     amfree(failure_errmsgs);
1314     return TRUE;
1315 }
1316
1317 typedef struct {
1318     GenericOp base;
1319     dumpfile_t * info; /* IN */
1320     int fileno;
1321 } StartFileOp;
1322
1323 /* a GFunc */
1324 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1325     StartFileOp * op = data;
1326     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1327                                                         op->info));
1328     op->fileno = op->base.child->file;
1329     if (op->fileno < 1) {
1330         op->base.result = FALSE;
1331     }
1332 }
1333
1334 static gboolean
1335 rait_device_start_file (Device * dself, dumpfile_t * info) {
1336     GPtrArray * ops;
1337     guint i;
1338     gboolean success;
1339     RaitDevice * self;
1340     int actual_file = -1;
1341
1342     self = RAIT_DEVICE(dself);
1343
1344     if (rait_device_in_error(self)) return FALSE;
1345     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1346
1347     ops = g_ptr_array_sized_new(self->private->children->len);
1348     for (i = 0; i < self->private->children->len; i ++) {
1349         StartFileOp * op;
1350         op = g_new(StartFileOp, 1);
1351         op->base.child = g_ptr_array_index(self->private->children, i);
1352         /* each child gets its own copy of the header, to munge as it
1353          * likes (setting blocksize, at least) */
1354         op->info = dumpfile_copy(info);
1355         g_ptr_array_add(ops, op);
1356     }
1357
1358     do_rait_child_ops(self, start_file_do_op, ops);
1359
1360     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1361
1362     for (i = 0; i < self->private->children->len && success; i ++) {
1363         StartFileOp * op = g_ptr_array_index(ops, i);
1364         if (!op->base.result)
1365             continue;
1366         g_assert(op->fileno >= 1);
1367         if (actual_file < 1) {
1368             actual_file = op->fileno;
1369         }
1370         if (actual_file != op->fileno) {
1371             /* File number mismatch! Aah, my hair is on fire! */
1372             device_set_error(dself,
1373                              g_strdup_printf("File number mismatch in "
1374                                              "rait_device_start_file(): "
1375                                              "Child %s reported file number "
1376                                              "%d, another child reported "
1377                                              "file number %d.",
1378                                              op->base.child->device_name,
1379                                              op->fileno, actual_file),
1380                              DEVICE_STATUS_DEVICE_ERROR);
1381             success = FALSE;
1382             op->base.result = FALSE;
1383         }
1384     }
1385
1386     for (i = 0; i < ops->len && success; i ++) {
1387         StartFileOp * op = g_ptr_array_index(ops, i);
1388         if (op->info) dumpfile_free(op->info);
1389     }
1390     g_ptr_array_free_full(ops);
1391
1392     if (!success) {
1393         if (!device_in_error(dself)) {
1394             device_set_error(dself, stralloc("One or more devices "
1395                                              "failed to start_file"),
1396                              DEVICE_STATUS_DEVICE_ERROR);
1397         }
1398         return FALSE;
1399     }
1400
1401     dself->in_file = TRUE;
1402     g_assert(actual_file >= 1);
1403     dself->file = actual_file;
1404
1405     return TRUE;
1406 }
1407
1408 static void find_simple_params(RaitDevice * self,
1409                                guint * num_children,
1410                                guint * data_children) {
1411     int num, data;
1412
1413     num = self->private->children->len;
1414     if (num > 1)
1415         data = num - 1;
1416     else
1417         data = num;
1418     if (num_children != NULL)
1419         *num_children = num;
1420     if (data_children != NULL)
1421         *data_children = data;
1422 }
1423
1424 typedef struct {
1425     GenericOp base;
1426     guint size;           /* IN */
1427     gpointer data;        /* IN */
1428     gboolean data_needs_free; /* bookkeeping */
1429 } WriteBlockOp;
1430
1431 /* a GFunc. */
1432 static void write_block_do_op(gpointer data,
1433                               gpointer user_data G_GNUC_UNUSED) {
1434     WriteBlockOp * op = data;
1435
1436     op->base.result =
1437         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1438 }
1439
1440 /* Parity block generation. Performance of this function can be improved
1441    considerably by using larger-sized integers or
1442    assembly-coded vector instructions. Parameters are:
1443    % data       - All data chunks in series (chunk_size * num_chunks bytes)
1444    % parity     - Allocated space for parity block (chunk_size bytes)
1445  */
1446 static void make_parity_block(char * data, char * parity,
1447                               guint chunk_size, guint num_chunks) {
1448     guint i;
1449     bzero(parity, chunk_size);
1450     for (i = 0; i < num_chunks - 1; i ++) {
1451         guint j;
1452         for (j = 0; j < chunk_size; j ++) {
1453             parity[j] ^= data[chunk_size*i + j];
1454         }
1455     }
1456 }
1457
1458 /* Does the same thing as make_parity_block, but instead of using a
1459    single memory chunk holding all chunks, it takes a GPtrArray of
1460    chunks. */
1461 static void make_parity_block_extents(GPtrArray * data, char * parity,
1462                                       guint chunk_size) {
1463     guint i;
1464     bzero(parity, chunk_size);
1465     for (i = 0; i < data->len; i ++) {
1466         guint j;
1467         char * data_chunk;
1468         data_chunk = g_ptr_array_index(data, i);
1469         for (j = 0; j < chunk_size; j ++) {
1470             parity[j] ^= data_chunk[j];
1471         }
1472     }
1473 }
1474
1475 /* Does the parity creation algorithm. Allocates and returns a single
1476    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1477 static char * extract_data_block(char * data, guint size,
1478                                  guint chunks, guint chunk) {
1479     char * rval;
1480     guint chunk_size;
1481
1482     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1483     g_assert(data != NULL);
1484     g_assert(size > 0 && size % (chunks - 1) == 0);
1485
1486     chunk_size = size / (chunks - 1);
1487     rval = g_malloc(chunk_size);
1488     if (chunks != chunk) {
1489         /* data block. */
1490         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1491     } else {
1492         make_parity_block(data, rval, chunk_size, chunks);
1493     }
1494
1495     return rval;
1496 }
1497
1498 static gboolean
1499 rait_device_write_block (Device * dself, guint size, gpointer data) {
1500     GPtrArray * ops;
1501     guint i;
1502     gboolean success;
1503     guint data_children, num_children;
1504     gsize blocksize = dself->block_size;
1505     RaitDevice * self;
1506     gboolean last_block = (size < blocksize);
1507
1508     self = RAIT_DEVICE(dself);
1509
1510     if (rait_device_in_error(self)) return FALSE;
1511     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1512
1513     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1514     num_children = self->private->children->len;
1515     if (num_children != 1)
1516         data_children = num_children - 1;
1517     else
1518         data_children = num_children;
1519
1520     g_assert(size % data_children == 0 || last_block);
1521
1522     /* zero out to the end of a short block -- tape devices only write
1523      * whole blocks. */
1524     if (last_block) {
1525         char *new_data;
1526
1527         new_data = g_malloc(blocksize);
1528         memcpy(new_data, data, size);
1529         bzero(new_data + size, blocksize - size);
1530
1531         data = new_data;
1532         size = blocksize;
1533     }
1534
1535     ops = g_ptr_array_sized_new(num_children);
1536     for (i = 0; i < self->private->children->len; i ++) {
1537         WriteBlockOp * op;
1538         op = g_malloc(sizeof(*op));
1539         op->base.child = g_ptr_array_index(self->private->children, i);
1540         op->size = size / data_children;
1541         if (num_children <= 2) {
1542             op->data = data;
1543             op->data_needs_free = FALSE;
1544         } else {
1545             op->data_needs_free = TRUE;
1546             op->data = extract_data_block(data, size, num_children, i + 1);
1547         }
1548         g_ptr_array_add(ops, op);
1549     }
1550
1551     do_rait_child_ops(self, write_block_do_op, ops);
1552
1553     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1554
1555     for (i = 0; i < self->private->children->len; i ++) {
1556         WriteBlockOp * op = g_ptr_array_index(ops, i);
1557         if (op->data_needs_free)
1558             free(op->data);
1559     }
1560
1561     if (last_block) {
1562         amfree(data);
1563     }
1564
1565     g_ptr_array_free_full(ops);
1566
1567     if (!success) {
1568         /* TODO be more specific here */
1569         /* TODO: handle EOM here -- if one or more (or two or more??)
1570          * children have is_eom set, then reflect that in our error
1571          * status. What's more fun is when one device fails and must be isolated at
1572          * the same time another hits EOF. */
1573         device_set_error(dself,
1574             stralloc("One or more devices failed to write_block"),
1575             DEVICE_STATUS_DEVICE_ERROR);
1576         /* this is EOM or an error, so call it EOM */
1577         dself->is_eom = TRUE;
1578         return FALSE;
1579     } else {
1580         dself->block ++;
1581
1582         return TRUE;
1583     }
1584 }
1585
1586 /* A GFunc */
1587 static void finish_file_do_op(gpointer data,
1588                               gpointer user_data G_GNUC_UNUSED) {
1589     GenericOp * op = data;
1590     if (op->child) {
1591         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1592     } else {
1593         op->result = FALSE;
1594     }
1595 }
1596
1597 static gboolean
1598 rait_device_finish_file (Device * dself) {
1599     GPtrArray * ops;
1600     gboolean success;
1601     RaitDevice * self = RAIT_DEVICE(dself);
1602
1603     g_assert(self != NULL);
1604     if (rait_device_in_error(dself)) return FALSE;
1605     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1606
1607     ops = make_generic_boolean_op_array(self);
1608
1609     do_rait_child_ops(self, finish_file_do_op, ops);
1610
1611     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1612
1613     g_ptr_array_free_full(ops);
1614
1615     if (!success) {
1616         /* TODO: be more specific here */
1617         device_set_error(dself,
1618                          g_strdup("One or more devices failed to finish_file"),
1619             DEVICE_STATUS_DEVICE_ERROR);
1620         return FALSE;
1621     }
1622
1623     dself->in_file = FALSE;
1624     return TRUE;
1625 }
1626
1627 typedef struct {
1628     GenericOp base;
1629     guint requested_file;                /* IN */
1630     guint actual_file;                   /* OUT */
1631 } SeekFileOp;
1632
1633 /* a GFunc. */
1634 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1635     SeekFileOp * op = data;
1636     op->base.result = device_seek_file(op->base.child, op->requested_file);
1637     op->actual_file = op->base.child->file;
1638 }
1639
1640 static dumpfile_t *
1641 rait_device_seek_file (Device * dself, guint file) {
1642     GPtrArray * ops;
1643     guint i;
1644     gboolean success;
1645     dumpfile_t * rval;
1646     RaitDevice * self = RAIT_DEVICE(dself);
1647     guint actual_file = 0;
1648     gboolean in_file = FALSE;
1649
1650     if (rait_device_in_error(self)) return NULL;
1651
1652     dself->in_file = FALSE;
1653     dself->is_eof = FALSE;
1654     dself->block = 0;
1655
1656     ops = g_ptr_array_sized_new(self->private->children->len);
1657     for (i = 0; i < self->private->children->len; i ++) {
1658         SeekFileOp * op;
1659         if ((int)i == self->private->failed)
1660             continue; /* This device is broken. */
1661         op = g_new(SeekFileOp, 1);
1662         op->base.child = g_ptr_array_index(self->private->children, i);
1663         op->base.child_index = i;
1664         op->requested_file = file;
1665         g_ptr_array_add(ops, op);
1666     }
1667
1668     do_rait_child_ops(self, seek_file_do_op, ops);
1669
1670     /* This checks for NULL values, but we still have to check for
1671        consistant headers. */
1672     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1673                                        ops, extract_boolean_pointer_op);
1674
1675     rval = NULL;
1676     for (i = 0; i < ops->len; i ++) {
1677         SeekFileOp * this_op;
1678         dumpfile_t * this_result;
1679         guint this_actual_file;
1680         gboolean this_in_file;
1681
1682         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1683
1684         if ((signed)this_op->base.child_index == self->private->failed)
1685             continue;
1686
1687         this_result = this_op->base.result;
1688         this_actual_file = this_op->actual_file;
1689         this_in_file = this_op->base.child->in_file;
1690
1691         if (rval == NULL) {
1692             rval = this_result;
1693             actual_file = this_actual_file;
1694             in_file = this_in_file;
1695         } else {
1696             if (headers_are_equal(rval, this_result) &&
1697                 actual_file == this_actual_file &&
1698                 in_file == this_in_file) {
1699                 /* Do nothing. */
1700             } else {
1701                 success = FALSE;
1702             }
1703             free(this_result);
1704         }
1705     }
1706
1707     g_ptr_array_free_full(ops);
1708
1709     if (!success) {
1710         amfree(rval);
1711         /* TODO: be more specific here */
1712         device_set_error(dself,
1713                          g_strdup("One or more devices failed to seek_file"),
1714             DEVICE_STATUS_DEVICE_ERROR);
1715         return NULL;
1716     }
1717
1718     /* update our state */
1719     dself->in_file = in_file;
1720     dself->file = actual_file;
1721
1722     return rval;
1723 }
1724
1725 typedef struct {
1726     GenericOp base;
1727     guint64 block; /* IN */
1728 } SeekBlockOp;
1729
1730 /* a GFunc. */
1731 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1732     SeekBlockOp * op = data;
1733     op->base.result =
1734         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1735 }
1736
1737 static gboolean
1738 rait_device_seek_block (Device * dself, guint64 block) {
1739     GPtrArray * ops;
1740     guint i;
1741     gboolean success;
1742
1743     RaitDevice * self = RAIT_DEVICE(dself);
1744
1745     if (rait_device_in_error(self)) return FALSE;
1746
1747     ops = g_ptr_array_sized_new(self->private->children->len);
1748     for (i = 0; i < self->private->children->len; i ++) {
1749         SeekBlockOp * op;
1750         if ((int)i == self->private->failed)
1751             continue; /* This device is broken. */
1752         op = g_new(SeekBlockOp, 1);
1753         op->base.child = g_ptr_array_index(self->private->children, i);
1754         op->base.child_index = i;
1755         op->block = block;
1756         g_ptr_array_add(ops, op);
1757     }
1758
1759     do_rait_child_ops(self, seek_block_do_op, ops);
1760
1761     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1762                                        ops, extract_boolean_generic_op);
1763
1764     g_ptr_array_free_full(ops);
1765
1766     if (!success) {
1767         /* TODO: be more specific here */
1768         device_set_error(dself,
1769             stralloc("One or more devices failed to seek_block"),
1770             DEVICE_STATUS_DEVICE_ERROR);
1771         return FALSE;
1772     }
1773
1774     dself->block = block;
1775     return TRUE;
1776 }
1777
1778 typedef struct {
1779     GenericOp base;
1780     gpointer buffer; /* IN */
1781     int read_size;      /* IN/OUT -- note not a pointer */
1782     int desired_read_size; /* bookkeeping */
1783 } ReadBlockOp;
1784
1785 /* a GFunc. */
1786 static void read_block_do_op(gpointer data,
1787                              gpointer user_data G_GNUC_UNUSED) {
1788     ReadBlockOp * op = data;
1789     op->base.result =
1790         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1791                                           &(op->read_size)));
1792     if (op->read_size > op->desired_read_size) {
1793         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1794                   op->base.child->device_name);
1795     }
1796 }
1797
1798 /* A BooleanExtractor. This one checks for a successful read. */
1799 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1800     ReadBlockOp * op = data;
1801     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1802 }
1803
1804 /* A BooleanExtractor. This one checks for EOF. */
1805 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1806     ReadBlockOp * op = data;
1807     return op->base.child->is_eof;
1808 }
1809
1810 /* Counts the number of elements in an array matching a given proposition. */
1811 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1812     int rval;
1813     unsigned int i;
1814     rval = 0;
1815     for (i = 0; i < array->len ; i++) {
1816         if (filter(g_ptr_array_index(array, i)))
1817             rval ++;
1818     }
1819     return rval;
1820 }
1821
1822 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1823                                       gpointer buf, size_t bufsize) {
1824     guint num_children, data_children;
1825     gsize blocksize;
1826     gsize child_blocksize;
1827     guint i;
1828     int parity_child;
1829     gpointer parity_block = NULL;
1830     gboolean success;
1831
1832     success = TRUE;
1833
1834     blocksize = DEVICE(self)->block_size;
1835     find_simple_params(self, &num_children, &data_children);
1836
1837     if (num_children > 1)
1838         parity_child = num_children - 1;
1839     else
1840         parity_child = -1;
1841
1842     child_blocksize = blocksize / data_children;
1843
1844     for (i = 0; i < ops->len; i ++) {
1845         ReadBlockOp * op = g_ptr_array_index(ops, i);
1846         if (!extract_boolean_read_block_op_data(op))
1847             continue;
1848         if ((int)(op->base.child_index) == parity_child) {
1849             parity_block = op->buffer;
1850         } else {
1851             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1852             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1853                    child_blocksize);
1854         }
1855     }
1856     if (self->private->status == RAIT_STATUS_COMPLETE) {
1857         g_assert(parity_block != NULL); /* should have found parity_child */
1858
1859         if (num_children >= 2) {
1860             /* Verify the parity block. This code is inefficient but
1861                does the job for the 2-device case, too. */
1862             gpointer constructed_parity;
1863             GPtrArray * data_extents;
1864
1865             constructed_parity = g_malloc(child_blocksize);
1866             data_extents = g_ptr_array_sized_new(data_children);
1867             for (i = 0; i < data_children; i ++) {
1868                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1869                 g_assert(extract_boolean_read_block_op_data(op));
1870                 if ((int)op->base.child_index == parity_child)
1871                     continue;
1872                 g_ptr_array_add(data_extents, op->buffer);
1873             }
1874             make_parity_block_extents(data_extents, constructed_parity,
1875                                       child_blocksize);
1876
1877             if (0 != memcmp(parity_block, constructed_parity,
1878                             child_blocksize)) {
1879                 device_set_error(DEVICE(self),
1880                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1881                     DEVICE_STATUS_DEVICE_ERROR);
1882                 /* TODO: can't we just isolate the device in this case? */
1883                 success = FALSE;
1884             }
1885             g_ptr_array_free(data_extents, TRUE);
1886             amfree(constructed_parity);
1887         } else { /* do nothing. */ }
1888     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1889         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1890         /* We are in degraded mode. What's missing? */
1891         if (self->private->failed == parity_child) {
1892             /* do nothing. */
1893         } else if (num_children >= 2) {
1894             /* Reconstruct failed block from parity block. */
1895             GPtrArray * data_extents = g_ptr_array_new();
1896
1897             for (i = 0; i < data_children; i ++) {
1898                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1899                 if (!extract_boolean_read_block_op_data(op))
1900                     continue;
1901                 g_ptr_array_add(data_extents, op->buffer);
1902             }
1903
1904             /* Conveniently, the reconstruction is the same procedure
1905                as the parity generation. This even works if there is
1906                only one remaining device! */
1907             make_parity_block_extents(data_extents,
1908                                       (char *)buf + (child_blocksize *
1909                                              self->private->failed),
1910                                       child_blocksize);
1911
1912             /* The array members belong to our ops argument. */
1913             g_ptr_array_free(data_extents, TRUE);
1914         } else {
1915             g_assert_not_reached();
1916         }
1917     } else {
1918         /* device is already in FAILED state -- we shouldn't even be here */
1919         success = FALSE;
1920     }
1921     return success;
1922 }
1923
1924 static int
1925 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1926     GPtrArray * ops;
1927     guint i;
1928     gboolean success;
1929     guint num_children, data_children;
1930     gsize blocksize = dself->block_size;
1931     gsize child_blocksize;
1932
1933     RaitDevice * self = RAIT_DEVICE(dself);
1934
1935     if (rait_device_in_error(self)) return -1;
1936
1937     find_simple_params(self, &num_children, &data_children);
1938
1939     /* tell caller they haven't given us a big enough buffer */
1940     if (blocksize > (gsize)*size) {
1941         g_assert(blocksize < INT_MAX);
1942         *size = (int)blocksize;
1943         return 0;
1944     }
1945
1946     g_assert(blocksize % data_children == 0); /* see find_block_size */
1947     child_blocksize = blocksize / data_children;
1948
1949     ops = g_ptr_array_sized_new(num_children);
1950     for (i = 0; i < num_children; i ++) {
1951         ReadBlockOp * op;
1952         if ((int)i == self->private->failed)
1953             continue; /* This device is broken. */
1954         op = g_new(ReadBlockOp, 1);
1955         op->base.child = g_ptr_array_index(self->private->children, i);
1956         op->base.child_index = i;
1957         op->buffer = g_malloc(child_blocksize);
1958         op->desired_read_size = op->read_size = child_blocksize;
1959         g_ptr_array_add(ops, op);
1960     }
1961
1962     do_rait_child_ops(self, read_block_do_op, ops);
1963
1964     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1965         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1966                                      ops,
1967                                      extract_boolean_read_block_op_data)) {
1968             /* TODO: be more specific */
1969             device_set_error(dself,
1970                 stralloc(_("Error occurred combining blocks from child devices")),
1971                 DEVICE_STATUS_DEVICE_ERROR);
1972             success = FALSE;
1973         } else {
1974             /* raid_block_reconstruction sets the error status if necessary */
1975             success = raid_block_reconstruction(RAIT_DEVICE(self),
1976                                                 ops, buf, (size_t)*size);
1977         }
1978     } else {
1979         success = FALSE;
1980         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
1981                                      ops,
1982                                      extract_boolean_read_block_op_eof)) {
1983             device_set_error(dself,
1984                 stralloc(_("EOF")),
1985                 DEVICE_STATUS_SUCCESS);
1986             dself->is_eof = TRUE;
1987             dself->in_file = FALSE;
1988         } else {
1989             device_set_error(dself,
1990                 stralloc(_("All child devices failed to read, but not all are at eof")),
1991                 DEVICE_STATUS_DEVICE_ERROR);
1992         }
1993     }
1994
1995     for (i = 0; i < ops->len; i ++) {
1996         ReadBlockOp * op = g_ptr_array_index(ops, i);
1997         amfree(op->buffer);
1998     }
1999     g_ptr_array_free_full(ops);
2000
2001     if (success) {
2002         dself->block++;
2003         *size = blocksize;
2004         return blocksize;
2005     } else {
2006         return -1;
2007     }
2008 }
2009
2010 /* property utility functions */
2011
2012 typedef struct {
2013     GenericOp base;
2014     DevicePropertyId id;   /* IN */
2015     GValue value;          /* IN/OUT */
2016     PropertySurety surety; /* IN (for set) */
2017     PropertySource source; /* IN (for set) */
2018 } PropertyOp;
2019
2020 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2021 static GPtrArray * make_property_op_array(RaitDevice * self,
2022                                           DevicePropertyId id,
2023                                           GValue * value,
2024                                           PropertySurety surety,
2025                                           PropertySource source) {
2026     guint i;
2027     GPtrArray * ops;
2028     ops = g_ptr_array_sized_new(self->private->children->len);
2029     for (i = 0; i < self->private->children->len; i ++) {
2030         PropertyOp * op;
2031
2032         if ((signed)i == self->private->failed) {
2033             continue;
2034         }
2035
2036         op = g_new(PropertyOp, 1);
2037         op->base.child = g_ptr_array_index(self->private->children, i);
2038         op->id = id;
2039         bzero(&(op->value), sizeof(op->value));
2040         if (value != NULL) {
2041             g_value_unset_copy(value, &(op->value));
2042         }
2043         op->surety = surety;
2044         op->source = source;
2045         g_ptr_array_add(ops, op);
2046     }
2047
2048     return ops;
2049 }
2050
2051 /* A GFunc. */
2052 static void property_get_do_op(gpointer data,
2053                                gpointer user_data G_GNUC_UNUSED) {
2054     PropertyOp * op = data;
2055
2056     bzero(&(op->value), sizeof(op->value));
2057     op->base.result =
2058         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2059                                             &(op->value)));
2060 }
2061
2062 /* A GFunc. */
2063 static void property_set_do_op(gpointer data,
2064                                gpointer user_data G_GNUC_UNUSED) {
2065     PropertyOp * op = data;
2066
2067     op->base.result =
2068         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2069                                                &(op->value), op->surety,
2070                                                op->source));
2071     g_value_unset(&(op->value));
2072 }
2073
2074 /* PropertyGetFns and PropertySetFns */
2075
2076 static gboolean
2077 property_get_block_size_fn(Device *dself,
2078     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2079     PropertySurety *surety, PropertySource *source)
2080 {
2081     RaitDevice *self = RAIT_DEVICE(dself);
2082     gsize my_block_size;
2083
2084     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2085         my_block_size = dself->block_size;
2086
2087         if (surety)
2088             *surety = dself->block_size_surety;
2089     } else {
2090         gsize child_block_size;
2091         child_block_size = calculate_block_size_from_children(self,
2092                                                     &my_block_size);
2093         if (child_block_size == 0)
2094             return FALSE;
2095
2096         if (surety)
2097             *surety = PROPERTY_SURETY_BAD; /* may still change */
2098     }
2099
2100     if (val) {
2101         g_value_unset_init(val, G_TYPE_INT);
2102         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2103         g_value_set_int(val, (gint)my_block_size);
2104     }
2105
2106     if (source)
2107         *source = dself->block_size_source;
2108
2109     return TRUE;
2110 }
2111
2112 static gboolean
2113 property_set_block_size_fn(Device *dself,
2114     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2115     PropertySurety surety, PropertySource source)
2116 {
2117     RaitDevice *self = RAIT_DEVICE(dself);
2118     gint my_block_size = g_value_get_int(val);
2119     guint data_children;
2120
2121     find_simple_params(self, NULL, &data_children);
2122     if ((my_block_size % data_children) != 0) {
2123         device_set_error(dself,
2124             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2125             DEVICE_STATUS_DEVICE_ERROR);
2126         return FALSE;
2127     }
2128
2129     dself->block_size = my_block_size;
2130     dself->block_size_source = source;
2131     dself->block_size_surety = surety;
2132
2133     if (!fix_block_size(self))
2134         return FALSE;
2135
2136     return TRUE;
2137 }
2138
2139 static gboolean
2140 property_get_canonical_name_fn(Device *dself,
2141     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2142     PropertySurety *surety, PropertySource *source)
2143 {
2144     RaitDevice *self = RAIT_DEVICE(dself);
2145     char *canonical = child_device_names_to_rait_name(self);
2146
2147     if (val) {
2148         g_value_unset_init(val, G_TYPE_STRING);
2149         g_value_set_string(val, canonical);
2150         g_free(canonical);
2151     }
2152
2153     if (surety)
2154         *surety = PROPERTY_SURETY_GOOD;
2155
2156     if (source)
2157         *source = PROPERTY_SOURCE_DETECTED;
2158
2159     return TRUE;
2160 }
2161
2162 static gboolean
2163 property_get_concurrency_fn(Device *dself,
2164     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2165     PropertySurety *surety, PropertySource *source)
2166 {
2167     RaitDevice *self = RAIT_DEVICE(dself);
2168     ConcurrencyParadigm result;
2169     guint i;
2170     GPtrArray * ops;
2171     gboolean success;
2172
2173     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2174     do_rait_child_ops(self, property_get_do_op, ops);
2175
2176     /* find the most restrictive paradigm acceptable to all
2177      * child devices */
2178     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2179     success = TRUE;
2180     for (i = 0; i < ops->len; i ++) {
2181         ConcurrencyParadigm cur;
2182         PropertyOp * op = g_ptr_array_index(ops, i);
2183
2184         if (!op->base.result
2185             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2186             success = FALSE;
2187             break;
2188         }
2189
2190         cur = g_value_get_enum(&(op->value));
2191         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2192             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2193             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2194         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2195                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2196             result = CONCURRENCY_PARADIGM_SHARED_READ;
2197         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2198                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2199             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2200         } else {
2201             success = FALSE;
2202             break;
2203         }
2204     }
2205
2206     g_ptr_array_free_full(ops);
2207
2208     if (success) {
2209         if (val) {
2210             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2211             g_value_set_enum(val, result);
2212         }
2213
2214         if (surety)
2215             *surety = PROPERTY_SURETY_GOOD;
2216
2217         if (source)
2218             *source = PROPERTY_SOURCE_DETECTED;
2219     }
2220
2221     return success;
2222 }
2223
2224 static gboolean
2225 property_get_streaming_fn(Device *dself,
2226     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2227     PropertySurety *surety, PropertySource *source)
2228 {
2229     RaitDevice *self = RAIT_DEVICE(dself);
2230     StreamingRequirement result;
2231     guint i;
2232     GPtrArray * ops;
2233     gboolean success;
2234
2235     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2236     do_rait_child_ops(self, property_get_do_op, ops);
2237
2238     /* combine the child streaming requirements, selecting the strongest
2239      * requirement of the bunch. */
2240     result = STREAMING_REQUIREMENT_NONE;
2241     success = TRUE;
2242     for (i = 0; i < ops->len; i ++) {
2243         StreamingRequirement cur;
2244         PropertyOp * op = g_ptr_array_index(ops, i);
2245
2246         if (!op->base.result
2247             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2248             success = FALSE;
2249             break;
2250         }
2251
2252         cur = g_value_get_enum(&(op->value));
2253         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2254             cur == STREAMING_REQUIREMENT_REQUIRED) {
2255             result = STREAMING_REQUIREMENT_REQUIRED;
2256         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2257                    cur == STREAMING_REQUIREMENT_DESIRED) {
2258             result = STREAMING_REQUIREMENT_DESIRED;
2259         } else if (result == STREAMING_REQUIREMENT_NONE &&
2260                    cur == STREAMING_REQUIREMENT_NONE) {
2261             result = STREAMING_REQUIREMENT_NONE;
2262         } else {
2263             success = FALSE;
2264             break;
2265         }
2266     }
2267
2268     g_ptr_array_free_full(ops);
2269
2270     if (success) {
2271         if (val) {
2272             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2273             g_value_set_enum(val, result);
2274         }
2275
2276         if (surety)
2277             *surety = PROPERTY_SURETY_GOOD;
2278
2279         if (source)
2280             *source = PROPERTY_SOURCE_DETECTED;
2281     }
2282
2283     return success;
2284 }
2285
2286 static gboolean
2287 property_get_boolean_and_fn(Device *dself,
2288     DevicePropertyBase *base, GValue *val,
2289     PropertySurety *surety, PropertySource *source)
2290 {
2291     RaitDevice *self = RAIT_DEVICE(dself);
2292     gboolean result;
2293     guint i;
2294     GPtrArray * ops;
2295     gboolean success;
2296
2297     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2298     do_rait_child_ops(self, property_get_do_op, ops);
2299
2300     /* combine the child values, applying a simple AND */
2301     result = TRUE;
2302     success = TRUE;
2303     for (i = 0; i < ops->len; i ++) {
2304         PropertyOp * op = g_ptr_array_index(ops, i);
2305
2306         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2307             success = FALSE;
2308             break;
2309         }
2310
2311         if (!g_value_get_boolean(&(op->value))) {
2312             result = FALSE;
2313             break;
2314         }
2315     }
2316
2317     g_ptr_array_free_full(ops);
2318
2319     if (success) {
2320         if (val) {
2321             g_value_unset_init(val, G_TYPE_BOOLEAN);
2322             g_value_set_boolean(val, result);
2323         }
2324
2325         if (surety)
2326             *surety = PROPERTY_SURETY_GOOD;
2327
2328         if (source)
2329             *source = PROPERTY_SOURCE_DETECTED;
2330     }
2331
2332     return success;
2333 }
2334
2335 static gboolean
2336 property_get_medium_access_type_fn(Device *dself,
2337     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2338     PropertySurety *surety, PropertySource *source)
2339 {
2340     RaitDevice *self = RAIT_DEVICE(dself);
2341     MediaAccessMode result;
2342     guint i;
2343     GPtrArray * ops;
2344     gboolean success;
2345
2346     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2347     do_rait_child_ops(self, property_get_do_op, ops);
2348
2349     /* combine the modes as best we can */
2350     result = 0;
2351     success = TRUE;
2352     for (i = 0; i < ops->len; i ++) {
2353         MediaAccessMode cur;
2354         PropertyOp * op = g_ptr_array_index(ops, i);
2355
2356         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2357             success = FALSE;
2358             break;
2359         }
2360
2361         cur = g_value_get_enum(&(op->value));
2362
2363         if (i == 0) {
2364             result = cur;
2365         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2366                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2367                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2368                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2369             /* Invalid combination; one device can only read, other
2370                can only write. */
2371             success = FALSE;
2372             break;
2373         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2374                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2375             result = MEDIA_ACCESS_MODE_READ_ONLY;
2376         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2377                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2378             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2379         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2380                    cur == MEDIA_ACCESS_MODE_WORM) {
2381             result = MEDIA_ACCESS_MODE_WORM;
2382         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2383                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2384             result = MEDIA_ACCESS_MODE_READ_WRITE;
2385         } else {
2386             success = FALSE;
2387             break;
2388         }
2389     }
2390
2391     g_ptr_array_free_full(ops);
2392
2393     if (success) {
2394         if (val) {
2395             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2396             g_value_set_enum(val, result);
2397         }
2398
2399         if (surety)
2400             *surety = PROPERTY_SURETY_GOOD;
2401
2402         if (source)
2403             *source = PROPERTY_SOURCE_DETECTED;
2404     }
2405
2406     return success;
2407 }
2408
2409 static gboolean
2410 property_get_max_volume_usage_fn(Device *dself,
2411     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2412     PropertySurety *surety, PropertySource *source)
2413 {
2414     RaitDevice *self = RAIT_DEVICE(dself);
2415     guint64 result;
2416     guint i;
2417     GPtrArray * ops;
2418     guint data_children;
2419
2420     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2421     do_rait_child_ops(self, property_get_do_op, ops);
2422
2423     /* look for the smallest value that is set */
2424     result = 0;
2425     for (i = 0; i < ops->len; i ++) {
2426         guint64 cur;
2427         PropertyOp * op = g_ptr_array_index(ops, i);
2428
2429         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2430             continue; /* ignore children without this property */
2431         }
2432
2433         cur = g_value_get_uint64(&(op->value));
2434
2435         if (!result || (cur && cur < result)) {
2436             result = cur;
2437         }
2438     }
2439
2440     g_ptr_array_free_full(ops);
2441
2442     if (result) {
2443         /* result contains the minimum usage on any child.  We can use that space
2444          * on each of our data children, so the total is larger */
2445         find_simple_params(self, NULL, &data_children);
2446         result *= data_children;
2447
2448         if (val) {
2449             g_value_unset_init(val, G_TYPE_UINT64);
2450             g_value_set_uint64(val, result);
2451         }
2452
2453         if (surety)
2454             *surety = PROPERTY_SURETY_GOOD;
2455
2456         if (source)
2457             *source = PROPERTY_SOURCE_DETECTED;
2458
2459         return TRUE;
2460     } else {
2461         /* no result from any children, so we effectively don't have this property */
2462         return FALSE;
2463     }
2464 }
2465
2466 static gboolean
2467 property_set_max_volume_usage_fn(Device *dself,
2468     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2469     PropertySurety surety, PropertySource source)
2470 {
2471     RaitDevice *self = RAIT_DEVICE(dself);
2472     guint64 parent_usage;
2473     guint64 child_usage;
2474     GValue child_val;
2475     guint i;
2476     gboolean success;
2477     GPtrArray * ops;
2478     guint data_children;
2479
2480     parent_usage = g_value_get_uint64(val);
2481     find_simple_params(self, NULL, &data_children);
2482
2483     child_usage = parent_usage / data_children;
2484
2485     bzero(&child_val, sizeof(child_val));
2486     g_value_init(&child_val, G_TYPE_UINT64);
2487     g_value_set_uint64(&child_val, child_usage);
2488
2489     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2490                                 &child_val, surety, source);
2491     do_rait_child_ops(self, property_set_do_op, ops);
2492
2493     /* if any of the kids succeeded, then we did too */
2494     success = FALSE;
2495     for (i = 0; i < ops->len; i ++) {
2496         PropertyOp * op = g_ptr_array_index(ops, i);
2497
2498         if (op->base.result) {
2499             success = TRUE;
2500             break;
2501         }
2502     }
2503
2504     g_ptr_array_free_full(ops);
2505
2506     return success;
2507 }
2508
2509 typedef struct {
2510     GenericOp base;
2511     guint filenum;
2512 } RecycleFileOp;
2513
2514 /* A GFunc */
2515 static void recycle_file_do_op(gpointer data,
2516                                gpointer user_data G_GNUC_UNUSED) {
2517     RecycleFileOp * op = data;
2518     op->base.result =
2519         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2520 }
2521
2522 static gboolean
2523 rait_device_recycle_file (Device * dself, guint filenum) {
2524     GPtrArray * ops;
2525     guint i;
2526     gboolean success;
2527
2528     RaitDevice * self = RAIT_DEVICE(dself);
2529
2530     if (rait_device_in_error(self)) return FALSE;
2531
2532     ops = g_ptr_array_sized_new(self->private->children->len);
2533     for (i = 0; i < self->private->children->len; i ++) {
2534         RecycleFileOp * op;
2535         op = g_new(RecycleFileOp, 1);
2536         op->base.child = g_ptr_array_index(self->private->children, i);
2537         op->filenum = filenum;
2538         g_ptr_array_add(ops, op);
2539     }
2540
2541     do_rait_child_ops(self, recycle_file_do_op, ops);
2542
2543     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2544
2545     g_ptr_array_free_full(ops);
2546
2547     if (!success) {
2548         /* TODO: be more specific here */
2549         device_set_error(dself,
2550             stralloc(_("One or more devices failed to recycle_file")),
2551             DEVICE_STATUS_DEVICE_ERROR);
2552         return FALSE;
2553     }
2554     return TRUE;
2555 }
2556
2557 /* GFunc */
2558 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2559     GenericOp * op = data;
2560     op->result = GINT_TO_POINTER(device_finish(op->child));
2561 }
2562
2563 static gboolean
2564 rait_device_finish (Device * self) {
2565     GPtrArray * ops;
2566     gboolean success;
2567     gboolean rval = TRUE;
2568
2569     rval = !rait_device_in_error(self);
2570
2571     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2572
2573     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2574
2575     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2576     if (!success)
2577         rval = FALSE;
2578
2579     g_ptr_array_free_full(ops);
2580
2581     self->access_mode = ACCESS_NULL;
2582
2583     return rval;
2584 }
2585
2586 static Device *
2587 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2588     Device * rval;
2589     g_assert(0 == strcmp(device_type, "rait"));
2590     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2591     device_open_device(rval, device_name, device_type, device_node);
2592     return rval;
2593 }
2594
2595 void
2596 rait_device_register (void) {
2597     static const char * device_prefix_list[] = {"rait", NULL};
2598     register_device(rait_device_factory, device_prefix_list);
2599 }