add bug closure to changelog
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include "amanda.h"
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "amsemaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
/* The GType registered for the RAIT device (registered on first use by
 * rait_device_get_type()). */
#define TYPE_RAIT_DEVICE        (rait_device_get_type())
/* Checked cast of an instance pointer to RaitDevice. */
#define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
/* Checked cast of an instance pointer to a const RaitDevice. */
#define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
/* Checked cast of a class pointer to RaitDeviceClass. */
#define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
/* Evaluates to true if obj is an instance of the RAIT device type. */
#define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())

/* Fetch the RaitDeviceClass structure of an instance. */
#define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
/* Forward declaration; the macros above expand to calls to this function. */
static GType    rait_device_get_type    (void);
64
65 /*
66  * Main object structure
67  */
typedef struct RaitDevice_s {
    /* parent instance; must come first so a RaitDevice can be used as a
     * Device */
    Device __parent__;

    /* per-instance state; allocated in rait_device_init() and released in
     * rait_device_finalize() */
    struct RaitDevicePrivate_s * private;
} RaitDevice;
73
74 /*
75  * Class definition
76  */
typedef struct _RaitDeviceClass RaitDeviceClass;
/* The RAIT class adds no virtual methods of its own; it only overrides
 * entries of the parent DeviceClass (see rait_device_class_init). */
struct _RaitDeviceClass {
    DeviceClass __parent__;
};
81
/* Health of the array as a whole, tracked in RaitDevicePrivate.status. */
typedef enum {
    RAIT_STATUS_COMPLETE, /* All subdevices OK. */
    RAIT_STATUS_DEGRADED, /* One subdevice failed. */
    RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
} RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
 * This implementation assumes that threads are used for parallelizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
/* Select the built-in thread pool when compiling against glib older than
 * 2.10.0; otherwise GThreadPool is used. */
#if !(GLIB_CHECK_VERSION(2,10,0))
#define USE_INTERNAL_THREADPOOL
#endif
100
/* Per-instance private state of a RaitDevice. */
typedef struct RaitDevicePrivate_s {
    /* array of Device*, one per child; each element holds a reference that
     * is dropped in rait_device_finalize() */
    GPtrArray * children;
    /* These flags are only relevant for reading. */
    RaitStatus status;
    /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
       failed node. It holds a negative number otherwise. */
    int failed;

    /* the child block size */
    gsize child_block_size;

#ifdef USE_INTERNAL_THREADPOOL
    /* array of ThreadInfo for performing parallel operations; created
     * lazily by do_thread_pool_op() */
    GArray *threads;

    /* value of this semaphore is the number of threaded operations
     * in progress */
    amsemaphore_t *threads_sem;
#endif
} RaitDevicePrivate;
121
#ifdef USE_INTERNAL_THREADPOOL
/* State of one worker thread in the internal thread pool.  Elements live
 * inside the RaitDevicePrivate.threads array. */
typedef struct ThreadInfo {
    /* the worker thread itself, or NULL if it has not been started yet */
    GThread *thread;

    /* struct fields below are protected by this mutex and condition variable */
    GMutex *mutex;
    GCond *cond;

    /* set to TRUE to ask the worker to exit its loop */
    gboolean die;
    /* operation to run and its argument; the worker resets both to NULL
     * once the operation has completed */
    GFunc func;
    gpointer data;

    /* give threads access to the owning device's threads_sem, which each
     * worker decrements when its operation is finished */
    struct RaitDevicePrivate_s *private;
} ThreadInfo;
#endif
138
139 /* This device uses a special sentinel node to indicate that the child devices
140  * will be set later (in rait_device_open).  It contains a control character to
141  * make it difficult to enter accidentally in an Amanda config. */
#define DEFER_CHILDREN_SENTINEL "DEFER\1"

/* Shorthand for the private state of a RaitDevice pointer. */
#define PRIVATE(o) (o->private)

/* True if the device has an error recorded on it, or if the array as a
 * whole has reached RAIT_STATUS_FAILED. */
#define rait_device_in_error(dev) \
    (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
148
/* public entry point; the only non-static function declared in this part of
 * the file */
void rait_device_register (void);

/* here are local prototypes */
/* GObject instance/class/base initializers */
static void rait_device_init (RaitDevice * o);
static void rait_device_class_init (RaitDeviceClass * c);
static void rait_device_base_init (RaitDeviceClass * c);
/* implementations of the DeviceClass virtual methods, installed in
 * rait_device_class_init() */
static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
                                   char * label, char * timestamp);
static gboolean rait_device_configure(Device * self, gboolean use_global_config);
static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
static gboolean rait_device_finish_file (Device * self);
static dumpfile_t * rait_device_seek_file (Device * self, guint file);
static gboolean rait_device_seek_block (Device * self, guint64 block);
static int      rait_device_read_block (Device * self, gpointer buf,
                                        int * size);
static gboolean rait_device_recycle_file (Device * self, guint filenum);
static gboolean rait_device_finish (Device * self);
static DeviceStatusFlags rait_device_read_label(Device * dself);
/* helper used by the block-size code to obtain the number of children and
 * the number of data-bearing children */
static void find_simple_params(RaitDevice * self, guint * num_children,
                               guint * data_children);
171
172 /* property handlers */
173
/* Getters receive out-parameters for the value, surety and source; setters
 * receive the value, surety and source by value.  All of these are
 * registered in rait_device_base_init(). */

/* PROPERTY_BLOCK_SIZE */
static gboolean property_get_block_size_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_set_block_size_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source);

/* PROPERTY_CANONICAL_NAME */
static gboolean property_get_canonical_name_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

/* PROPERTY_CONCURRENCY */
static gboolean property_get_concurrency_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

/* PROPERTY_STREAMING */
static gboolean property_get_streaming_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

/* shared getter for PROPERTY_APPENDABLE, PROPERTY_PARTIAL_DELETION,
 * PROPERTY_FULL_DELETION and PROPERTY_LEOM */
static gboolean property_get_boolean_and_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

/* PROPERTY_MEDIUM_ACCESS_TYPE */
static gboolean property_get_medium_access_type_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

/* PROPERTY_MAX_VOLUME_USAGE */
static gboolean property_get_max_volume_usage_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_set_max_volume_usage_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source);
209
210
/* pointer to the class of our parent; set in rait_device_class_init() and
 * used by rait_device_finalize() to chain up */
static DeviceClass *parent_class = NULL;
213
214 static GType
215 rait_device_get_type (void)
216 {
217     static GType type = 0;
218
219     if G_UNLIKELY(type == 0) {
220         static const GTypeInfo info = {
221             sizeof (RaitDeviceClass),
222             (GBaseInitFunc) rait_device_base_init,
223             (GBaseFinalizeFunc) NULL,
224             (GClassInitFunc) rait_device_class_init,
225             (GClassFinalizeFunc) NULL,
226             NULL /* class_data */,
227             sizeof (RaitDevice),
228             0 /* n_preallocs */,
229             (GInstanceInitFunc) rait_device_init,
230             NULL
231         };
232
233         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
234                                        (GTypeFlags)0);
235         }
236
237     return type;
238 }
239
240 static void g_object_unref_foreach(gpointer data,
241                                    gpointer user_data G_GNUC_UNUSED) {
242     if (data != NULL && G_IS_OBJECT(data)) {
243         g_object_unref(data);
244     }
245 }
246
247 static void
248 rait_device_finalize(GObject *obj_self)
249 {
250     RaitDevice *self = RAIT_DEVICE (obj_self);
251     if(G_OBJECT_CLASS(parent_class)->finalize) \
252            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
253     if(self->private->children) {
254         g_ptr_array_foreach(self->private->children,
255                             g_object_unref_foreach, NULL);
256         g_ptr_array_free (self->private->children, TRUE);
257         self->private->children = NULL;
258     }
259 #ifdef USE_INTERNAL_THREADPOOL
260     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
261
262     if (PRIVATE(self)->threads) {
263         guint i;
264
265         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
266             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
267             if (inf->thread) {
268                 /* NOTE: the thread is waiting on this condition right now, not
269                  * executing an operation. */
270
271                 /* ask the thread to die */
272                 g_mutex_lock(inf->mutex);
273                 inf->die = TRUE;
274                 g_cond_signal(inf->cond);
275                 g_mutex_unlock(inf->mutex);
276
277                 /* and wait for it to die, which should happen soon */
278                 g_thread_join(inf->thread);
279             }
280
281             if (inf->mutex)
282                 g_mutex_free(inf->mutex);
283             if (inf->cond)
284                 g_cond_free(inf->cond);
285         }
286     }
287
288     if (PRIVATE(self)->threads_sem)
289         amsemaphore_free(PRIVATE(self)->threads_sem);
290 #endif
291     amfree(self->private);
292 }
293
294 static void
295 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
296 {
297     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
298     PRIVATE(o)->children = g_ptr_array_new();
299     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
300     PRIVATE(o)->failed = -1;
301 #ifdef USE_INTERNAL_THREADPOOL
302     PRIVATE(o)->threads = NULL;
303     PRIVATE(o)->threads_sem = NULL;
304 #endif
305 }
306
307 static void
308 rait_device_class_init (RaitDeviceClass * c)
309 {
310     GObjectClass *g_object_class = (GObjectClass*) c;
311     DeviceClass *device_class = (DeviceClass *)c;
312
313     parent_class = g_type_class_ref (TYPE_DEVICE);
314
315     device_class->open_device = rait_device_open_device;
316     device_class->configure = rait_device_configure;
317     device_class->start = rait_device_start;
318     device_class->start_file = rait_device_start_file;
319     device_class->write_block = rait_device_write_block;
320     device_class->finish_file = rait_device_finish_file;
321     device_class->seek_file = rait_device_seek_file;
322     device_class->seek_block = rait_device_seek_block;
323     device_class->read_block = rait_device_read_block;
324     device_class->recycle_file = rait_device_recycle_file;
325     device_class->finish = rait_device_finish;
326     device_class->read_label = rait_device_read_label;
327
328     g_object_class->finalize = rait_device_finalize;
329
330 #ifndef USE_INTERNAL_THREADPOOL
331 #if !GLIB_CHECK_VERSION(2,10,2)
332     /* Versions of glib before 2.10.2 crash if
333      * g_thread_pool_set_max_unused_threads is called before the first
334      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
335      * start any threads in it, and free it */
336     {
337         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
338         g_thread_pool_free(pool, TRUE, FALSE);
339     }
340 #endif
341
342     g_thread_pool_set_max_unused_threads(-1);
343 #endif
344 }
345
/* GObject base initializer: register the RAIT device's property handlers on
 * the class.  Only BLOCK_SIZE and MAX_VOLUME_USAGE get a setter (allowed
 * before start); every other property registered here is read-only. */
static void
rait_device_base_init (RaitDeviceClass * c)
{
    DeviceClass *device_class = (DeviceClass *)c;

    /* the RAIT device overrides most of the standard properties, so that it
     * can calculate them by querying the same property on the children */
    device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
            property_get_block_size_fn,
            property_set_block_size_fn);

    device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
            PROPERTY_ACCESS_GET_MASK,
            property_get_canonical_name_fn, NULL);

    device_class_register_property(device_class, PROPERTY_CONCURRENCY,
            PROPERTY_ACCESS_GET_MASK,
            property_get_concurrency_fn, NULL);

    device_class_register_property(device_class, PROPERTY_STREAMING,
            PROPERTY_ACCESS_GET_MASK,
            property_get_streaming_fn, NULL);

    /* the next four boolean properties all share one getter */
    device_class_register_property(device_class, PROPERTY_APPENDABLE,
            PROPERTY_ACCESS_GET_MASK,
            property_get_boolean_and_fn, NULL);

    device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
            PROPERTY_ACCESS_GET_MASK,
            property_get_boolean_and_fn, NULL);

    device_class_register_property(device_class, PROPERTY_FULL_DELETION,
            PROPERTY_ACCESS_GET_MASK,
            property_get_boolean_and_fn, NULL);

    device_class_register_property(device_class, PROPERTY_LEOM,
            PROPERTY_ACCESS_GET_MASK,
            property_get_boolean_and_fn, NULL);

    device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
            PROPERTY_ACCESS_GET_MASK,
            property_get_medium_access_type_fn, NULL);

    device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
            property_get_max_volume_usage_fn,
            property_set_max_volume_usage_fn);
}
395
396 /* This function does something a little clever and a little
397  * complicated. It takes an array of operations and runs the given
398  * function on each element in the array. The trick is that it runs them
399  * all in parallel, in different threads. This is more efficient than it
400  * sounds because we use a GThreadPool, which means calling this function
401  * will probably not start any new threads at all, but rather use
402  * existing ones. The func is called with two gpointer arguments: The
403  * first from the array, the second is the data argument.
404  *
405  * When it returns, all the operations have been successfully
406  * executed. If you want results from your operations, do it yourself
407  * through the array.
408  */
409
410 #ifdef USE_INTERNAL_THREADPOOL
/* Body of each worker thread in the internal pool.  data is the thread's
 * ThreadInfo.  The worker sleeps on inf->cond until it is either handed an
 * operation (inf->func becomes non-NULL) or asked to exit (inf->die), runs
 * the operation, and then decrements the device's threads_sem to report
 * completion.  inf->mutex is held for the whole loop except while blocked
 * in g_cond_wait, so it is also held while the operation runs.  Always
 * returns NULL. */
static gpointer rait_thread_pool_func(gpointer data) {
    ThreadInfo *inf = data;

    g_mutex_lock(inf->mutex);
    while (TRUE) {
        /* sleep until there is work or a request to die; the loop guards
         * against wakeups that bring neither */
        while (!inf->die && !inf->func)
            g_cond_wait(inf->cond, inf->mutex);

        if (inf->die)
            break;

        if (inf->func) {
            /* invoke the function */
            inf->func(inf->data, NULL);
            /* clear the slot so the wait loop above blocks again */
            inf->func = NULL;
            inf->data = NULL;

            /* indicate that we're finished; will not block */
            amsemaphore_down(inf->private->threads_sem);
        }
    }
    g_mutex_unlock(inf->mutex);
    return NULL;
}
435
/* Run func on every element of ops in parallel using the internal thread
 * pool, one worker per element, and return once all of them have finished.
 * The semaphore, the ThreadInfo array and the worker threads are created on
 * demand and kept on the device for reuse by later calls.
 *
 * NOTE(review): worker threads are handed a pointer to their element inside
 * the GArray; if a later call has a larger ops->len, g_array_set_size() may
 * reallocate the array while existing workers still hold the old pointers.
 * Confirm that ops->len never grows between calls.
 * NOTE(review): the result of g_thread_create() is not checked; a NULL
 * thread would never decrement the semaphore -- confirm this cannot happen.
 */
static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
    guint i;

    if (PRIVATE(self)->threads_sem == NULL)
        PRIVATE(self)->threads_sem = amsemaphore_new_with_value(0);

    /* second argument TRUE: new elements are zero-filled, so inf->thread
     * starts out NULL */
    if (PRIVATE(self)->threads == NULL)
        PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
                                            sizeof(ThreadInfo), ops->len);

    /* no previous operation may still be running */
    g_assert(PRIVATE(self)->threads_sem->value == 0);

    if (PRIVATE(self)->threads->len < ops->len)
        g_array_set_size(PRIVATE(self)->threads, ops->len);

    /* the semaphore will hit zero when each thread has decremented it */
    amsemaphore_force_set(PRIVATE(self)->threads_sem, ops->len);

    for (i = 0; i < ops->len; i++) {
        ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
        /* first use of this slot: create its synchronization objects and
         * start a joinable worker thread */
        if (!inf->thread) {
            inf->mutex = g_mutex_new();
            inf->cond = g_cond_new();
            inf->private = PRIVATE(self);
            inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
        }

        /* set up the info the thread needs and trigger it to start */
        g_mutex_lock(inf->mutex);
        inf->data = g_ptr_array_index(ops, i);
        inf->func = func;
        g_cond_signal(inf->cond);
        g_mutex_unlock(inf->mutex);
    }

    /* wait until semaphore hits zero */
    amsemaphore_wait_empty(PRIVATE(self)->threads_sem);
}
474
475 #else /* USE_INTERNAL_THREADPOOL */
476
477 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
478     GThreadPool * pool;
479     guint i;
480
481     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
482     for (i = 0; i < ops->len; i ++) {
483         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
484     }
485
486     g_thread_pool_free(pool, FALSE, TRUE);
487 }
488
489 #endif /* USE_INTERNAL_THREADPOOL */
490
491 /* This does the above, in a serial fashion (and without using threads) */
492 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
493     guint i;
494
495     for (i = 0; i < ops->len; i ++) {
496         func(g_ptr_array_index(ops, i), NULL);
497     }
498 }
499
500 /* This is the one that code below should call. It switches
501    automatically between do_thread_pool_op and do_unthreaded_ops,
502    depending on g_thread_supported(). */
503 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
504     if (g_thread_supported()) {
505         do_thread_pool_op(self, func, ops);
506     } else {
507         do_unthreaded_ops(self, func, ops);
508     }
509 }
510
511 static char *
512 child_device_names_to_rait_name(RaitDevice * self) {
513     GPtrArray *kids;
514     char *braced, *result;
515     guint i;
516
517     kids = g_ptr_array_sized_new(self->private->children->len);
518     for (i = 0; i < self->private->children->len; i ++) {
519         Device *child = g_ptr_array_index(self->private->children, i);
520         const char *child_name = NULL;
521         GValue val;
522         gboolean got_prop = FALSE;
523
524         bzero(&val, sizeof(val));
525
526         if ((signed)i != self->private->failed) {
527             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
528                 child_name = g_value_get_string(&val);
529                 got_prop = TRUE;
530             }
531         }
532
533         if (!got_prop)
534             child_name = "MISSING";
535
536         g_ptr_array_add(kids, g_strdup(child_name));
537
538         if (got_prop)
539             g_value_unset(&val);
540     }
541
542     braced = collapse_braced_alternates(kids);
543     result = g_strdup_printf("rait:%s", braced);
544     g_free(braced);
545
546     return result;
547 }
548
/* Find a workable child block size, based on the block size ranges of our
 * child devices.
 *
 * The algorithm is to construct the intersection of all child devices'
 * [min,max] block size ranges, and then pick the block size closest to 32k
 * that is in the resulting range.  This avoids picking ridiculously small (1
 * byte) or large (INT_MAX) block sizes when using devices with wide-open block
 * size ranges.
 *
 * This function returns the calculated child block size directly, and the RAIT
 * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
 * a single child device, but sets the device's error status and returns 0 if
 * it cannot determine an agreeable block size.
 */
static gsize
calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
{
    /* running intersection of the children's ranges; starts wide open */
    gsize min = 0;
    gsize max = SIZE_MAX;
    /* TRUE once at least one child has contributed a usable range */
    gboolean found_one = FALSE;
    gsize result;
    guint i;

    for (i = 0; i < self->private->children->len; i ++) {
        gsize child_min = SIZE_MAX, child_max = 0;
        Device *child;
        /* NOTE(review): this one GValue is passed to up to three property
         * gets in a row without being unset in between, and is not unset at
         * the end of the iteration -- presumably the getters re-initialize
         * it; confirm. */
        GValue property_result;
        PropertySource source;

        bzero(&property_result, sizeof(property_result));

        /* the failed child takes no part in the calculation */
        if ((signed)i == self->private->failed)
            continue;

        child = g_ptr_array_index(self->private->children, i);
        if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
                                 &property_result, NULL, &source)) {
            g_warning("Error getting BLOCK_SIZE from %s: %s",
                    child->device_name, device_error_or_status(child));
            continue;
        }

        /* if the block size has been set explicitly, then we need to use that blocksize;
         * otherwise (even if it was DETECTED), override it. */
        if (source == PROPERTY_SOURCE_USER) {
            /* a user-set size is treated as a single-point range */
            child_min = child_max = g_value_get_int(&property_result);
        } else {
            if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
                                     &property_result)) {
                g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
                        child->device_name, device_error_or_status(child));
                continue;
            }
            child_min = g_value_get_uint(&property_result);

            if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
                                     &property_result)) {
                g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
                        child->device_name, device_error_or_status(child));
                continue;
            }
            child_max = g_value_get_uint(&property_result);

            /* ignore children reporting an empty or inverted range */
            if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
                g_warning("Invalid min, max block sizes from %s", child->device_name);
                continue;
            }
        }

        /* narrow the intersection with this child's range */
        found_one = TRUE;
        min = MAX(min, child_min);
        max = MIN(max, child_max);
    }

    if (!found_one) {
        device_set_error((Device*)self,
            stralloc(_("Could not find any child devices' block size ranges")),
            DEVICE_STATUS_DEVICE_ERROR);
        return 0;
    }

    /* an empty intersection: the children's ranges do not overlap */
    if (min > max) {
        device_set_error((Device*)self,
            stralloc(_("No block size is acceptable to all child devices")),
            DEVICE_STATUS_DEVICE_ERROR);
        return 0;
    }

    /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
     * the nearest acceptable size. */
    result = CLAMP(32768, min, max);

    if (rait_size) {
        /* the RAIT block size is the child block size multiplied by the
         * data_children value reported by find_simple_params */
        guint data_children;
        find_simple_params(self, NULL, &data_children);
        *rait_size = result * data_children;
    }

    return result;
}
649
650 /* Set BLOCK_SIZE on all children */
651 static gboolean
652 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
653 {
654     GValue val;
655     guint i;
656     PropertySource source;
657
658     bzero(&val, sizeof(val));
659
660     g_assert(child_block_size < INT_MAX);
661     g_value_init(&val, G_TYPE_INT);
662     g_value_set_int(&val, (gint)child_block_size);
663
664     for (i = 0; i < self->private->children->len; i ++) {
665         Device *child;
666         GValue property_result;
667
668         bzero(&property_result, sizeof(property_result));
669
670         if ((signed)i == self->private->failed)
671             continue;
672
673         child = g_ptr_array_index(self->private->children, i);
674
675         /* first, make sure the block size is at its default, or is already
676          * correct */
677         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
678                                  &property_result, NULL, &source)) {
679             gsize from_child = g_value_get_int(&property_result);
680             g_value_unset(&property_result);
681             if (source != PROPERTY_SOURCE_DEFAULT
682                     && from_child != child_block_size) {
683                 device_set_error((Device *)self,
684                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
685                                 child->device_name, from_child, child_block_size),
686                     DEVICE_STATUS_DEVICE_ERROR);
687                 return FALSE;
688             }
689         } else {
690             /* failing to get the block size isn't necessarily fatal.. */
691             g_warning("Error getting BLOCK_SIZE from %s: %s",
692                     child->device_name, device_error_or_status(child));
693         }
694
695         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
696             device_set_error((Device *)self,
697                 vstrallocf(_("Error setting block size on %s"), child->device_name),
698                 DEVICE_STATUS_DEVICE_ERROR);
699             return FALSE;
700         }
701     }
702
703     return TRUE;
704 }
705
706 /* The time for users to specify block sizes has ended; set this device's
707  * block-size attributes for easy access by other RAIT functions.  Returns
708  * FALSE on error, with the device's error status already set. */
709 static gboolean
710 fix_block_size(RaitDevice *self)
711 {
712     Device *dself = (Device *)self;
713     gsize my_block_size, child_block_size;
714
715     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
716         child_block_size = calculate_block_size_from_children(self, &my_block_size);
717         if (child_block_size == 0)
718             return FALSE;
719
720         self->private->child_block_size = child_block_size;
721         dself->block_size = my_block_size;
722         dself->block_size_surety = PROPERTY_SURETY_GOOD;
723         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
724     } else {
725         guint data_children;
726
727         find_simple_params(self, NULL, &data_children);
728         g_assert((dself->block_size % data_children) == 0);
729         child_block_size = dself->block_size / data_children;
730     }
731
732     /* now tell the children we mean it */
733     if (!set_block_size_on_children(self, child_block_size))
734         return FALSE;
735
736     return TRUE;
737 }
738
/* This structure contains common fields for many operations. Not all
   operations use all fields, however.  One GenericOp is allocated per
   participating child (see make_generic_boolean_op_array) and handed to the
   per-child operation function. */
typedef struct {
    gpointer result; /* May be a pointer; may be an integer or boolean
                        stored with GINT_TO_POINTER. */
    Device * child;  /* The device in question. Used by all
                        operations. */
    guint child_index; /* For recoverable operations (read-related
                          operations), this field provides the number
                          of this child in the self->private->children
                          array. */
} GenericOp;
751
/* Callback that reduces one operation record (passed as data) to a boolean
 * success value. */
typedef gboolean (*BooleanExtractor)(gpointer data);
753
754 /* A BooleanExtractor */
755 static gboolean extract_boolean_generic_op(gpointer data) {
756     GenericOp * op = data;
757     return GPOINTER_TO_INT(op->result);
758 }
759
760 /* A BooleanExtractor */
761 static gboolean extract_boolean_pointer_op(gpointer data) {
762     GenericOp * op = data;
763     return op->result != NULL;
764 }
765
766 /* Does the equivalent of this perl command:
767      ! (first { !extractor($_) } @_
768    That is, calls extractor on each element of the array, and returns
769    TRUE if and only if all calls to extractor return TRUE. This function
770    stops as soon as an extractor returns false, so it's best if extractor
771    functions have no side effects.
772 */
773 static gboolean g_ptr_array_and(GPtrArray * array,
774                                 BooleanExtractor extractor) {
775     guint i;
776     if (array == NULL || array->len <= 0)
777         return FALSE;
778
779     for (i = 0; i < array->len; i ++) {
780         if (!extractor(g_ptr_array_index(array, i)))
781             return FALSE;
782     }
783
784     return TRUE;
785 }
786
787 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
788 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
789     GPtrArray * rval;
790     guint i;
791
792     rval = g_ptr_array_sized_new(self->private->children->len);
793     for (i = 0; i < self->private->children->len; i ++) {
794         GenericOp * op;
795
796         if ((signed)i == self->private->failed) {
797             continue;
798         }
799
800         op = g_new(GenericOp, 1);
801         op->child = g_ptr_array_index(self->private->children, i);
802         op->child_index = i;
803         g_ptr_array_add(rval, op);
804     }
805
806     return rval;
807 }
808
809 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
810    all the proper handling for the result of operations that allow
811    device isolation. Returns FALSE only if an unrecoverable error
812    occured. */
813 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
814                                          BooleanExtractor extractor) {
815     int nfailed = 0;
816     int lastfailed = 0;
817     guint i;
818
819     /* We found one or more failed elements.  See which elements failed, and
820      * isolate them*/
821     for (i = 0; i < ops->len; i ++) {
822         GenericOp * op = g_ptr_array_index(ops, i);
823         if (!extractor(op)) {
824             self->private->failed = op->child_index;
825             g_warning("RAIT array %s isolated device %s: %s",
826                     DEVICE(self)->device_name,
827                     op->child->device_name,
828                     device_error(op->child));
829             nfailed++;
830             lastfailed = i;
831         }
832     }
833
834     /* no failures? great! */
835     if (nfailed == 0)
836         return TRUE;
837
838     /* a single failure in COMPLETE just puts us in DEGRADED mode */
839     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
840         self->private->status = RAIT_STATUS_DEGRADED;
841         self->private->failed = lastfailed;
842         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
843         return TRUE;
844     } else {
845         self->private->status = RAIT_STATUS_FAILED;
846         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
847         return FALSE;
848     }
849 }
850
/* Per-child work item used while opening the children of a RAIT device.
 * device_open_do_op() reads device_name (and rait_name for its warning)
 * and stores the opened device, or NULL, in result. */
typedef struct {
    RaitDevice * self;
    char *rait_name;    /* IN: name of the RAIT device, used in warnings */
    char * device_name; /* IN */
    Device * result;    /* OUT */
} OpenDeviceOp;
857
858 /* A GFunc. */
859 static void device_open_do_op(gpointer data,
860                               gpointer user_data G_GNUC_UNUSED) {
861     OpenDeviceOp * op = data;
862
863     if (strcmp(op->device_name, "ERROR") == 0 ||
864         strcmp(op->device_name, "MISSING") == 0 ||
865         strcmp(op->device_name, "DEGRADED") == 0) {
866         g_warning("RAIT device %s contains a missing element, attempting "
867                   "degraded mode.\n", op->rait_name);
868         op->result = NULL;
869     } else {
870         op->result = device_open(op->device_name);
871     }
872 }
873
874 /* Returns TRUE if and only if the volume label and time are equal. */
875 static gboolean compare_volume_results(Device * a, Device * b) {
876     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
877          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
878 }
879
880 /* Stickes new_message at the end of *old_message; frees new_message and
881  * may change *old_message. */
882 static void append_message(char ** old_message, char * new_message) {
883     char * rval;
884     if (*old_message == NULL || **old_message == '\0') {
885         rval = new_message;
886     } else {
887         rval = g_strdup_printf("%s; %s", *old_message, new_message);
888         amfree(new_message);
889     }
890     amfree(*old_message);
891     *old_message = rval;
892 }
893
894 static gboolean
895 open_child_devices (Device * dself, char * device_name,
896             char * device_node) {
897     GPtrArray *device_names;
898     GPtrArray * device_open_ops;
899     guint i;
900     gboolean failure;
901     char *failure_errmsgs;
902     DeviceStatusFlags failure_flags;
903     RaitDevice * self;
904
905     self = RAIT_DEVICE(dself);
906
907     device_names = expand_braced_alternates(device_node);
908
909     if (device_names == NULL) {
910         device_set_error(dself,
911             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
912             DEVICE_STATUS_DEVICE_ERROR);
913         return FALSE;
914     }
915
916     /* Open devices in a separate thread, in case they have to rewind etc. */
917     device_open_ops = g_ptr_array_new();
918
919     for (i = 0; i < device_names->len; i++) {
920         OpenDeviceOp *op;
921         char *name = g_ptr_array_index(device_names, i);
922
923         op = g_new(OpenDeviceOp, 1);
924         op->device_name = name;
925         op->result = NULL;
926         op->self = self;
927         op->rait_name = device_name;
928         g_ptr_array_add(device_open_ops, op);
929     }
930
931     g_ptr_array_free(device_names, TRUE);
932     do_rait_child_ops(self, device_open_do_op, device_open_ops);
933
934     failure = FALSE;
935     failure_errmsgs = NULL;
936     failure_flags = 0;
937
938     /* Check results of opening devices. */
939     for (i = 0; i < device_open_ops->len; i ++) {
940         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
941
942         if (op->result != NULL &&
943             op->result->status == DEVICE_STATUS_SUCCESS) {
944             g_ptr_array_add(self->private->children, op->result);
945         } else {
946             char * this_failure_errmsg =
947                 g_strdup_printf("%s: %s", op->device_name,
948                                 device_error_or_status(op->result));
949             DeviceStatusFlags status =
950                 op->result == NULL ?
951                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
952             append_message(&failure_errmsgs,
953                            strdup(this_failure_errmsg));
954             failure_flags |= status;
955             if (self->private->status == RAIT_STATUS_COMPLETE) {
956                 /* The first failure just puts us in degraded mode. */
957                 g_warning("%s: %s",
958                           device_name, this_failure_errmsg);
959                 g_warning("%s: %s failed, entering degraded mode.",
960                           device_name, op->device_name);
961                 g_ptr_array_add(self->private->children, op->result);
962                 self->private->status = RAIT_STATUS_DEGRADED;
963                 self->private->failed = i;
964             } else {
965                 /* The second and further failures are fatal. */
966                 failure = TRUE;
967             }
968         }
969         amfree(op->device_name);
970     }
971
972     g_ptr_array_free_full(device_open_ops);
973
974     if (failure) {
975         self->private->status = RAIT_STATUS_FAILED;
976         device_set_error(dself, failure_errmsgs, failure_flags);
977         return FALSE;
978     }
979
980     return TRUE;
981 }
982
983 static void
984 rait_device_open_device (Device * dself, char * device_name,
985             char * device_type G_GNUC_UNUSED, char * device_node) {
986
987     if (0 != strcmp(device_node, DEFER_CHILDREN_SENTINEL)) {
988         if (!open_child_devices(dself, device_name, device_node))
989             return;
990
991         /* Chain up. */
992         if (parent_class->open_device) {
993             parent_class->open_device(dself, device_name, device_type, device_node);
994         }
995     }
996 }
997
998 Device *
999 rait_device_open_from_children (GSList *child_devices) {
1000     Device *dself;
1001     RaitDevice *self;
1002     GSList *iter;
1003     char *device_name;
1004     int nfailures;
1005     int i;
1006
1007     /* first, open a RAIT device using the DEFER_CHILDREN_SENTINEL */
1008     dself = device_open("rait:" DEFER_CHILDREN_SENTINEL);
1009     if (!IS_RAIT_DEVICE(dself)) {
1010         return dself;
1011     }
1012
1013     /* set its children */
1014     self = RAIT_DEVICE(dself);
1015     nfailures = 0;
1016     for (i=0, iter = child_devices; iter; i++, iter = iter->next) {
1017         Device *kid = iter->data;
1018
1019         /* a NULL kid is OK -- it opens the device in degraded mode */
1020         if (!kid) {
1021             nfailures++;
1022             self->private->failed = i;
1023         } else {
1024             g_assert(IS_DEVICE(kid));
1025             g_object_ref((GObject *)kid);
1026         }
1027
1028         g_ptr_array_add(self->private->children, kid);
1029     }
1030
1031     /* and set the status based on the children */
1032     switch (nfailures) {
1033         case 0:
1034             self->private->status = RAIT_STATUS_COMPLETE;
1035             break;
1036
1037         case 1:
1038             self->private->status = RAIT_STATUS_DEGRADED;
1039             break;
1040
1041         default:
1042             self->private->status = RAIT_STATUS_FAILED;
1043             device_set_error(dself,
1044                     _("more than one child device is missing"),
1045                     DEVICE_STATUS_DEVICE_ERROR);
1046             break;
1047     }
1048
1049     /* create a name from the children's names and use it to chain up
1050      * to open_device (we skipped this step in rait_device_open_device) */
1051     device_name = child_device_names_to_rait_name(self);
1052
1053     if (parent_class->open_device) {
1054         parent_class->open_device(dself,
1055             device_name, "rait",
1056             device_name+5); /* (+5 skips "rait:") */
1057     }
1058
1059     return dself;
1060 }
1061
1062 /* A GFunc. */
1063 static void read_label_do_op(gpointer data,
1064                              gpointer user_data G_GNUC_UNUSED) {
1065     GenericOp * op = data;
1066     op->result = GINT_TO_POINTER(device_read_label(op->child));
1067 }
1068
1069 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1070     RaitDevice * self;
1071     GPtrArray * ops;
1072     DeviceStatusFlags failed_result = 0;
1073     char *failed_errmsg = NULL;
1074     unsigned int i;
1075     Device * first_success = NULL;
1076
1077     self = RAIT_DEVICE(dself);
1078
1079     amfree(dself->volume_time);
1080     amfree(dself->volume_label);
1081     dumpfile_free(dself->volume_header);
1082     dself->volume_header = NULL;
1083
1084     if (rait_device_in_error(self))
1085         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1086
1087     /* nail down our block size, if we haven't already */
1088     if (!fix_block_size(self))
1089         return FALSE;
1090
1091     ops = make_generic_boolean_op_array(self);
1092
1093     do_rait_child_ops(self, read_label_do_op, ops);
1094
1095     for (i = 0; i < ops->len; i ++) {
1096         GenericOp * op = g_ptr_array_index(ops, i);
1097         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1098         if (op->result == DEVICE_STATUS_SUCCESS) {
1099             if (first_success == NULL) {
1100                 /* This is the first successful device. */
1101                 first_success = op->child;
1102             } else if (!compare_volume_results(first_success, op->child)) {
1103                 /* Doesn't match. :-( */
1104                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1105                         "Got %s/%s on %s against %s/%s on %s.",
1106                         first_success->volume_label,
1107                         first_success->volume_time,
1108                         first_success->device_name,
1109                         op->child->volume_label,
1110                         op->child->volume_time,
1111                         op->child->device_name);
1112                 g_warning("%s", failed_errmsg);
1113                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1114             }
1115         } else {
1116             failed_result |= result;
1117         }
1118     }
1119
1120     if (failed_result != DEVICE_STATUS_SUCCESS) {
1121         /* We had multiple failures or an inconsistency. */
1122         device_set_error(dself, failed_errmsg, failed_result);
1123     } else {
1124         /* Everything peachy. */
1125         amfree(failed_errmsg);
1126
1127         g_assert(first_success != NULL);
1128         if (first_success->volume_label != NULL) {
1129             dself->volume_label = g_strdup(first_success->volume_label);
1130         }
1131         if (first_success->volume_time != NULL) {
1132             dself->volume_time = g_strdup(first_success->volume_time);
1133         }
1134         if (first_success->volume_header != NULL) {
1135             dself->volume_header = dumpfile_copy(first_success->volume_header);
1136         }
1137         dself->header_block_size = first_success->header_block_size;
1138     }
1139
1140     g_ptr_array_free_full(ops);
1141
1142     return dself->status;
1143 }
1144
/* Per-child work item for start_do_op(): the generic child/result pair
 * plus the arguments forwarded to the child's start method. */
typedef struct {
    GenericOp base;
    DeviceAccessMode mode; /* IN */
    char * label;          /* IN */
    char * timestamp;      /* IN */
} StartOp;
1151
1152 /* A GFunc. */
1153 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1154     DeviceClass *klass;
1155     StartOp * param = data;
1156
1157     klass = DEVICE_GET_CLASS(param->base.child);
1158     if (klass->start) {
1159         param->base.result =
1160             GINT_TO_POINTER((klass->start)(param->base.child,
1161                                             param->mode, param->label,
1162                                             param->timestamp));
1163     } else {
1164         param->base.result = FALSE;
1165     }
1166 }
1167
1168 static gboolean
1169 rait_device_configure(Device * dself, gboolean use_global_config)
1170 {
1171     RaitDevice *self = RAIT_DEVICE(dself);
1172     guint i;
1173
1174     for (i = 0; i < self->private->children->len; i ++) {
1175         Device *child;
1176
1177         if ((signed)i == self->private->failed)
1178             continue;
1179
1180         child = g_ptr_array_index(self->private->children, i);
1181         /* unconditionally configure the child without the global
1182          * configuration */
1183         if (!device_configure(child, FALSE))
1184             return FALSE;
1185     }
1186
1187     if (parent_class->configure) {
1188         return parent_class->configure(dself, use_global_config);
1189     }
1190
1191     return TRUE;
1192 }
1193
1194 static gboolean
1195 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1196                    char * timestamp) {
1197     GPtrArray * ops;
1198     guint i;
1199     gboolean success;
1200     RaitDevice * self;
1201     DeviceStatusFlags total_status;
1202     char *failure_errmsgs = NULL;
1203     char * label_from_device = NULL;
1204
1205     self = RAIT_DEVICE(dself);
1206
1207     if (rait_device_in_error(self)) return FALSE;
1208
1209     /* No starting in degraded mode. */
1210     if (self->private->status != RAIT_STATUS_COMPLETE &&
1211         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1212         device_set_error(dself,
1213                          g_strdup_printf(_("RAIT device %s is read-only "
1214                                            "because it is in degraded mode.\n"),
1215                                          dself->device_name),
1216                          DEVICE_STATUS_DEVICE_ERROR);
1217         return FALSE;
1218     }
1219
1220     /* nail down our block size, if we haven't already */
1221     if (!fix_block_size(self))
1222         return FALSE;
1223
1224     dself->access_mode = mode;
1225     g_mutex_lock(dself->device_mutex);
1226     dself->in_file = FALSE;
1227     g_mutex_unlock(dself->device_mutex);
1228     amfree(dself->volume_label);
1229     amfree(dself->volume_time);
1230     dumpfile_free(dself->volume_header);
1231     dself->volume_header = NULL;
1232
1233     ops = g_ptr_array_sized_new(self->private->children->len);
1234     for (i = 0; i < self->private->children->len; i ++) {
1235         StartOp * op;
1236
1237         if ((signed)i == self->private->failed) {
1238             continue;
1239         }
1240
1241         op = g_new(StartOp, 1);
1242         op->base.child = g_ptr_array_index(self->private->children, i);
1243         op->mode = mode;
1244         op->label = g_strdup(label);
1245         op->timestamp = g_strdup(timestamp);
1246         g_ptr_array_add(ops, op);
1247     }
1248
1249     do_rait_child_ops(self, start_do_op, ops);
1250
1251     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1252
1253     /* Check results of starting devices; this is mostly about the
1254      * VOLUME_UNLABELED flag. */
1255     total_status = 0;
1256     for (i = 0; i < ops->len; i ++) {
1257         StartOp * op = g_ptr_array_index(ops, i);
1258         Device *child = op->base.child;
1259
1260         total_status |= child->status;
1261         if (child->status != DEVICE_STATUS_SUCCESS) {
1262             /* record the error message and move on. */
1263             append_message(&failure_errmsgs,
1264                            g_strdup_printf("%s: %s",
1265                                            child->device_name,
1266                                            device_error_or_status(child)));
1267         } else {
1268             if (child->volume_label != NULL && child->volume_time != NULL) {
1269                 if (label_from_device) {
1270                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1271                         strcmp(child->volume_time, dself->volume_time) != 0) {
1272                         /* Mismatch! (Two devices provided different labels) */
1273                         char * this_message =
1274                             g_strdup_printf("%s: Label (%s/%s) is different "
1275                                             "from label (%s/%s) found at "
1276                                             "device %s",
1277                                             child->device_name,
1278                                             child->volume_label,
1279                                             child->volume_time,
1280                                             dself->volume_label,
1281                                             dself->volume_time,
1282                                             label_from_device);
1283                         append_message(&failure_errmsgs, this_message);
1284                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1285                         g_warning("RAIT device children have different labels or timestamps");
1286                     }
1287                 } else {
1288                     /* First device with a volume. */
1289                     dself->volume_label = g_strdup(child->volume_label);
1290                     dself->volume_time = g_strdup(child->volume_time);
1291                     dself->volume_header = dumpfile_copy(child->volume_header);
1292                     label_from_device = g_strdup(child->device_name);
1293                 }
1294             } else {
1295                 /* Device problem, it says it succeeded but sets no label? */
1296                 char * this_message =
1297                     g_strdup_printf("%s: Says label read, but no volume "
1298                                      "label found.", child->device_name);
1299                 g_warning("RAIT device child has NULL volume or label");
1300                 append_message(&failure_errmsgs, this_message);
1301                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1302             }
1303         }
1304     }
1305
1306     if (total_status == DEVICE_STATUS_SUCCESS) {
1307         StartOp * op = g_ptr_array_index(ops, 0);
1308         Device *child = op->base.child;
1309         dself->header_block_size = child->header_block_size;
1310     }
1311
1312     amfree(label_from_device);
1313     g_ptr_array_free_full(ops);
1314
1315     dself->status = total_status;
1316
1317     if (total_status != DEVICE_STATUS_SUCCESS || !success) {
1318         device_set_error(dself, failure_errmsgs, total_status);
1319         return FALSE;
1320     }
1321     amfree(failure_errmsgs);
1322     return TRUE;
1323 }
1324
/* Per-child work item for start_file_do_op(). */
typedef struct {
    GenericOp base;
    dumpfile_t * info; /* IN: header handed to device_start_file() */
    int fileno;        /* OUT: the child's file number after the call */
} StartFileOp;
1330
1331 /* a GFunc */
1332 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1333     StartFileOp * op = data;
1334     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1335                                                         op->info));
1336     op->fileno = op->base.child->file;
1337     if (op->fileno < 1) {
1338         op->base.result = FALSE;
1339     }
1340 }
1341
1342 static gboolean
1343 rait_device_start_file (Device * dself, dumpfile_t * info) {
1344     GPtrArray * ops;
1345     guint i;
1346     gboolean success;
1347     RaitDevice * self;
1348     int actual_file = -1;
1349
1350     self = RAIT_DEVICE(dself);
1351
1352     if (rait_device_in_error(self)) return FALSE;
1353     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1354
1355     ops = g_ptr_array_sized_new(self->private->children->len);
1356     for (i = 0; i < self->private->children->len; i ++) {
1357         StartFileOp * op;
1358         op = g_new(StartFileOp, 1);
1359         op->base.child = g_ptr_array_index(self->private->children, i);
1360         /* each child gets its own copy of the header, to munge as it
1361          * likes (setting blocksize, at least) */
1362         op->info = dumpfile_copy(info);
1363         g_ptr_array_add(ops, op);
1364     }
1365
1366     do_rait_child_ops(self, start_file_do_op, ops);
1367
1368     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1369
1370     for (i = 0; i < self->private->children->len && success; i ++) {
1371         StartFileOp * op = g_ptr_array_index(ops, i);
1372         if (!op->base.result)
1373             continue;
1374         g_assert(op->fileno >= 1);
1375         if (actual_file < 1) {
1376             actual_file = op->fileno;
1377         }
1378         if (actual_file != op->fileno) {
1379             /* File number mismatch! Aah, my hair is on fire! */
1380             device_set_error(dself,
1381                              g_strdup_printf("File number mismatch in "
1382                                              "rait_device_start_file(): "
1383                                              "Child %s reported file number "
1384                                              "%d, another child reported "
1385                                              "file number %d.",
1386                                              op->base.child->device_name,
1387                                              op->fileno, actual_file),
1388                              DEVICE_STATUS_DEVICE_ERROR);
1389             success = FALSE;
1390             op->base.result = FALSE;
1391         }
1392     }
1393
1394     for (i = 0; i < ops->len && success; i ++) {
1395         StartFileOp * op = g_ptr_array_index(ops, i);
1396         if (op->info) dumpfile_free(op->info);
1397     }
1398     g_ptr_array_free_full(ops);
1399
1400     if (!success) {
1401         if (!device_in_error(dself)) {
1402             device_set_error(dself, stralloc("One or more devices "
1403                                              "failed to start_file"),
1404                              DEVICE_STATUS_DEVICE_ERROR);
1405         }
1406         return FALSE;
1407     }
1408
1409     g_assert(actual_file >= 1);
1410     dself->file = actual_file;
1411     g_mutex_lock(dself->device_mutex);
1412     dself->in_file = TRUE;
1413     dself->bytes_written = 0;
1414     g_mutex_unlock(dself->device_mutex);
1415
1416     return TRUE;
1417 }
1418
1419 static void find_simple_params(RaitDevice * self,
1420                                guint * num_children,
1421                                guint * data_children) {
1422     int num, data;
1423
1424     num = self->private->children->len;
1425     if (num > 1)
1426         data = num - 1;
1427     else
1428         data = num;
1429     if (num_children != NULL)
1430         *num_children = num;
1431     if (data_children != NULL)
1432         *data_children = data;
1433 }
1434
/* Per-child work item for write_block_do_op(). */
typedef struct {
    GenericOp base;
    guint size;           /* IN: number of bytes to write to this child */
    gpointer data;        /* IN: buffer passed to device_write_block() */
    gboolean data_needs_free; /* bookkeeping: TRUE when data was allocated
                               * for this op and must be freed afterwards */
} WriteBlockOp;
1441
1442 /* a GFunc. */
1443 static void write_block_do_op(gpointer data,
1444                               gpointer user_data G_GNUC_UNUSED) {
1445     WriteBlockOp * op = data;
1446
1447     op->base.result =
1448         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1449 }
1450
/* Parity block generation: XORs the data chunks together, byte by byte,
   into the parity buffer.  Performance of this function can be improved
   considerably by using larger-sized integers or assembly-coded vector
   instructions.  Parameters are:
   % data       - All data chunks in series
                  (chunk_size * (num_chunks - 1) bytes are read)
   % parity     - Allocated space for parity block (chunk_size bytes)
   % chunk_size - Size of one chunk in bytes
   % num_chunks - Number of chunks including the parity chunk, so
                  num_chunks - 1 data chunks are combined
 */
static void make_parity_block(char * data, char * parity,
                              guint chunk_size, guint num_chunks) {
    guint chunk;

    bzero(parity, chunk_size);
    for (chunk = 0; chunk < num_chunks - 1; chunk ++) {
        const char * src = data + chunk_size * chunk;
        guint off;

        for (off = 0; off < chunk_size; off ++)
            parity[off] ^= src[off];
    }
}
1468
1469 /* Does the same thing as make_parity_block, but instead of using a
1470    single memory chunk holding all chunks, it takes a GPtrArray of
1471    chunks. */
1472 static void make_parity_block_extents(GPtrArray * data, char * parity,
1473                                       guint chunk_size) {
1474     guint i;
1475     bzero(parity, chunk_size);
1476     for (i = 0; i < data->len; i ++) {
1477         guint j;
1478         char * data_chunk;
1479         data_chunk = g_ptr_array_index(data, i);
1480         for (j = 0; j < chunk_size; j ++) {
1481             parity[j] ^= data_chunk[j];
1482         }
1483     }
1484 }
1485
1486 /* Does the parity creation algorithm. Allocates and returns a single
1487    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1488 static char * extract_data_block(char * data, guint size,
1489                                  guint chunks, guint chunk) {
1490     char * rval;
1491     guint chunk_size;
1492
1493     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1494     g_assert(data != NULL);
1495     g_assert(size > 0 && size % (chunks - 1) == 0);
1496
1497     chunk_size = size / (chunks - 1);
1498     rval = g_malloc(chunk_size);
1499     if (chunks != chunk) {
1500         /* data block. */
1501         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1502     } else {
1503         make_parity_block(data, rval, chunk_size, chunks);
1504     }
1505
1506     return rval;
1507 }
1508
1509 static gboolean
1510 rait_device_write_block (Device * dself, guint size, gpointer data) {
1511     GPtrArray * ops;
1512     guint i;
1513     gboolean success;
1514     guint data_children, num_children;
1515     gsize blocksize = dself->block_size;
1516     RaitDevice * self;
1517     gboolean last_block = (size < blocksize);
1518
1519     self = RAIT_DEVICE(dself);
1520
1521     if (rait_device_in_error(self)) return FALSE;
1522     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1523
1524     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1525     num_children = self->private->children->len;
1526     if (num_children != 1)
1527         data_children = num_children - 1;
1528     else
1529         data_children = num_children;
1530
1531     g_assert(size % data_children == 0 || last_block);
1532
1533     /* zero out to the end of a short block -- tape devices only write
1534      * whole blocks. */
1535     if (last_block) {
1536         char *new_data;
1537
1538         new_data = g_malloc(blocksize);
1539         memcpy(new_data, data, size);
1540         bzero(new_data + size, blocksize - size);
1541
1542         data = new_data;
1543         size = blocksize;
1544     }
1545
1546     ops = g_ptr_array_sized_new(num_children);
1547     for (i = 0; i < self->private->children->len; i ++) {
1548         WriteBlockOp * op;
1549         op = g_malloc(sizeof(*op));
1550         op->base.child = g_ptr_array_index(self->private->children, i);
1551         op->size = size / data_children;
1552         if (num_children <= 2) {
1553             op->data = data;
1554             op->data_needs_free = FALSE;
1555         } else {
1556             op->data_needs_free = TRUE;
1557             op->data = extract_data_block(data, size, num_children, i + 1);
1558         }
1559         g_ptr_array_add(ops, op);
1560     }
1561
1562     do_rait_child_ops(self, write_block_do_op, ops);
1563
1564     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1565
1566     for (i = 0; i < self->private->children->len; i ++) {
1567         WriteBlockOp * op = g_ptr_array_index(ops, i);
1568         if (op->data_needs_free)
1569             free(op->data);
1570     }
1571
1572     if (last_block) {
1573         amfree(data);
1574     }
1575
1576     g_ptr_array_free_full(ops);
1577
1578     if (!success) {
1579         /* TODO be more specific here */
1580         /* TODO: handle EOM here -- if one or more (or two or more??)
1581          * children have is_eom set, then reflect that in our error
1582          * status. What's more fun is when one device fails and must be isolated at
1583          * the same time another hits EOF. */
1584         device_set_error(dself,
1585             stralloc("One or more devices failed to write_block"),
1586             DEVICE_STATUS_DEVICE_ERROR);
1587         /* this is EOM or an error, so call it EOM */
1588         dself->is_eom = TRUE;
1589         return FALSE;
1590     } else {
1591         dself->block ++;
1592         g_mutex_lock(dself->device_mutex);
1593         dself->bytes_written += size;
1594         g_mutex_unlock(dself->device_mutex);
1595
1596         return TRUE;
1597     }
1598 }
1599
1600 /* A GFunc */
1601 static void finish_file_do_op(gpointer data,
1602                               gpointer user_data G_GNUC_UNUSED) {
1603     GenericOp * op = data;
1604     if (op->child) {
1605         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1606     } else {
1607         op->result = FALSE;
1608     }
1609 }
1610
1611 static gboolean
1612 rait_device_finish_file (Device * dself) {
1613     GPtrArray * ops;
1614     gboolean success;
1615     RaitDevice * self = RAIT_DEVICE(dself);
1616
1617     g_assert(self != NULL);
1618     if (rait_device_in_error(dself)) return FALSE;
1619     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1620
1621     ops = make_generic_boolean_op_array(self);
1622
1623     do_rait_child_ops(self, finish_file_do_op, ops);
1624
1625     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1626
1627     g_ptr_array_free_full(ops);
1628
1629     if (!success) {
1630         /* TODO: be more specific here */
1631         device_set_error(dself,
1632                          g_strdup("One or more devices failed to finish_file"),
1633             DEVICE_STATUS_DEVICE_ERROR);
1634         return FALSE;
1635     }
1636
1637     g_mutex_lock(dself->device_mutex);
1638     dself->in_file = FALSE;
1639     g_mutex_unlock(dself->device_mutex);
1640     return TRUE;
1641 }
1642
/* Per-child work item for seek_file_do_op().  The header returned by the
 * child's seek ends up in base.result. */
typedef struct {
    GenericOp base;
    guint requested_file;                /* IN: file number to seek to */
    guint actual_file;                   /* OUT: child's file number after
                                          * the seek */
} SeekFileOp;
1648
1649 /* a GFunc. */
1650 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1651     SeekFileOp * op = data;
1652     op->base.result = device_seek_file(op->base.child, op->requested_file);
1653     op->actual_file = op->base.child->file;
1654 }
1655
1656 static dumpfile_t *
1657 rait_device_seek_file (Device * dself, guint file) {
1658     GPtrArray * ops;
1659     guint i;
1660     gboolean success;
1661     dumpfile_t * rval;
1662     RaitDevice * self = RAIT_DEVICE(dself);
1663     guint actual_file = 0;
1664     gboolean in_file = FALSE;
1665
1666     if (rait_device_in_error(self)) return NULL;
1667
1668     dself->is_eof = FALSE;
1669     dself->block = 0;
1670     g_mutex_lock(dself->device_mutex);
1671     dself->in_file = FALSE;
1672     dself->bytes_read = 0;
1673     g_mutex_unlock(dself->device_mutex);
1674
1675     ops = g_ptr_array_sized_new(self->private->children->len);
1676     for (i = 0; i < self->private->children->len; i ++) {
1677         SeekFileOp * op;
1678         if ((int)i == self->private->failed)
1679             continue; /* This device is broken. */
1680         op = g_new(SeekFileOp, 1);
1681         op->base.child = g_ptr_array_index(self->private->children, i);
1682         op->base.child_index = i;
1683         op->requested_file = file;
1684         g_ptr_array_add(ops, op);
1685     }
1686
1687     do_rait_child_ops(self, seek_file_do_op, ops);
1688
1689     /* This checks for NULL values, but we still have to check for
1690        consistant headers. */
1691     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1692                                        ops, extract_boolean_pointer_op);
1693
1694     rval = NULL;
1695     for (i = 0; i < ops->len; i ++) {
1696         SeekFileOp * this_op;
1697         dumpfile_t * this_result;
1698         guint this_actual_file;
1699         gboolean this_in_file;
1700
1701         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1702
1703         if ((signed)this_op->base.child_index == self->private->failed)
1704             continue;
1705
1706         this_result = this_op->base.result;
1707         this_actual_file = this_op->actual_file;
1708         this_in_file = this_op->base.child->in_file;
1709
1710         if (rval == NULL) {
1711             rval = this_result;
1712             actual_file = this_actual_file;
1713             in_file = this_in_file;
1714         } else {
1715             if (headers_are_equal(rval, this_result) &&
1716                 actual_file == this_actual_file &&
1717                 in_file == this_in_file) {
1718                 /* Do nothing. */
1719             } else {
1720                 success = FALSE;
1721             }
1722             free(this_result);
1723         }
1724     }
1725
1726     g_ptr_array_free_full(ops);
1727
1728     if (!success) {
1729         amfree(rval);
1730         /* TODO: be more specific here */
1731         device_set_error(dself,
1732                          g_strdup("One or more devices failed to seek_file"),
1733             DEVICE_STATUS_DEVICE_ERROR);
1734         return NULL;
1735     }
1736
1737     /* update our state */
1738     g_mutex_lock(dself->device_mutex);
1739     dself->in_file = in_file;
1740     g_mutex_unlock(dself->device_mutex);
1741     dself->file = actual_file;
1742
1743     return rval;
1744 }
1745
1746 typedef struct {
1747     GenericOp base;
1748     guint64 block; /* IN */
1749 } SeekBlockOp;
1750
1751 /* a GFunc. */
1752 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1753     SeekBlockOp * op = data;
1754     op->base.result =
1755         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1756 }
1757
1758 static gboolean
1759 rait_device_seek_block (Device * dself, guint64 block) {
1760     GPtrArray * ops;
1761     guint i;
1762     gboolean success;
1763
1764     RaitDevice * self = RAIT_DEVICE(dself);
1765
1766     if (rait_device_in_error(self)) return FALSE;
1767
1768     ops = g_ptr_array_sized_new(self->private->children->len);
1769     for (i = 0; i < self->private->children->len; i ++) {
1770         SeekBlockOp * op;
1771         if ((int)i == self->private->failed)
1772             continue; /* This device is broken. */
1773         op = g_new(SeekBlockOp, 1);
1774         op->base.child = g_ptr_array_index(self->private->children, i);
1775         op->base.child_index = i;
1776         op->block = block;
1777         g_ptr_array_add(ops, op);
1778     }
1779
1780     do_rait_child_ops(self, seek_block_do_op, ops);
1781
1782     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1783                                        ops, extract_boolean_generic_op);
1784
1785     g_ptr_array_free_full(ops);
1786
1787     if (!success) {
1788         /* TODO: be more specific here */
1789         device_set_error(dself,
1790             stralloc("One or more devices failed to seek_block"),
1791             DEVICE_STATUS_DEVICE_ERROR);
1792         return FALSE;
1793     }
1794
1795     dself->block = block;
1796     return TRUE;
1797 }
1798
1799 typedef struct {
1800     GenericOp base;
1801     gpointer buffer; /* IN */
1802     int read_size;      /* IN/OUT -- note not a pointer */
1803     int desired_read_size; /* bookkeeping */
1804 } ReadBlockOp;
1805
1806 /* a GFunc. */
1807 static void read_block_do_op(gpointer data,
1808                              gpointer user_data G_GNUC_UNUSED) {
1809     ReadBlockOp * op = data;
1810     op->base.result =
1811         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1812                                           &(op->read_size)));
1813     if (op->read_size > op->desired_read_size) {
1814         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1815                   op->base.child->device_name);
1816     }
1817 }
1818
1819 /* A BooleanExtractor. This one checks for a successful read. */
1820 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1821     ReadBlockOp * op = data;
1822     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1823 }
1824
1825 /* A BooleanExtractor. This one checks for EOF. */
1826 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1827     ReadBlockOp * op = data;
1828     return op->base.child->is_eof;
1829 }
1830
1831 /* Counts the number of elements in an array matching a given proposition. */
1832 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1833     int rval;
1834     unsigned int i;
1835     rval = 0;
1836     for (i = 0; i < array->len ; i++) {
1837         if (filter(g_ptr_array_index(array, i)))
1838             rval ++;
1839     }
1840     return rval;
1841 }
1842
1843 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1844                                       gpointer buf, size_t bufsize) {
1845     guint num_children, data_children;
1846     gsize blocksize;
1847     gsize child_blocksize;
1848     guint i;
1849     int parity_child;
1850     gpointer parity_block = NULL;
1851     gboolean success;
1852
1853     success = TRUE;
1854
1855     blocksize = DEVICE(self)->block_size;
1856     find_simple_params(self, &num_children, &data_children);
1857
1858     if (num_children > 1)
1859         parity_child = num_children - 1;
1860     else
1861         parity_child = -1;
1862
1863     child_blocksize = blocksize / data_children;
1864
1865     for (i = 0; i < ops->len; i ++) {
1866         ReadBlockOp * op = g_ptr_array_index(ops, i);
1867         if (!extract_boolean_read_block_op_data(op))
1868             continue;
1869         if ((int)(op->base.child_index) == parity_child) {
1870             parity_block = op->buffer;
1871         } else {
1872             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1873             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1874                    child_blocksize);
1875         }
1876     }
1877     if (self->private->status == RAIT_STATUS_COMPLETE) {
1878         g_assert(parity_block != NULL); /* should have found parity_child */
1879
1880         if (num_children >= 2) {
1881             /* Verify the parity block. This code is inefficient but
1882                does the job for the 2-device case, too. */
1883             gpointer constructed_parity;
1884             GPtrArray * data_extents;
1885
1886             constructed_parity = g_malloc(child_blocksize);
1887             data_extents = g_ptr_array_sized_new(data_children);
1888             for (i = 0; i < data_children; i ++) {
1889                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1890                 g_assert(extract_boolean_read_block_op_data(op));
1891                 if ((int)op->base.child_index == parity_child)
1892                     continue;
1893                 g_ptr_array_add(data_extents, op->buffer);
1894             }
1895             make_parity_block_extents(data_extents, constructed_parity,
1896                                       child_blocksize);
1897
1898             if (0 != memcmp(parity_block, constructed_parity,
1899                             child_blocksize)) {
1900                 device_set_error(DEVICE(self),
1901                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1902                     DEVICE_STATUS_DEVICE_ERROR);
1903                 /* TODO: can't we just isolate the device in this case? */
1904                 success = FALSE;
1905             }
1906             g_ptr_array_free(data_extents, TRUE);
1907             amfree(constructed_parity);
1908         } else { /* do nothing. */ }
1909     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1910         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1911         /* We are in degraded mode. What's missing? */
1912         if (self->private->failed == parity_child) {
1913             /* do nothing. */
1914         } else if (num_children >= 2) {
1915             /* Reconstruct failed block from parity block. */
1916             GPtrArray * data_extents = g_ptr_array_new();
1917
1918             for (i = 0; i < data_children; i ++) {
1919                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1920                 if (!extract_boolean_read_block_op_data(op))
1921                     continue;
1922                 g_ptr_array_add(data_extents, op->buffer);
1923             }
1924
1925             /* Conveniently, the reconstruction is the same procedure
1926                as the parity generation. This even works if there is
1927                only one remaining device! */
1928             make_parity_block_extents(data_extents,
1929                                       (char *)buf + (child_blocksize *
1930                                              self->private->failed),
1931                                       child_blocksize);
1932
1933             /* The array members belong to our ops argument. */
1934             g_ptr_array_free(data_extents, TRUE);
1935         } else {
1936             g_assert_not_reached();
1937         }
1938     } else {
1939         /* device is already in FAILED state -- we shouldn't even be here */
1940         success = FALSE;
1941     }
1942     return success;
1943 }
1944
1945 static int
1946 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1947     GPtrArray * ops;
1948     guint i;
1949     gboolean success;
1950     guint num_children, data_children;
1951     gsize blocksize = dself->block_size;
1952     gsize child_blocksize;
1953
1954     RaitDevice * self = RAIT_DEVICE(dself);
1955
1956     if (rait_device_in_error(self)) return -1;
1957
1958     find_simple_params(self, &num_children, &data_children);
1959
1960     /* tell caller they haven't given us a big enough buffer */
1961     if (blocksize > (gsize)*size) {
1962         g_assert(blocksize < INT_MAX);
1963         *size = (int)blocksize;
1964         return 0;
1965     }
1966
1967     g_assert(blocksize % data_children == 0); /* see find_block_size */
1968     child_blocksize = blocksize / data_children;
1969
1970     ops = g_ptr_array_sized_new(num_children);
1971     for (i = 0; i < num_children; i ++) {
1972         ReadBlockOp * op;
1973         if ((int)i == self->private->failed)
1974             continue; /* This device is broken. */
1975         op = g_new(ReadBlockOp, 1);
1976         op->base.child = g_ptr_array_index(self->private->children, i);
1977         op->base.child_index = i;
1978         op->buffer = g_malloc(child_blocksize);
1979         op->desired_read_size = op->read_size = child_blocksize;
1980         g_ptr_array_add(ops, op);
1981     }
1982
1983     do_rait_child_ops(self, read_block_do_op, ops);
1984
1985     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1986         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1987                                      ops,
1988                                      extract_boolean_read_block_op_data)) {
1989             /* TODO: be more specific */
1990             device_set_error(dself,
1991                 stralloc(_("Error occurred combining blocks from child devices")),
1992                 DEVICE_STATUS_DEVICE_ERROR);
1993             success = FALSE;
1994         } else {
1995             /* raid_block_reconstruction sets the error status if necessary */
1996             success = raid_block_reconstruction(RAIT_DEVICE(self),
1997                                                 ops, buf, (size_t)*size);
1998         }
1999     } else {
2000         success = FALSE;
2001         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
2002                                      ops,
2003                                      extract_boolean_read_block_op_eof)) {
2004             device_set_error(dself,
2005                 stralloc(_("EOF")),
2006                 DEVICE_STATUS_SUCCESS);
2007             dself->is_eof = TRUE;
2008             g_mutex_lock(dself->device_mutex);
2009             dself->in_file = FALSE;
2010             g_mutex_unlock(dself->device_mutex);
2011         } else {
2012             device_set_error(dself,
2013                 stralloc(_("All child devices failed to read, but not all are at eof")),
2014                 DEVICE_STATUS_DEVICE_ERROR);
2015         }
2016     }
2017
2018     for (i = 0; i < ops->len; i ++) {
2019         ReadBlockOp * op = g_ptr_array_index(ops, i);
2020         amfree(op->buffer);
2021     }
2022     g_ptr_array_free_full(ops);
2023
2024     if (success) {
2025         dself->block++;
2026         *size = blocksize;
2027         g_mutex_lock(dself->device_mutex);
2028         dself->bytes_read += blocksize;
2029         g_mutex_unlock(dself->device_mutex);
2030         return blocksize;
2031     } else {
2032         return -1;
2033     }
2034 }
2035
2036 /* property utility functions */
2037
2038 typedef struct {
2039     GenericOp base;
2040     DevicePropertyId id;   /* IN */
2041     GValue value;          /* IN/OUT */
2042     PropertySurety surety; /* IN (for set) */
2043     PropertySource source; /* IN (for set) */
2044 } PropertyOp;
2045
2046 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2047 static GPtrArray * make_property_op_array(RaitDevice * self,
2048                                           DevicePropertyId id,
2049                                           GValue * value,
2050                                           PropertySurety surety,
2051                                           PropertySource source) {
2052     guint i;
2053     GPtrArray * ops;
2054     ops = g_ptr_array_sized_new(self->private->children->len);
2055     for (i = 0; i < self->private->children->len; i ++) {
2056         PropertyOp * op;
2057
2058         if ((signed)i == self->private->failed) {
2059             continue;
2060         }
2061
2062         op = g_new(PropertyOp, 1);
2063         op->base.child = g_ptr_array_index(self->private->children, i);
2064         op->id = id;
2065         bzero(&(op->value), sizeof(op->value));
2066         if (value != NULL) {
2067             g_value_unset_copy(value, &(op->value));
2068         }
2069         op->surety = surety;
2070         op->source = source;
2071         g_ptr_array_add(ops, op);
2072     }
2073
2074     return ops;
2075 }
2076
2077 /* A GFunc. */
2078 static void property_get_do_op(gpointer data,
2079                                gpointer user_data G_GNUC_UNUSED) {
2080     PropertyOp * op = data;
2081
2082     bzero(&(op->value), sizeof(op->value));
2083     op->base.result =
2084         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2085                                             &(op->value)));
2086 }
2087
2088 /* A GFunc. */
2089 static void property_set_do_op(gpointer data,
2090                                gpointer user_data G_GNUC_UNUSED) {
2091     PropertyOp * op = data;
2092
2093     op->base.result =
2094         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2095                                                &(op->value), op->surety,
2096                                                op->source));
2097     g_value_unset(&(op->value));
2098 }
2099
2100 /* PropertyGetFns and PropertySetFns */
2101
2102 static gboolean
2103 property_get_block_size_fn(Device *dself,
2104     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2105     PropertySurety *surety, PropertySource *source)
2106 {
2107     RaitDevice *self = RAIT_DEVICE(dself);
2108     gsize my_block_size;
2109
2110     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2111         my_block_size = dself->block_size;
2112
2113         if (surety)
2114             *surety = dself->block_size_surety;
2115     } else {
2116         gsize child_block_size;
2117         child_block_size = calculate_block_size_from_children(self,
2118                                                     &my_block_size);
2119         if (child_block_size == 0)
2120             return FALSE;
2121
2122         if (surety)
2123             *surety = PROPERTY_SURETY_BAD; /* may still change */
2124     }
2125
2126     if (val) {
2127         g_value_unset_init(val, G_TYPE_INT);
2128         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2129         g_value_set_int(val, (gint)my_block_size);
2130     }
2131
2132     if (source)
2133         *source = dself->block_size_source;
2134
2135     return TRUE;
2136 }
2137
2138 static gboolean
2139 property_set_block_size_fn(Device *dself,
2140     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2141     PropertySurety surety, PropertySource source)
2142 {
2143     RaitDevice *self = RAIT_DEVICE(dself);
2144     gint my_block_size = g_value_get_int(val);
2145     guint data_children;
2146
2147     find_simple_params(self, NULL, &data_children);
2148     if ((my_block_size % data_children) != 0) {
2149         device_set_error(dself,
2150             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2151             DEVICE_STATUS_DEVICE_ERROR);
2152         return FALSE;
2153     }
2154
2155     dself->block_size = my_block_size;
2156     dself->block_size_source = source;
2157     dself->block_size_surety = surety;
2158
2159     if (!fix_block_size(self))
2160         return FALSE;
2161
2162     return TRUE;
2163 }
2164
2165 static gboolean
2166 property_get_canonical_name_fn(Device *dself,
2167     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2168     PropertySurety *surety, PropertySource *source)
2169 {
2170     RaitDevice *self = RAIT_DEVICE(dself);
2171     char *canonical = child_device_names_to_rait_name(self);
2172
2173     if (val) {
2174         g_value_unset_init(val, G_TYPE_STRING);
2175         g_value_set_string(val, canonical);
2176         g_free(canonical);
2177     }
2178
2179     if (surety)
2180         *surety = PROPERTY_SURETY_GOOD;
2181
2182     if (source)
2183         *source = PROPERTY_SOURCE_DETECTED;
2184
2185     return TRUE;
2186 }
2187
2188 static gboolean
2189 property_get_concurrency_fn(Device *dself,
2190     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2191     PropertySurety *surety, PropertySource *source)
2192 {
2193     RaitDevice *self = RAIT_DEVICE(dself);
2194     ConcurrencyParadigm result;
2195     guint i;
2196     GPtrArray * ops;
2197     gboolean success;
2198
2199     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2200     do_rait_child_ops(self, property_get_do_op, ops);
2201
2202     /* find the most restrictive paradigm acceptable to all
2203      * child devices */
2204     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2205     success = TRUE;
2206     for (i = 0; i < ops->len; i ++) {
2207         ConcurrencyParadigm cur;
2208         PropertyOp * op = g_ptr_array_index(ops, i);
2209
2210         if (!op->base.result
2211             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2212             success = FALSE;
2213             break;
2214         }
2215
2216         cur = g_value_get_enum(&(op->value));
2217         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2218             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2219             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2220         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2221                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2222             result = CONCURRENCY_PARADIGM_SHARED_READ;
2223         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2224                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2225             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2226         } else {
2227             success = FALSE;
2228             break;
2229         }
2230     }
2231
2232     g_ptr_array_free_full(ops);
2233
2234     if (success) {
2235         if (val) {
2236             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2237             g_value_set_enum(val, result);
2238         }
2239
2240         if (surety)
2241             *surety = PROPERTY_SURETY_GOOD;
2242
2243         if (source)
2244             *source = PROPERTY_SOURCE_DETECTED;
2245     }
2246
2247     return success;
2248 }
2249
2250 static gboolean
2251 property_get_streaming_fn(Device *dself,
2252     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2253     PropertySurety *surety, PropertySource *source)
2254 {
2255     RaitDevice *self = RAIT_DEVICE(dself);
2256     StreamingRequirement result;
2257     guint i;
2258     GPtrArray * ops;
2259     gboolean success;
2260
2261     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2262     do_rait_child_ops(self, property_get_do_op, ops);
2263
2264     /* combine the child streaming requirements, selecting the strongest
2265      * requirement of the bunch. */
2266     result = STREAMING_REQUIREMENT_NONE;
2267     success = TRUE;
2268     for (i = 0; i < ops->len; i ++) {
2269         StreamingRequirement cur;
2270         PropertyOp * op = g_ptr_array_index(ops, i);
2271
2272         if (!op->base.result
2273             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2274             success = FALSE;
2275             break;
2276         }
2277
2278         cur = g_value_get_enum(&(op->value));
2279         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2280             cur == STREAMING_REQUIREMENT_REQUIRED) {
2281             result = STREAMING_REQUIREMENT_REQUIRED;
2282         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2283                    cur == STREAMING_REQUIREMENT_DESIRED) {
2284             result = STREAMING_REQUIREMENT_DESIRED;
2285         } else if (result == STREAMING_REQUIREMENT_NONE &&
2286                    cur == STREAMING_REQUIREMENT_NONE) {
2287             result = STREAMING_REQUIREMENT_NONE;
2288         } else {
2289             success = FALSE;
2290             break;
2291         }
2292     }
2293
2294     g_ptr_array_free_full(ops);
2295
2296     if (success) {
2297         if (val) {
2298             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2299             g_value_set_enum(val, result);
2300         }
2301
2302         if (surety)
2303             *surety = PROPERTY_SURETY_GOOD;
2304
2305         if (source)
2306             *source = PROPERTY_SOURCE_DETECTED;
2307     }
2308
2309     return success;
2310 }
2311
2312 static gboolean
2313 property_get_boolean_and_fn(Device *dself,
2314     DevicePropertyBase *base, GValue *val,
2315     PropertySurety *surety, PropertySource *source)
2316 {
2317     RaitDevice *self = RAIT_DEVICE(dself);
2318     gboolean result;
2319     guint i;
2320     GPtrArray * ops;
2321     gboolean success;
2322
2323     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2324     do_rait_child_ops(self, property_get_do_op, ops);
2325
2326     /* combine the child values, applying a simple AND */
2327     result = TRUE;
2328     success = TRUE;
2329     for (i = 0; i < ops->len; i ++) {
2330         PropertyOp * op = g_ptr_array_index(ops, i);
2331
2332         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2333             success = FALSE;
2334             break;
2335         }
2336
2337         if (!g_value_get_boolean(&(op->value))) {
2338             result = FALSE;
2339             break;
2340         }
2341     }
2342
2343     g_ptr_array_free_full(ops);
2344
2345     if (success) {
2346         if (val) {
2347             g_value_unset_init(val, G_TYPE_BOOLEAN);
2348             g_value_set_boolean(val, result);
2349         }
2350
2351         if (surety)
2352             *surety = PROPERTY_SURETY_GOOD;
2353
2354         if (source)
2355             *source = PROPERTY_SOURCE_DETECTED;
2356     }
2357
2358     return success;
2359 }
2360
2361 static gboolean
2362 property_get_medium_access_type_fn(Device *dself,
2363     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2364     PropertySurety *surety, PropertySource *source)
2365 {
2366     RaitDevice *self = RAIT_DEVICE(dself);
2367     MediaAccessMode result;
2368     guint i;
2369     GPtrArray * ops;
2370     gboolean success;
2371
2372     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2373     do_rait_child_ops(self, property_get_do_op, ops);
2374
2375     /* combine the modes as best we can */
2376     result = 0;
2377     success = TRUE;
2378     for (i = 0; i < ops->len; i ++) {
2379         MediaAccessMode cur;
2380         PropertyOp * op = g_ptr_array_index(ops, i);
2381
2382         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2383             success = FALSE;
2384             break;
2385         }
2386
2387         cur = g_value_get_enum(&(op->value));
2388
2389         if (i == 0) {
2390             result = cur;
2391         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2392                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2393                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2394                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2395             /* Invalid combination; one device can only read, other
2396                can only write. */
2397             success = FALSE;
2398             break;
2399         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2400                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2401             result = MEDIA_ACCESS_MODE_READ_ONLY;
2402         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2403                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2404             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2405         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2406                    cur == MEDIA_ACCESS_MODE_WORM) {
2407             result = MEDIA_ACCESS_MODE_WORM;
2408         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2409                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2410             result = MEDIA_ACCESS_MODE_READ_WRITE;
2411         } else {
2412             success = FALSE;
2413             break;
2414         }
2415     }
2416
2417     g_ptr_array_free_full(ops);
2418
2419     if (success) {
2420         if (val) {
2421             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2422             g_value_set_enum(val, result);
2423         }
2424
2425         if (surety)
2426             *surety = PROPERTY_SURETY_GOOD;
2427
2428         if (source)
2429             *source = PROPERTY_SOURCE_DETECTED;
2430     }
2431
2432     return success;
2433 }
2434
2435 static gboolean
2436 property_get_max_volume_usage_fn(Device *dself,
2437     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2438     PropertySurety *surety, PropertySource *source)
2439 {
2440     RaitDevice *self = RAIT_DEVICE(dself);
2441     guint64 result;
2442     guint i;
2443     GPtrArray * ops;
2444     guint data_children;
2445
2446     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2447     do_rait_child_ops(self, property_get_do_op, ops);
2448
2449     /* look for the smallest value that is set */
2450     result = 0;
2451     for (i = 0; i < ops->len; i ++) {
2452         guint64 cur;
2453         PropertyOp * op = g_ptr_array_index(ops, i);
2454
2455         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2456             continue; /* ignore children without this property */
2457         }
2458
2459         cur = g_value_get_uint64(&(op->value));
2460
2461         if (!result || (cur && cur < result)) {
2462             result = cur;
2463         }
2464     }
2465
2466     g_ptr_array_free_full(ops);
2467
2468     if (result) {
2469         /* result contains the minimum usage on any child.  We can use that space
2470          * on each of our data children, so the total is larger */
2471         find_simple_params(self, NULL, &data_children);
2472         result *= data_children;
2473
2474         if (val) {
2475             g_value_unset_init(val, G_TYPE_UINT64);
2476             g_value_set_uint64(val, result);
2477         }
2478
2479         if (surety)
2480             *surety = PROPERTY_SURETY_GOOD;
2481
2482         if (source)
2483             *source = PROPERTY_SOURCE_DETECTED;
2484
2485         return TRUE;
2486     } else {
2487         /* no result from any children, so we effectively don't have this property */
2488         return FALSE;
2489     }
2490 }
2491
2492 static gboolean
2493 property_set_max_volume_usage_fn(Device *dself,
2494     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2495     PropertySurety surety, PropertySource source)
2496 {
2497     RaitDevice *self = RAIT_DEVICE(dself);
2498     guint64 parent_usage;
2499     guint64 child_usage;
2500     GValue child_val;
2501     guint i;
2502     gboolean success;
2503     GPtrArray * ops;
2504     guint data_children;
2505
2506     parent_usage = g_value_get_uint64(val);
2507     find_simple_params(self, NULL, &data_children);
2508
2509     child_usage = parent_usage / data_children;
2510
2511     bzero(&child_val, sizeof(child_val));
2512     g_value_init(&child_val, G_TYPE_UINT64);
2513     g_value_set_uint64(&child_val, child_usage);
2514
2515     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2516                                 &child_val, surety, source);
2517     do_rait_child_ops(self, property_set_do_op, ops);
2518
2519     /* if any of the kids succeeded, then we did too */
2520     success = FALSE;
2521     for (i = 0; i < ops->len; i ++) {
2522         PropertyOp * op = g_ptr_array_index(ops, i);
2523
2524         if (op->base.result) {
2525             success = TRUE;
2526             break;
2527         }
2528     }
2529
2530     g_ptr_array_free_full(ops);
2531
2532     return success;
2533 }
2534
2535 typedef struct {
2536     GenericOp base;
2537     guint filenum;
2538 } RecycleFileOp;
2539
2540 /* A GFunc */
2541 static void recycle_file_do_op(gpointer data,
2542                                gpointer user_data G_GNUC_UNUSED) {
2543     RecycleFileOp * op = data;
2544     op->base.result =
2545         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2546 }
2547
2548 static gboolean
2549 rait_device_recycle_file (Device * dself, guint filenum) {
2550     GPtrArray * ops;
2551     guint i;
2552     gboolean success;
2553
2554     RaitDevice * self = RAIT_DEVICE(dself);
2555
2556     if (rait_device_in_error(self)) return FALSE;
2557
2558     ops = g_ptr_array_sized_new(self->private->children->len);
2559     for (i = 0; i < self->private->children->len; i ++) {
2560         RecycleFileOp * op;
2561         op = g_new(RecycleFileOp, 1);
2562         op->base.child = g_ptr_array_index(self->private->children, i);
2563         op->filenum = filenum;
2564         g_ptr_array_add(ops, op);
2565     }
2566
2567     do_rait_child_ops(self, recycle_file_do_op, ops);
2568
2569     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2570
2571     g_ptr_array_free_full(ops);
2572
2573     if (!success) {
2574         /* TODO: be more specific here */
2575         device_set_error(dself,
2576             stralloc(_("One or more devices failed to recycle_file")),
2577             DEVICE_STATUS_DEVICE_ERROR);
2578         return FALSE;
2579     }
2580     return TRUE;
2581 }
2582
2583 /* GFunc */
2584 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2585     GenericOp * op = data;
2586     op->result = GINT_TO_POINTER(device_finish(op->child));
2587 }
2588
2589 static gboolean
2590 rait_device_finish (Device * self) {
2591     GPtrArray * ops;
2592     gboolean success;
2593     gboolean rval = TRUE;
2594
2595     rval = !rait_device_in_error(self);
2596
2597     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2598
2599     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2600
2601     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2602     if (!success)
2603         rval = FALSE;
2604
2605     g_ptr_array_free_full(ops);
2606
2607     self->access_mode = ACCESS_NULL;
2608
2609     return rval;
2610 }
2611
2612 static Device *
2613 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2614     Device * rval;
2615     g_assert(0 == strcmp(device_type, "rait"));
2616     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2617     device_open_device(rval, device_name, device_type, device_node);
2618     return rval;
2619 }
2620
2621 void
2622 rait_device_register (void) {
2623     static const char * device_prefix_list[] = {"rait", NULL};
2624     register_device(rait_device_factory, device_prefix_list);
2625 }