d0b0e893c028f263eb445147e12113482b22f1e4
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2005-2008 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as 
6  * published by the Free Software Foundation.
7  * 
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  * 
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  * 
17  * Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
18  * Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include <amanda.h>
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "semaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_RAIT_DEVICE        (rait_device_get_type())
57 #define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
58 #define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
59 #define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
60 #define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())
61
62 #define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
63 static GType    rait_device_get_type    (void);
64
/*
 * Main object structure
 *
 * Instance layout for a RAIT device.  The parent Device is the first member,
 * and the per-instance state lives in a separately allocated
 * RaitDevicePrivate, which is created in rait_device_init() and released in
 * rait_device_finalize().
 */
typedef struct RaitDevice_s {
    Device __parent__;

    struct RaitDevicePrivate_s * private;
} RaitDevice;
73
/*
 * Class definition
 *
 * The RAIT class adds no members of its own; it only embeds the parent
 * DeviceClass, whose method pointers are overridden in
 * rait_device_class_init().
 */
typedef struct _RaitDeviceClass RaitDeviceClass;
struct _RaitDeviceClass {
    DeviceClass __parent__;
};
81
/* Overall health of the set of child devices.  A new RaitDevice starts out
 * as RAIT_STATUS_COMPLETE (see rait_device_init()); rait_device_in_error()
 * treats RAIT_STATUS_FAILED as an error condition. */
typedef enum {
    RAIT_STATUS_COMPLETE, /* All subdevices OK. */
    RAIT_STATUS_DEGRADED, /* One subdevice failed. */
    RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
} RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
 * This implementation assumes that threads are used for parallelizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
97 #if !(GLIB_CHECK_VERSION(2,10,0))
98 #define USE_INTERNAL_THREADPOOL
99 #endif
100
/* Per-instance private state of a RaitDevice; allocated in
 * rait_device_init() and freed in rait_device_finalize(). */
typedef struct RaitDevicePrivate_s {
    /* Array of child Device pointers; each is unreferenced when the RAIT
     * device is finalized. */
    GPtrArray * children;
    /* These flags are only relevant for reading. */
    RaitStatus status;
    /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
       failed node. It holds a negative number otherwise. */
    int failed;

    /* the child block size */
    gsize child_block_size;

#ifdef USE_INTERNAL_THREADPOOL
    /* array of ThreadInfo for performing parallel operations; created
     * lazily by do_thread_pool_op() */
    GArray *threads;

    /* value of this semaphore is the number of threaded operations
     * in progress */
    semaphore_t *threads_sem;
#endif
} RaitDevicePrivate;
121
122 #ifdef USE_INTERNAL_THREADPOOL
/* State for one worker thread of the internal thread pool.  The worker
 * (rait_thread_pool_func) sleeps on `cond` until it is either handed a
 * function to run or told to die. */
typedef struct ThreadInfo {
    /* the worker thread; NULL until do_thread_pool_op() first needs this
     * slot */
    GThread *thread;

    /* struct fields below are protected by this mutex and condition variable */
    GMutex *mutex;
    GCond *cond;

    /* set to TRUE (by rait_device_finalize) to make the worker exit */
    gboolean die;
    /* pending operation and its argument; the worker resets both to NULL
     * once the operation has run */
    GFunc func;
    gpointer data;

    /* give threads access to the owning device's threads_sem, which each
     * worker decrements when it finishes an operation */
    struct RaitDevicePrivate_s *private;
} ThreadInfo;
137 #endif
138
139
/* Access the private state of a RaitDevice pointer.  The argument is
 * parenthesized so that any pointer-valued expression (not just a plain
 * identifier or call) can be passed safely. */
#define PRIVATE(o) ((o)->private)

/* TRUE if the device itself reports an error, or if the RAIT set has lost
 * too many children to be usable (RAIT_STATUS_FAILED). */
#define rait_device_in_error(dev) \
    (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
144
/* Declared with external linkage, unlike everything else below; its
 * definition is not in this part of the file. */
void rait_device_register (void);

/* here are local prototypes */

/* GObject type-system hooks, referenced from the GTypeInfo in
 * rait_device_get_type() */
static void rait_device_init (RaitDevice * o);
static void rait_device_class_init (RaitDeviceClass * c);
static void rait_device_base_init (RaitDeviceClass * c);

/* implementations of the DeviceClass methods, installed in
 * rait_device_class_init() */
static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
                                   char * label, char * timestamp);
static gboolean rait_device_configure(Device * self, gboolean use_global_config);
static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
static gboolean rait_device_finish_file (Device * self);
static dumpfile_t * rait_device_seek_file (Device * self, guint file);
static gboolean rait_device_seek_block (Device * self, guint64 block);
static int      rait_device_read_block (Device * self, gpointer buf,
                                        int * size);
static gboolean rait_device_recycle_file (Device * self, guint filenum);
static gboolean rait_device_finish (Device * self);
static DeviceStatusFlags rait_device_read_label(Device * dself);

/* helper used by the block-size code to obtain the number of data-bearing
 * children */
static void find_simple_params(RaitDevice * self, guint * num_children,
                               guint * data_children);

/* property handlers, registered in rait_device_base_init() */

static gboolean property_get_block_size_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_set_block_size_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source);

static gboolean property_get_canonical_name_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_concurrency_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_streaming_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_boolean_and_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_medium_access_type_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_free_space_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_get_max_volume_usage_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety *surety, PropertySource *source);

static gboolean property_set_max_volume_usage_fn(Device *self,
    DevicePropertyBase *base, GValue *val,
    PropertySurety surety, PropertySource source);
209
210
/* pointer to the class of our parent; set in rait_device_class_init() and
 * used by rait_device_finalize() to chain up to the parent's finalizer */
static DeviceClass *parent_class = NULL;
213
214 static GType
215 rait_device_get_type (void)
216 {
217     static GType type = 0;
218
219     if G_UNLIKELY(type == 0) {
220         static const GTypeInfo info = {
221             sizeof (RaitDeviceClass),
222             (GBaseInitFunc) rait_device_base_init,
223             (GBaseFinalizeFunc) NULL,
224             (GClassInitFunc) rait_device_class_init,
225             (GClassFinalizeFunc) NULL,
226             NULL /* class_data */,
227             sizeof (RaitDevice),
228             0 /* n_preallocs */,
229             (GInstanceInitFunc) rait_device_init,
230             NULL
231         };
232         
233         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
234                                        (GTypeFlags)0);
235         }
236     
237     return type;
238 }
239
240 static void g_object_unref_foreach(gpointer data,
241                                    gpointer user_data G_GNUC_UNUSED) {
242     if (data != NULL && G_IS_OBJECT(data)) {
243         g_object_unref(data);
244     }
245 }
246
247 static void
248 rait_device_finalize(GObject *obj_self)
249 {
250     RaitDevice *self = RAIT_DEVICE (obj_self);
251     if(G_OBJECT_CLASS(parent_class)->finalize) \
252            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
253     if(self->private->children) {
254         g_ptr_array_foreach(self->private->children,
255                             g_object_unref_foreach, NULL);
256         g_ptr_array_free (self->private->children, TRUE);
257         self->private->children = NULL;
258     }
259 #ifdef USE_INTERNAL_THREADPOOL
260     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
261
262     if (PRIVATE(self)->threads) {
263         guint i;
264
265         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
266             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
267             if (inf->thread) {
268                 /* NOTE: the thread is waiting on this condition right now, not
269                  * executing an operation. */
270
271                 /* ask the thread to die */
272                 g_mutex_lock(inf->mutex);
273                 inf->die = TRUE;
274                 g_cond_signal(inf->cond);
275                 g_mutex_unlock(inf->mutex);
276
277                 /* and wait for it to die, which should happen soon */
278                 g_thread_join(inf->thread);
279             }
280
281             if (inf->mutex)
282                 g_mutex_free(inf->mutex);
283             if (inf->cond)
284                 g_cond_free(inf->cond);
285         }
286     }
287
288     if (PRIVATE(self)->threads_sem)
289         semaphore_free(PRIVATE(self)->threads_sem);
290 #endif
291     amfree(self->private);
292 }
293
294 static void 
295 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
296 {
297     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
298     PRIVATE(o)->children = g_ptr_array_new();
299     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
300     PRIVATE(o)->failed = -1;
301 #ifdef USE_INTERNAL_THREADPOOL
302     PRIVATE(o)->threads = NULL;
303     PRIVATE(o)->threads_sem = NULL;
304 #endif
305 }
306
307 static void 
308 rait_device_class_init (RaitDeviceClass * c)
309 {
310     GObjectClass *g_object_class = (GObjectClass*) c;
311     DeviceClass *device_class = (DeviceClass *)c;
312
313     parent_class = g_type_class_ref (TYPE_DEVICE);
314
315     device_class->open_device = rait_device_open_device;
316     device_class->configure = rait_device_configure;
317     device_class->start = rait_device_start;
318     device_class->start_file = rait_device_start_file;
319     device_class->write_block = rait_device_write_block;
320     device_class->finish_file = rait_device_finish_file;
321     device_class->seek_file = rait_device_seek_file;
322     device_class->seek_block = rait_device_seek_block;
323     device_class->read_block = rait_device_read_block;
324     device_class->recycle_file = rait_device_recycle_file; 
325     device_class->finish = rait_device_finish;
326     device_class->read_label = rait_device_read_label;
327
328     g_object_class->finalize = rait_device_finalize;
329
330 #ifndef USE_INTERNAL_THREADPOOL
331 #if !GLIB_CHECK_VERSION(2,10,2)
332     /* Versions of glib before 2.10.2 crash if
333      * g_thread_pool_set_max_unused_threads is called before the first
334      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
335      * start any threads in it, and free it */
336     {
337         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
338         g_thread_pool_free(pool, TRUE, FALSE);
339     }
340 #endif
341
342     g_thread_pool_set_max_unused_threads(-1);
343 #endif
344 }
345
346 static void
347 rait_device_base_init (RaitDeviceClass * c)
348 {
349     DeviceClass *device_class = (DeviceClass *)c;
350
351     /* the RAIT device overrides most of the standard properties, so that it
352      * can calculate them by querying the same property on the children */
353     device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
354             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
355             property_get_block_size_fn,
356             property_set_block_size_fn);
357
358     device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
359             PROPERTY_ACCESS_GET_MASK,
360             property_get_canonical_name_fn, NULL);
361
362     device_class_register_property(device_class, PROPERTY_CONCURRENCY,
363             PROPERTY_ACCESS_GET_MASK,
364             property_get_concurrency_fn, NULL);
365
366     device_class_register_property(device_class, PROPERTY_STREAMING,
367             PROPERTY_ACCESS_GET_MASK,
368             property_get_streaming_fn, NULL);
369
370     device_class_register_property(device_class, PROPERTY_APPENDABLE,
371             PROPERTY_ACCESS_GET_MASK,
372             property_get_boolean_and_fn, NULL);
373
374     device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
375             PROPERTY_ACCESS_GET_MASK,
376             property_get_boolean_and_fn, NULL);
377
378     device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
379             PROPERTY_ACCESS_GET_MASK,
380             property_get_medium_access_type_fn, NULL);
381
382     device_class_register_property(device_class, PROPERTY_FREE_SPACE,
383             PROPERTY_ACCESS_GET_MASK,
384             property_get_free_space_fn, NULL);
385
386     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
387             PROPERTY_ACCESS_GET_MASK,
388             property_get_max_volume_usage_fn,
389             property_set_max_volume_usage_fn);
390 }
391
392 /* This function does something a little clever and a little
393  * complicated. It takes an array of operations and runs the given
394  * function on each element in the array. The trick is that it runs them
395  * all in parallel, in different threads. This is more efficient than it
396  * sounds because we use a GThreadPool, which means calling this function
397  * will probably not start any new threads at all, but rather use
398  * existing ones. The func is called with two gpointer arguments: The
399  * first from the array, the second is the data argument.
400  * 
401  * When it returns, all the operations have been successfully
402  * executed. If you want results from your operations, do it yourself
403  * through the array.
404  */
405
406 #ifdef USE_INTERNAL_THREADPOOL
407 static gpointer rait_thread_pool_func(gpointer data) {
408     ThreadInfo *inf = data;
409
410     g_mutex_lock(inf->mutex);
411     while (TRUE) {
412         while (!inf->die && !inf->func)
413             g_cond_wait(inf->cond, inf->mutex);
414
415         if (inf->die)
416             break;
417
418         if (inf->func) {
419             /* invoke the function */
420             inf->func(inf->data, NULL);
421             inf->func = NULL;
422             inf->data = NULL;
423
424             /* indicate that we're finished; will not block */
425             semaphore_down(inf->private->threads_sem);
426         }
427     }
428     g_mutex_unlock(inf->mutex);
429     return NULL;
430 }
431
432 static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
433     guint i;
434
435     if (PRIVATE(self)->threads_sem == NULL)
436         PRIVATE(self)->threads_sem = semaphore_new_with_value(0);
437
438     if (PRIVATE(self)->threads == NULL)
439         PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
440                                             sizeof(ThreadInfo), ops->len);
441
442     g_assert(PRIVATE(self)->threads_sem->value == 0);
443
444     if (PRIVATE(self)->threads->len < ops->len)
445         g_array_set_size(PRIVATE(self)->threads, ops->len);
446
447     /* the semaphore will hit zero when each thread has decremented it */
448     semaphore_force_set(PRIVATE(self)->threads_sem, ops->len);
449
450     for (i = 0; i < ops->len; i++) {
451         ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
452         if (!inf->thread) {
453             inf->mutex = g_mutex_new();
454             inf->cond = g_cond_new();
455             inf->private = PRIVATE(self);
456             inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
457         }
458
459         /* set up the info the thread needs and trigger it to start */
460         g_mutex_lock(inf->mutex);
461         inf->data = g_ptr_array_index(ops, i);
462         inf->func = func;
463         g_cond_signal(inf->cond);
464         g_mutex_unlock(inf->mutex);
465     }
466
467     /* wait until semaphore hits zero */
468     semaphore_wait_empty(PRIVATE(self)->threads_sem);
469 }
470
471 #else /* USE_INTERNAL_THREADPOOL */
472
473 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
474     GThreadPool * pool;
475     guint i;
476
477     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
478     for (i = 0; i < ops->len; i ++) {
479         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
480     }
481
482     g_thread_pool_free(pool, FALSE, TRUE);
483 }
484
485 #endif /* USE_INTERNAL_THREADPOOL */
486
487 /* This does the above, in a serial fashion (and without using threads) */
488 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
489     guint i;
490
491     for (i = 0; i < ops->len; i ++) {
492         func(g_ptr_array_index(ops, i), NULL);
493     }
494 }
495
496 /* This is the one that code below should call. It switches
497    automatically between do_thread_pool_op and do_unthreaded_ops,
498    depending on g_thread_supported(). */
499 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
500     if (g_thread_supported()) {
501         do_thread_pool_op(self, func, ops);
502     } else {
503         do_unthreaded_ops(self, func, ops);
504     }
505 }
506
507 /* Helper for parse_device_name; returns a list of un-escaped strings for
508  * the first "component" of str, where a component is a plain string or a
509  * brace-enclosed set of alternatives.  str is pointing to the first character
510  * of the next component on return. */
511 static GPtrArray *
512 parse_device_name_component(char **str)
513 {
514     GPtrArray *result = g_ptr_array_new();
515
516     if (**str == '{') {
517         char *p = (*str)+1;
518         char *local = g_malloc(strlen(*str)+1);
519         char *current = local;
520         char *c = current;
521
522         while (1) {
523             if (*p == '\0' || *p == '{') {
524                 /* unterminated { .. } or extra '{' */
525                 amfree(local);
526                 g_ptr_array_free(result, TRUE);
527                 return NULL;
528             }
529
530             if (*p == '}' || *p == ',') {
531                 *c = '\0';
532                 g_ptr_array_add(result, g_strdup(current));
533                 current = ++c;
534
535                 if (*p == '}')
536                     break;
537                 else
538                     p++;
539             }
540
541             if (*p == '\\') {
542                 if (*(p+1) == '{' || *(p+1) == '}' || *(p+1) == '\\' || *(p+1) == ',')
543                     p++;
544             }
545             *(c++) = *(p++);
546         }
547
548         amfree(local);
549
550         if (*p)
551             *str = p+1;
552         else
553             *str = p;
554     } else {
555         /* no braces -- just un-escape a plain string */
556         char *local = g_malloc(strlen(*str)+1);
557         char *r = local;
558         char *p = *str;
559
560         while (*p && *p != '{') {
561             if (*p == '\\') {
562                 if (*(p+1) == '{' || *(p+1) == '}' || *(p+1) == '\\' || *(p+1) == ',')
563                     p++;
564             }
565             *(r++) = *(p++);
566         }
567         *r = '\0';
568         g_ptr_array_add(result, local);
569         *str = p;
570     }
571
572     return result;
573 }
574
575 /* Take a text string user_name, and break it out into an argv-style
576    array of strings, using a {..,..} syntax similar to shell brace expansion.
577    For example:
578
579      "{foo,bar,bat}" -> [ "foo", "bar", "bat" ]
580      "foo{1,2}bar" -> [ "foo1bar", "foo2bar" ]
581      "foo{1\,2,3}bar" -> [ "foo1,2bar", "foo3bar" ]
582      "{a,b}-{1,2}" -> [ "a-1", "a-2", "b-1", "b-2" ]
583
584    Note that nested braces are not processed.  Braces, commas, and backslashes
585    may be escaped with backslashes.  Returns NULL on invalid strings.
586    */
587
588 static GPtrArray *
589 parse_device_name(char * user_name)
590 {
591     GPtrArray *rval = g_ptr_array_new();
592
593     g_ptr_array_add(rval, g_strdup(""));
594
595     while (*user_name) {
596         GPtrArray *new_components;
597         GPtrArray *new_rval;
598         guint i, j;
599
600         new_components = parse_device_name_component(&user_name);
601         if (!new_components) {
602             /* parse error */
603             g_ptr_array_free(rval, TRUE);
604             return NULL;
605         }
606
607         new_rval = g_ptr_array_new();
608
609         /* do a cartesian join of rval and new_components */
610         for (i = 0; i < rval->len; i++) {
611             for (j = 0; j < new_components->len; j++) {
612                 g_ptr_array_add(new_rval, g_strconcat(
613                     g_ptr_array_index(rval, i),
614                     g_ptr_array_index(new_components, j),
615                     NULL));
616             }
617         }
618
619         g_ptr_array_free(rval, TRUE);
620         g_ptr_array_free(new_components, TRUE);
621         rval = new_rval;
622     }
623
624     return rval;
625 }
626
627 static char *
628 child_device_names_to_rait_name(RaitDevice * self) {
629     GString *rait_name = NULL;
630     guint i;
631
632     rait_name = g_string_new("rait:{");
633
634     for (i = 0; i < self->private->children->len; i ++) {
635         Device *child = g_ptr_array_index(self->private->children, i);
636         const char *child_name = NULL;
637         GValue val;
638         gboolean got_prop = FALSE;
639
640         bzero(&val, sizeof(val));
641
642         if ((signed)i != self->private->failed) {
643             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
644                 child_name = g_value_get_string(&val);
645                 got_prop = TRUE;
646             }
647         }
648
649         if (!got_prop)
650             child_name = "MISSING";
651
652         g_string_append_printf(rait_name, "%s%s", child_name,
653                 (i < self->private->children->len-1)? "," : "");
654
655         if (got_prop)
656             g_value_unset(&val);
657     }
658
659     g_string_append(rait_name, "}");
660     return g_string_free(rait_name, FALSE);
661 }
662
663 /* Find a workable child block size, based on the block size ranges of our
664  * child devices.
665  *
666  * The algorithm is to construct the intersection of all child devices'
667  * [min,max] block size ranges, and then pick the block size closest to 32k
668  * that is in the resulting range.  This avoids picking ridiculously small (1
669  * byte) or large (INT_MAX) block sizes when using devices with wide-open block
670  * size ranges.
671
672  * This function returns the calculated child block size directly, and the RAIT
673  * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
674  * a single child device, but sets the device's error status and returns 0 if
675  * it cannot determine an agreeable block size.
676  */
677 static gsize
678 calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
679 {
680     gsize min = 0;
681     gsize max = SIZE_MAX;
682     gboolean found_one = FALSE;
683     gsize result;
684     guint i;
685
686     for (i = 0; i < self->private->children->len; i ++) {
687         gsize child_min = SIZE_MAX, child_max = 0;
688         Device *child;
689         GValue property_result;
690         PropertySource source;
691
692         bzero(&property_result, sizeof(property_result));
693
694         if ((signed)i == self->private->failed)
695             continue;
696
697         child = g_ptr_array_index(self->private->children, i);
698         if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
699                                  &property_result, NULL, &source)) {
700             g_warning("Error getting BLOCK_SIZE from %s: %s",
701                     child->device_name, device_error_or_status(child));
702             continue;
703         }
704
705         /* if the block size has been set explicitly, then we need to use that blocksize;
706          * otherwise (even if it was DETECTED), override it. */
707         if (source == PROPERTY_SOURCE_USER) {
708             child_min = child_max = g_value_get_int(&property_result);
709         } else {
710             if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
711                                      &property_result)) {
712                 g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
713                         child->device_name, device_error_or_status(child));
714                 continue;
715             }
716             child_min = g_value_get_uint(&property_result);
717
718             if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
719                                      &property_result)) {
720                 g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
721                         child->device_name, device_error_or_status(child));
722                 continue;
723             }
724             child_max = g_value_get_uint(&property_result);
725
726             if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
727                 g_warning("Invalid min, max block sizes from %s", child->device_name);
728                 continue;
729             }
730         }
731
732         found_one = TRUE;
733         min = MAX(min, child_min);
734         max = MIN(max, child_max);
735     }
736
737     if (!found_one) {
738         device_set_error((Device*)self,
739             stralloc(_("Could not find any child devices' block size ranges")),
740             DEVICE_STATUS_DEVICE_ERROR);
741         return 0;
742     }
743
744     if (min > max) {
745         device_set_error((Device*)self,
746             stralloc(_("No block size is acceptable to all child devices")),
747             DEVICE_STATUS_DEVICE_ERROR);
748         return 0;
749     }
750
751     /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
752      * the nearest acceptable size. */
753     result = CLAMP(32768, min, max);
754
755     if (rait_size) {
756         guint data_children;
757         find_simple_params(self, NULL, &data_children);
758         *rait_size = result * data_children;
759     }
760
761     return result;
762 }
763
764 /* Set BLOCK_SIZE on all children */
765 static gboolean
766 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
767 {
768     GValue val;
769     guint i;
770     PropertySource source;
771
772     bzero(&val, sizeof(val));
773
774     g_assert(child_block_size < INT_MAX);
775     g_value_init(&val, G_TYPE_INT);
776     g_value_set_int(&val, (gint)child_block_size);
777
778     for (i = 0; i < self->private->children->len; i ++) {
779         Device *child;
780         GValue property_result;
781
782         bzero(&property_result, sizeof(property_result));
783
784         if ((signed)i == self->private->failed)
785             continue;
786
787         child = g_ptr_array_index(self->private->children, i);
788
789         /* first, make sure the block size is at its default, or is already
790          * correct */
791         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
792                                  &property_result, NULL, &source)) {
793             gsize from_child = g_value_get_int(&property_result);
794             if (source != PROPERTY_SOURCE_DEFAULT
795                     && from_child != child_block_size) {
796                 device_set_error((Device *)self,
797                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
798                                 child->device_name, from_child, child_block_size),
799                     DEVICE_STATUS_DEVICE_ERROR);
800                 return FALSE;
801             }
802         } else {
803             /* failing to get the block size isn't necessarily fatal.. */
804             g_warning("Error getting BLOCK_SIZE from %s: %s",
805                     child->device_name, device_error_or_status(child));
806         }
807         g_value_unset(&property_result);
808
809         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
810             device_set_error((Device *)self,
811                 vstrallocf(_("Error setting block size on %s"), child->device_name),
812                 DEVICE_STATUS_DEVICE_ERROR);
813             return FALSE;
814         }
815     }
816
817     return TRUE;
818 }
819
820 /* The time for users to specify block sizes has ended; set this device's
821  * block-size attributes for easy access by other RAIT functions.  Returns
822  * FALSE on error, with the device's error status already set. */
823 static gboolean
824 fix_block_size(RaitDevice *self)
825 {
826     Device *dself = (Device *)self;
827     gsize my_block_size, child_block_size;
828
829     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
830         child_block_size = calculate_block_size_from_children(self, &my_block_size);
831         if (child_block_size == 0)
832             return FALSE;
833
834         self->private->child_block_size = child_block_size;
835         dself->block_size = my_block_size;
836         dself->block_size_surety = PROPERTY_SURETY_GOOD;
837         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
838     } else {
839         guint data_children;
840
841         find_simple_params(self, NULL, &data_children);
842         g_assert((dself->block_size % data_children) == 0);
843         child_block_size = dself->block_size / data_children;
844     }
845
846     /* now tell the children we mean it */
847     if (!set_block_size_on_children(self, child_block_size))
848         return FALSE;
849
850     return TRUE;
851 }
852
853 /* This structure contains common fields for many operations. Not all
854    operations use all fields, however. */
855 typedef struct {
856     gpointer result; /* May be a pointer; may be an integer or boolean
857                         stored with GINT_TO_POINTER. */
858     Device * child;  /* The device in question. Used by all
859                         operations. */
860     guint child_index; /* For recoverable operations (read-related
861                           operations), this field provides the number
862                           of this child in the self->private->children
863                           array. */
864 } GenericOp;
865
866 typedef gboolean (*BooleanExtractor)(gpointer data);
867
868 /* A BooleanExtractor */
869 static gboolean extract_boolean_generic_op(gpointer data) {
870     GenericOp * op = data;
871     return GPOINTER_TO_INT(op->result);
872 }
873
874 /* A BooleanExtractor */
875 static gboolean extract_boolean_pointer_op(gpointer data) {
876     GenericOp * op = data;
877     return op->result != NULL;
878 }
879
880 /* Does the equivalent of this perl command:
881      ! (first { !extractor($_) } @_
882    That is, calls extractor on each element of the array, and returns
883    TRUE if and only if all calls to extractor return TRUE. This function
884    stops as soon as an extractor returns false, so it's best if extractor
885    functions have no side effects.
886 */
887 static gboolean g_ptr_array_and(GPtrArray * array,
888                                 BooleanExtractor extractor) {
889     guint i;
890     if (array == NULL || array->len <= 0)
891         return FALSE;
892
893     for (i = 0; i < array->len; i ++) {
894         if (!extractor(g_ptr_array_index(array, i)))
895             return FALSE;
896     }
897
898     return TRUE;
899 }
900
901 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
902 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
903     GPtrArray * rval;
904     guint i;
905
906     rval = g_ptr_array_sized_new(self->private->children->len);
907     for (i = 0; i < self->private->children->len; i ++) {
908         GenericOp * op;
909
910         if ((signed)i == self->private->failed) {
911             continue;
912         }
913
914         op = g_new(GenericOp, 1);
915         op->child = g_ptr_array_index(self->private->children, i);
916         op->child_index = i;
917         g_ptr_array_add(rval, op);
918     }
919
920     return rval;
921 }
922
923 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
924    all the proper handling for the result of operations that allow
925    device isolation. Returns FALSE only if an unrecoverable error
926    occured. */
927 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
928                                          BooleanExtractor extractor) {
929     int nfailed = 0;
930     int lastfailed = 0;
931     guint i;
932
933     /* We found one or more failed elements.  See which elements failed, and
934      * isolate them*/
935     for (i = 0; i < ops->len; i ++) {
936         GenericOp * op = g_ptr_array_index(ops, i);
937         if (!extractor(op)) {
938             self->private->failed = op->child_index;
939             g_warning("RAIT array %s isolated device %s: %s",
940                     DEVICE(self)->device_name,
941                     op->child->device_name,
942                     device_error(op->child));
943             nfailed++;
944             lastfailed = i;
945         }
946     }
947
948     /* no failures? great! */
949     if (nfailed == 0)
950         return TRUE;
951
952     /* a single failure in COMPLETE just puts us in DEGRADED mode */
953     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
954         self->private->status = RAIT_STATUS_DEGRADED;
955         self->private->failed = lastfailed;
956         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
957         return TRUE;
958     } else {
959         self->private->status = RAIT_STATUS_FAILED;
960         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
961         return FALSE;
962     }
963 }
964
965 typedef struct {
966     RaitDevice * self;
967     char *rait_name;
968     char * device_name; /* IN */
969     Device * result;    /* OUT */
970 } OpenDeviceOp;
971
972 /* A GFunc. */
973 static void device_open_do_op(gpointer data,
974                               gpointer user_data G_GNUC_UNUSED) {
975     OpenDeviceOp * op = data;
976
977     if (strcmp(op->device_name, "ERROR") == 0 ||
978         strcmp(op->device_name, "MISSING") == 0 ||
979         strcmp(op->device_name, "DEGRADED") == 0) {
980         g_warning("RAIT device %s contains a missing element, attempting "
981                   "degraded mode.\n", op->rait_name);
982         op->result = NULL;
983     } else {
984         op->result = device_open(op->device_name);
985     }
986 }
987
988 /* Returns TRUE if and only if the volume label and time are equal. */
989 static gboolean compare_volume_results(Device * a, Device * b) {
990     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
991          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
992 }
993
994 /* Stickes new_message at the end of *old_message; frees new_message and
995  * may change *old_message. */
996 static void append_message(char ** old_message, char * new_message) {
997     char * rval;
998     if (*old_message == NULL || **old_message == '\0') {
999         rval = new_message;
1000     } else {
1001         rval = g_strdup_printf("%s; %s", *old_message, new_message);
1002         amfree(new_message);
1003     }
1004     amfree(*old_message);
1005     *old_message = rval;
1006 }
1007
1008 static void
1009 rait_device_open_device (Device * dself, char * device_name,
1010             char * device_type G_GNUC_UNUSED, char * device_node) {
1011     GPtrArray *device_names;
1012     GPtrArray * device_open_ops;
1013     guint i;
1014     gboolean failure;
1015     char *failure_errmsgs;
1016     DeviceStatusFlags failure_flags;
1017     RaitDevice * self;
1018
1019     self = RAIT_DEVICE(dself);
1020
1021     device_names = parse_device_name(device_node);
1022
1023     if (device_names == NULL) {
1024         device_set_error(dself,
1025             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
1026             DEVICE_STATUS_DEVICE_ERROR);
1027         return;
1028     }
1029
1030     /* Open devices in a separate thread, in case they have to rewind etc. */
1031     device_open_ops = g_ptr_array_new();
1032
1033     for (i = 0; i < device_names->len; i++) {
1034         OpenDeviceOp *op;
1035         char *name = g_ptr_array_index(device_names, i);
1036
1037         op = g_new(OpenDeviceOp, 1);
1038         op->device_name = name;
1039         op->result = NULL;
1040         op->self = self;
1041         op->rait_name = device_name;
1042         g_ptr_array_add(device_open_ops, op);
1043     }
1044
1045     g_ptr_array_free(device_names, TRUE);
1046     do_rait_child_ops(self, device_open_do_op, device_open_ops);
1047
1048     failure = FALSE;
1049     failure_errmsgs = NULL;
1050     failure_flags = 0;
1051
1052     /* Check results of opening devices. */
1053     for (i = 0; i < device_open_ops->len; i ++) {
1054         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
1055
1056         if (op->result != NULL &&
1057             op->result->status == DEVICE_STATUS_SUCCESS) {
1058             g_ptr_array_add(self->private->children, op->result);
1059         } else {
1060             char * this_failure_errmsg =
1061                 g_strdup_printf("%s: %s", op->device_name,
1062                                 device_error_or_status(op->result));
1063             DeviceStatusFlags status =
1064                 op->result == NULL ?
1065                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
1066             append_message(&failure_errmsgs,
1067                            strdup(this_failure_errmsg));
1068             failure_flags |= status;
1069             if (self->private->status == RAIT_STATUS_COMPLETE) {
1070                 /* The first failure just puts us in degraded mode. */
1071                 g_warning("%s: %s",
1072                           device_name, this_failure_errmsg);
1073                 g_warning("%s: %s failed, entering degraded mode.",
1074                           device_name, op->device_name);
1075                 g_ptr_array_add(self->private->children, op->result);
1076                 self->private->status = RAIT_STATUS_DEGRADED;
1077                 self->private->failed = i;
1078             } else {
1079                 /* The second and further failures are fatal. */
1080                 failure = TRUE;
1081             }
1082         }
1083         amfree(op->device_name);
1084     }
1085
1086     g_ptr_array_free_full(device_open_ops);
1087
1088     if (failure) {
1089         self->private->status = RAIT_STATUS_FAILED;
1090         device_set_error(dself, failure_errmsgs, failure_flags);
1091         return;
1092     }
1093
1094     /* Chain up. */
1095     if (parent_class->open_device) {
1096         parent_class->open_device(dself, device_name, device_type, device_node);
1097     }
1098     return;
1099 }
1100
1101 /* A GFunc. */
1102 static void read_label_do_op(gpointer data,
1103                              gpointer user_data G_GNUC_UNUSED) {
1104     GenericOp * op = data;
1105     op->result = GINT_TO_POINTER(device_read_label(op->child));
1106 }
1107
1108 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1109     RaitDevice * self;
1110     GPtrArray * ops;
1111     DeviceStatusFlags failed_result = 0;
1112     char *failed_errmsg = NULL;
1113     unsigned int i;
1114     Device * first_success = NULL;
1115
1116     self = RAIT_DEVICE(dself);
1117
1118     amfree(dself->volume_time);
1119     amfree(dself->volume_label);
1120     amfree(dself->volume_header);
1121
1122     if (rait_device_in_error(self))
1123         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1124
1125     /* nail down our block size, if we haven't already */
1126     if (!fix_block_size(self))
1127         return FALSE;
1128
1129     ops = make_generic_boolean_op_array(self);
1130     
1131     do_rait_child_ops(self, read_label_do_op, ops);
1132     
1133     for (i = 0; i < ops->len; i ++) {
1134         GenericOp * op = g_ptr_array_index(ops, i);
1135         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1136         if (op->result == DEVICE_STATUS_SUCCESS) {
1137             if (first_success == NULL) {
1138                 /* This is the first successful device. */
1139                 first_success = op->child;
1140             } else if (!compare_volume_results(first_success, op->child)) {
1141                 /* Doesn't match. :-( */
1142                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1143                         "Got %s/%s on %s against %s/%s on %s.",
1144                         first_success->volume_label,
1145                         first_success->volume_time,
1146                         first_success->device_name,
1147                         op->child->volume_label,
1148                         op->child->volume_time,
1149                         op->child->device_name);
1150                 g_warning("%s", failed_errmsg);
1151                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1152             }
1153         } else {
1154             failed_result |= result;
1155         }
1156     }
1157
1158     if (failed_result != DEVICE_STATUS_SUCCESS) {
1159         /* We had multiple failures or an inconsistency. */
1160         device_set_error(dself, failed_errmsg, failed_result);
1161     } else {
1162         /* Everything peachy. */
1163         amfree(failed_errmsg);
1164
1165         g_assert(first_success != NULL);
1166         if (first_success->volume_label != NULL) {
1167             dself->volume_label = g_strdup(first_success->volume_label);
1168         }
1169         if (first_success->volume_time != NULL) {
1170             dself->volume_time = g_strdup(first_success->volume_time);
1171         }
1172         if (first_success->volume_header != NULL) {
1173             dself->volume_header = dumpfile_copy(first_success->volume_header);
1174         }
1175     }
1176     
1177     g_ptr_array_free_full(ops);
1178
1179     return dself->status;
1180 }
1181
1182 typedef struct {
1183     GenericOp base;
1184     DeviceAccessMode mode; /* IN */
1185     char * label;          /* IN */
1186     char * timestamp;      /* IN */
1187 } StartOp;
1188
1189 /* A GFunc. */
1190 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1191     DeviceClass *klass;
1192     StartOp * param = data;
1193     
1194     klass = DEVICE_GET_CLASS(param->base.child);
1195     if (klass->start) {
1196         param->base.result =
1197             GINT_TO_POINTER((klass->start)(param->base.child,
1198                                             param->mode, param->label,
1199                                             param->timestamp));
1200     } else {
1201         param->base.result = FALSE;
1202     }
1203 }
1204
1205 static gboolean
1206 rait_device_configure(Device * dself, gboolean use_global_config)
1207 {
1208     RaitDevice *self = RAIT_DEVICE(dself);
1209     guint i;
1210
1211     for (i = 0; i < self->private->children->len; i ++) {
1212         Device *child;
1213
1214         if ((signed)i == self->private->failed)
1215             continue;
1216
1217         child = g_ptr_array_index(self->private->children, i);
1218         /* unconditionally configure the child without the global
1219          * configuration */
1220         if (!device_configure(child, FALSE))
1221             return FALSE;
1222     }
1223
1224     if (parent_class->configure) {
1225         return parent_class->configure(dself, use_global_config);
1226     }
1227
1228     return TRUE;
1229 }
1230
1231 static gboolean 
1232 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1233                    char * timestamp) {
1234     GPtrArray * ops;
1235     guint i;
1236     gboolean success;
1237     RaitDevice * self;
1238     DeviceStatusFlags total_status;
1239     char *failure_errmsgs = NULL;
1240     char * label_from_device = NULL;
1241
1242     self = RAIT_DEVICE(dself);
1243
1244     if (rait_device_in_error(self)) return FALSE;
1245
1246     /* No starting in degraded mode. */
1247     if (self->private->status != RAIT_STATUS_COMPLETE &&
1248         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1249         device_set_error(dself,
1250                          g_strdup_printf(_("RAIT device %s is read-only "
1251                                            "because it is in degraded mode.\n"),
1252                                          dself->device_name),
1253                          DEVICE_STATUS_DEVICE_ERROR);
1254         return FALSE;
1255     }
1256
1257     /* nail down our block size, if we haven't already */
1258     if (!fix_block_size(self))
1259         return FALSE;
1260
1261     dself->access_mode = mode;
1262     dself->in_file = FALSE;
1263     amfree(dself->volume_label);
1264     amfree(dself->volume_time);
1265
1266     ops = g_ptr_array_sized_new(self->private->children->len);
1267     for (i = 0; i < self->private->children->len; i ++) {
1268         StartOp * op;
1269         
1270         if ((signed)i == self->private->failed) {
1271             continue;
1272         }
1273
1274         op = g_new(StartOp, 1);
1275         op->base.child = g_ptr_array_index(self->private->children, i);
1276         op->mode = mode;
1277         op->label = g_strdup(label);
1278         op->timestamp = g_strdup(timestamp);
1279         g_ptr_array_add(ops, op);
1280     }
1281     
1282     do_rait_child_ops(self, start_do_op, ops);
1283
1284     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1285
1286     /* Check results of starting devices; this is mostly about the
1287      * VOLUME_UNLABELED flag. */
1288     total_status = 0;
1289     for (i = 0; i < ops->len; i ++) {
1290         StartOp * op = g_ptr_array_index(ops, i);
1291         Device *child = op->base.child;
1292
1293         total_status |= child->status;
1294         if (child->status != DEVICE_STATUS_SUCCESS) {
1295             /* record the error message and move on. */
1296             append_message(&failure_errmsgs,
1297                            g_strdup_printf("%s: %s",
1298                                            child->device_name,
1299                                            device_error_or_status(child)));
1300         } else {
1301             if (child->volume_label != NULL && child->volume_time != NULL) {
1302                 if (label_from_device) {
1303                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1304                         strcmp(child->volume_time, dself->volume_time) != 0) {
1305                         /* Mismatch! (Two devices provided different labels) */
1306                         char * this_message =
1307                             g_strdup_printf("%s: Label (%s/%s) is different "
1308                                             "from label (%s/%s) found at "
1309                                             "device %s",
1310                                             child->device_name,
1311                                             child->volume_label,
1312                                             child->volume_time,
1313                                             dself->volume_label,
1314                                             dself->volume_time,
1315                                             label_from_device);
1316                         append_message(&failure_errmsgs, this_message);
1317                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1318                         g_warning("RAIT device children have different labels or timestamps");
1319                     }
1320                 } else {
1321                     /* First device with a volume. */
1322                     dself->volume_label = g_strdup(child->volume_label);
1323                     dself->volume_time = g_strdup(child->volume_time);
1324                     label_from_device = g_strdup(child->device_name);
1325                 }
1326             } else {
1327                 /* Device problem, it says it succeeded but sets no label? */
1328                 char * this_message =
1329                     g_strdup_printf("%s: Says label read, but no volume "
1330                                      "label found.", child->device_name);
1331                 g_warning("RAIT device child has NULL volume or label");
1332                 append_message(&failure_errmsgs, this_message);
1333                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1334             }
1335         }
1336     }
1337
1338     amfree(label_from_device);
1339     g_ptr_array_free_full(ops);
1340
1341     dself->status = total_status;
1342
1343     if (total_status != DEVICE_STATUS_SUCCESS || !success) {
1344         device_set_error(dself, failure_errmsgs, total_status);
1345         return FALSE;
1346     }
1347
1348     amfree(failure_errmsgs);
1349     return TRUE;
1350 }
1351
1352 typedef struct {
1353     GenericOp base;
1354     dumpfile_t * info; /* IN */
1355     int fileno;
1356 } StartFileOp;
1357
1358 /* a GFunc */
1359 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1360     StartFileOp * op = data;
1361     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1362                                                         op->info));
1363     op->fileno = op->base.child->file;
1364     if (op->fileno < 1) {
1365         op->base.result = FALSE;
1366     }
1367 }
1368
1369 static gboolean
1370 rait_device_start_file (Device * dself, dumpfile_t * info) {
1371     GPtrArray * ops;
1372     guint i;
1373     gboolean success;
1374     RaitDevice * self;
1375     int actual_file = -1;
1376
1377     self = RAIT_DEVICE(dself);
1378
1379     if (rait_device_in_error(self)) return FALSE;
1380     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1381
1382     ops = g_ptr_array_sized_new(self->private->children->len);
1383     for (i = 0; i < self->private->children->len; i ++) {
1384         StartFileOp * op;
1385         op = g_new(StartFileOp, 1);
1386         op->base.child = g_ptr_array_index(self->private->children, i);
1387         /* each child gets its own copy of the header, to munge as it
1388          * likes (setting blocksize, at least) */
1389         op->info = dumpfile_copy(info);
1390         g_ptr_array_add(ops, op);
1391     }
1392     
1393     do_rait_child_ops(self, start_file_do_op, ops);
1394
1395     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1396     
1397     for (i = 0; i < self->private->children->len && success; i ++) {
1398         StartFileOp * op = g_ptr_array_index(ops, i);
1399         if (!op->base.result)
1400             continue;
1401         g_assert(op->fileno >= 1);
1402         if (actual_file < 1) {
1403             actual_file = op->fileno;
1404         }
1405         if (actual_file != op->fileno) {
1406             /* File number mismatch! Aah, my hair is on fire! */
1407             device_set_error(dself,
1408                              g_strdup_printf("File number mismatch in "
1409                                              "rait_device_start_file(): "
1410                                              "Child %s reported file number "
1411                                              "%d, another child reported "
1412                                              "file number %d.",
1413                                              op->base.child->device_name,
1414                                              op->fileno, actual_file),
1415                              DEVICE_STATUS_DEVICE_ERROR);
1416             success = FALSE;
1417             op->base.result = FALSE;
1418         }
1419     }
1420
1421     for (i = 0; i < ops->len && success; i ++) {
1422         StartFileOp * op = g_ptr_array_index(ops, i);
1423         if (op->info) dumpfile_free(op->info);
1424     }
1425     g_ptr_array_free_full(ops);
1426
1427     if (!success) {
1428         if (!device_in_error(dself)) {
1429             device_set_error(dself, stralloc("One or more devices "
1430                                              "failed to start_file"),
1431                              DEVICE_STATUS_DEVICE_ERROR);
1432         }
1433         return FALSE;
1434     }
1435
1436     dself->in_file = TRUE;
1437     g_assert(actual_file >= 1);
1438     dself->file = actual_file;
1439
1440     return TRUE;
1441 }
1442
1443 static void find_simple_params(RaitDevice * self,
1444                                guint * num_children,
1445                                guint * data_children) {
1446     int num, data;
1447     
1448     num = self->private->children->len;
1449     if (num > 1)
1450         data = num - 1;
1451     else
1452         data = num;
1453     if (num_children != NULL)
1454         *num_children = num;
1455     if (data_children != NULL)
1456         *data_children = data;
1457 }
1458
1459 typedef struct {
1460     GenericOp base;
1461     guint size;           /* IN */
1462     gpointer data;        /* IN */
1463     gboolean data_needs_free; /* bookkeeping */
1464 } WriteBlockOp;
1465
1466 /* a GFunc. */
1467 static void write_block_do_op(gpointer data,
1468                               gpointer user_data G_GNUC_UNUSED) {
1469     WriteBlockOp * op = data;
1470
1471     op->base.result =
1472         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1473 }
1474
/* Parity block generation: XORs the data chunks together, byte by byte.
   Performance of this function can be improved considerably by using
   larger-sized integers or assembly-coded vector instructions.
   Parameters are:
   % data       - All data chunks in series; the first num_chunks - 1 chunks
                  of chunk_size bytes each are read
   % parity     - Allocated space for parity block (chunk_size bytes),
                  overwritten with the result
   % chunk_size - Size of one chunk in bytes
   % num_chunks - Chunk count including the parity chunk itself
 */
static void make_parity_block(char * data, char * parity,
                              guint chunk_size, guint num_chunks) {
    guint data_chunks = num_chunks - 1;
    guint c;

    memset(parity, 0, chunk_size);
    for (c = 0; c < data_chunks; c ++) {
        const char * chunk = data + chunk_size * c;
        guint b;

        for (b = 0; b < chunk_size; b ++) {
            parity[b] ^= chunk[b];
        }
    }
}
1492
1493 /* Does the same thing as make_parity_block, but instead of using a
1494    single memory chunk holding all chunks, it takes a GPtrArray of
1495    chunks. */
1496 static void make_parity_block_extents(GPtrArray * data, char * parity,
1497                                       guint chunk_size) {
1498     guint i;
1499     bzero(parity, chunk_size);
1500     for (i = 0; i < data->len; i ++) {
1501         guint j;
1502         char * data_chunk;
1503         data_chunk = g_ptr_array_index(data, i);
1504         for (j = 0; j < chunk_size; j ++) {
1505             parity[j] ^= data_chunk[j];
1506         }
1507     }
1508 }
1509
1510 /* Does the parity creation algorithm. Allocates and returns a single
1511    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1512 static char * extract_data_block(char * data, guint size,
1513                                  guint chunks, guint chunk) {
1514     char * rval;
1515     guint chunk_size;
1516
1517     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1518     g_assert(data != NULL);
1519     g_assert(size > 0 && size % (chunks - 1) == 0);
1520
1521     chunk_size = size / (chunks - 1);
1522     rval = g_malloc(chunk_size);
1523     if (chunks != chunk) {
1524         /* data block. */
1525         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1526     } else {
1527         make_parity_block(data, rval, chunk_size, chunks);
1528     }
1529     
1530     return rval;
1531 }
1532
1533 static gboolean 
1534 rait_device_write_block (Device * dself, guint size, gpointer data) {
1535     GPtrArray * ops;
1536     guint i;
1537     gboolean success;
1538     guint data_children, num_children;
1539     gsize blocksize = dself->block_size;
1540     RaitDevice * self;
1541     gboolean last_block = (size < blocksize);
1542
1543     self = RAIT_DEVICE(dself);
1544
1545     if (rait_device_in_error(self)) return FALSE;
1546     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1547
1548     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1549     num_children = self->private->children->len;
1550     if (num_children != 1)
1551         data_children = num_children - 1;
1552     else
1553         data_children = num_children;
1554     
1555     g_assert(size % data_children == 0 || last_block);
1556
1557     /* zero out to the end of a short block -- tape devices only write
1558      * whole blocks. */
1559     if (last_block) {
1560         char *new_data;
1561
1562         new_data = g_malloc(blocksize);
1563         memcpy(new_data, data, size);
1564         bzero(new_data + size, blocksize - size);
1565
1566         data = new_data;
1567         size = blocksize;
1568     }
1569
1570     ops = g_ptr_array_sized_new(num_children);
1571     for (i = 0; i < self->private->children->len; i ++) {
1572         WriteBlockOp * op;
1573         op = g_malloc(sizeof(*op));
1574         op->base.child = g_ptr_array_index(self->private->children, i);
1575         op->size = size / data_children;
1576         if (num_children <= 2) {
1577             op->data = data;
1578             op->data_needs_free = FALSE;
1579         } else {
1580             op->data_needs_free = TRUE;
1581             op->data = extract_data_block(data, size, num_children, i + 1);
1582         }
1583         g_ptr_array_add(ops, op);
1584     }
1585
1586     do_rait_child_ops(self, write_block_do_op, ops);
1587
1588     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1589
1590     for (i = 0; i < self->private->children->len; i ++) {
1591         WriteBlockOp * op = g_ptr_array_index(ops, i);
1592         if (op->data_needs_free)
1593             free(op->data);
1594     }
1595
1596     if (last_block) {
1597         amfree(data);
1598     }
1599     
1600     g_ptr_array_free_full(ops);
1601
1602     if (!success) {
1603         /* TODO be more specific here */
1604         /* TODO: handle EOF here -- if one or more (or two or more??)
1605          * children have is_eof* set, then reflect that in our error
1606          * status, and finish_file all of the non-EOF children. What's
1607          * more fun is when one device fails and must be isolated at
1608          * the same time another hits EOF. */
1609         device_set_error(dself,
1610             stralloc("One or more devices failed to write_block"),
1611             DEVICE_STATUS_DEVICE_ERROR);
1612         return FALSE;
1613     } else {
1614         dself->block ++;
1615
1616         return TRUE;
1617     }
1618 }
1619
1620 /* A GFunc */
1621 static void finish_file_do_op(gpointer data,
1622                               gpointer user_data G_GNUC_UNUSED) {
1623     GenericOp * op = data;
1624     if (op->child) {
1625         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1626     } else {
1627         op->result = FALSE;
1628     }
1629 }
1630
1631 static gboolean 
1632 rait_device_finish_file (Device * dself) {
1633     GPtrArray * ops;
1634     gboolean success;
1635     RaitDevice * self = RAIT_DEVICE(dself);
1636
1637     g_assert(self != NULL);
1638     if (rait_device_in_error(dself)) return FALSE;
1639     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1640
1641     ops = make_generic_boolean_op_array(self);
1642     
1643     do_rait_child_ops(self, finish_file_do_op, ops);
1644
1645     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1646
1647     g_ptr_array_free_full(ops);
1648
1649     if (!success) {
1650         /* TODO: be more specific here */
1651         device_set_error(dself,
1652                          g_strdup("One or more devices failed to finish_file"),
1653             DEVICE_STATUS_DEVICE_ERROR);
1654         return FALSE;
1655     }
1656
1657     dself->in_file = FALSE;
1658     return TRUE;
1659 }
1660
1661 typedef struct {
1662     GenericOp base;
1663     guint requested_file;                /* IN */
1664     guint actual_file;                   /* OUT */
1665 } SeekFileOp;
1666
1667 /* a GFunc. */
1668 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1669     SeekFileOp * op = data;
1670     op->base.result = device_seek_file(op->base.child, op->requested_file);
1671     op->actual_file = op->base.child->file;
1672 }
1673
1674 static dumpfile_t * 
1675 rait_device_seek_file (Device * dself, guint file) {
1676     GPtrArray * ops;
1677     guint i;
1678     gboolean success;
1679     dumpfile_t * rval;
1680     RaitDevice * self = RAIT_DEVICE(dself);
1681     guint actual_file = 0;
1682     gboolean in_file = FALSE;
1683
1684     if (rait_device_in_error(self)) return NULL;
1685
1686     dself->in_file = FALSE;
1687     dself->is_eof = FALSE;
1688     dself->block = 0;
1689
1690     ops = g_ptr_array_sized_new(self->private->children->len);
1691     for (i = 0; i < self->private->children->len; i ++) {
1692         SeekFileOp * op;
1693         if ((int)i == self->private->failed)
1694             continue; /* This device is broken. */
1695         op = g_new(SeekFileOp, 1);
1696         op->base.child = g_ptr_array_index(self->private->children, i);
1697         op->base.child_index = i;
1698         op->requested_file = file;
1699         g_ptr_array_add(ops, op);
1700     }
1701     
1702     do_rait_child_ops(self, seek_file_do_op, ops);
1703
1704     /* This checks for NULL values, but we still have to check for
1705        consistant headers. */
1706     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1707                                        ops, extract_boolean_pointer_op);
1708
1709     rval = NULL;
1710     for (i = 0; i < ops->len; i ++) {
1711         SeekFileOp * this_op;
1712         dumpfile_t * this_result;
1713         guint this_actual_file;
1714         gboolean this_in_file;
1715         
1716         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1717
1718         if ((signed)this_op->base.child_index == self->private->failed)
1719             continue;
1720
1721         this_result = this_op->base.result;
1722         this_actual_file = this_op->actual_file;
1723         this_in_file = this_op->base.child->in_file;
1724
1725         if (rval == NULL) {
1726             rval = this_result;
1727             actual_file = this_actual_file;
1728             in_file = this_in_file;
1729         } else {
1730             if (headers_are_equal(rval, this_result) &&
1731                 actual_file == this_actual_file &&
1732                 in_file == this_in_file) {
1733                 /* Do nothing. */
1734             } else {
1735                 success = FALSE;
1736             }
1737             free(this_result);
1738         }
1739     }
1740
1741     g_ptr_array_free_full(ops);
1742
1743     if (!success) {
1744         amfree(rval);
1745         /* TODO: be more specific here */
1746         device_set_error(dself,
1747                          g_strdup("One or more devices failed to seek_file"),
1748             DEVICE_STATUS_DEVICE_ERROR);
1749         return NULL;
1750     }
1751
1752     /* update our state */
1753     dself->in_file = in_file;
1754     dself->file = actual_file;
1755
1756     return rval;
1757 }
1758
1759 typedef struct {
1760     GenericOp base;
1761     guint64 block; /* IN */
1762 } SeekBlockOp;
1763
1764 /* a GFunc. */
1765 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1766     SeekBlockOp * op = data;
1767     op->base.result =
1768         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1769 }
1770
1771 static gboolean 
1772 rait_device_seek_block (Device * dself, guint64 block) {
1773     GPtrArray * ops;
1774     guint i;
1775     gboolean success;
1776
1777     RaitDevice * self = RAIT_DEVICE(dself);
1778
1779     if (rait_device_in_error(self)) return FALSE;
1780
1781     ops = g_ptr_array_sized_new(self->private->children->len);
1782     for (i = 0; i < self->private->children->len; i ++) {
1783         SeekBlockOp * op;
1784         if ((int)i == self->private->failed)
1785             continue; /* This device is broken. */
1786         op = g_new(SeekBlockOp, 1);
1787         op->base.child = g_ptr_array_index(self->private->children, i);
1788         op->base.child_index = i;
1789         op->block = block;
1790         g_ptr_array_add(ops, op);
1791     }
1792     
1793     do_rait_child_ops(self, seek_block_do_op, ops);
1794
1795     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1796                                        ops, extract_boolean_generic_op);
1797
1798     g_ptr_array_free_full(ops);
1799
1800     if (!success) {
1801         /* TODO: be more specific here */
1802         device_set_error(dself,
1803             stralloc("One or more devices failed to seek_block"),
1804             DEVICE_STATUS_DEVICE_ERROR);
1805         return FALSE;
1806     }
1807
1808     dself->block = block;
1809     return TRUE;
1810 }
1811
1812 typedef struct {
1813     GenericOp base;
1814     gpointer buffer; /* IN */
1815     int read_size;      /* IN/OUT -- note not a pointer */
1816     int desired_read_size; /* bookkeeping */
1817 } ReadBlockOp;
1818
1819 /* a GFunc. */
1820 static void read_block_do_op(gpointer data,
1821                              gpointer user_data G_GNUC_UNUSED) {
1822     ReadBlockOp * op = data;
1823     op->base.result =
1824         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1825                                           &(op->read_size)));
1826     if (op->read_size > op->desired_read_size) {
1827         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1828                   op->base.child->device_name);
1829     }
1830 }
1831
1832 /* A BooleanExtractor. This one checks for a successful read. */
1833 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1834     ReadBlockOp * op = data;
1835     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1836 }
1837
1838 /* A BooleanExtractor. This one checks for EOF. */
1839 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1840     ReadBlockOp * op = data;
1841     return op->base.child->is_eof;
1842 }
1843
1844 /* Counts the number of elements in an array matching a given proposition. */
1845 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1846     int rval;
1847     unsigned int i;
1848     rval = 0;
1849     for (i = 0; i < array->len ; i++) {
1850         if (filter(g_ptr_array_index(array, i)))
1851             rval ++;
1852     }
1853     return rval;
1854 }
1855
1856 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1857                                       gpointer buf, size_t bufsize) {
1858     guint num_children, data_children;
1859     gsize blocksize;
1860     gsize child_blocksize;
1861     guint i;
1862     int parity_child;
1863     gpointer parity_block = NULL;
1864     gboolean success;
1865
1866     success = TRUE;
1867
1868     blocksize = DEVICE(self)->block_size;
1869     find_simple_params(self, &num_children, &data_children);
1870
1871     if (num_children > 1)
1872         parity_child = num_children - 1;
1873     else
1874         parity_child = -1;
1875
1876     child_blocksize = blocksize / data_children;
1877
1878     for (i = 0; i < ops->len; i ++) {
1879         ReadBlockOp * op = g_ptr_array_index(ops, i);
1880         if (!extract_boolean_read_block_op_data(op))
1881             continue;
1882         if ((int)(op->base.child_index) == parity_child) {
1883             parity_block = op->buffer;
1884         } else {
1885             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1886             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1887                    child_blocksize);
1888         }
1889     }
1890     g_assert(parity_block != NULL); /* should have found parity_child */
1891
1892     if (self->private->status == RAIT_STATUS_COMPLETE) {
1893         if (num_children >= 2) {
1894             /* Verify the parity block. This code is inefficient but
1895                does the job for the 2-device case, too. */
1896             gpointer constructed_parity;
1897             GPtrArray * data_extents;
1898             
1899             constructed_parity = g_malloc(child_blocksize);
1900             data_extents = g_ptr_array_sized_new(data_children);
1901             for (i = 0; i < data_children; i ++) {
1902                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1903                 g_assert(extract_boolean_read_block_op_data(op));
1904                 if ((int)op->base.child_index == parity_child)
1905                     continue;
1906                 g_ptr_array_add(data_extents, op->buffer);
1907             }
1908             make_parity_block_extents(data_extents, constructed_parity,
1909                                       child_blocksize);
1910             
1911             if (0 != memcmp(parity_block, constructed_parity,
1912                             child_blocksize)) {
1913                 device_set_error(DEVICE(self),
1914                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1915                     DEVICE_STATUS_DEVICE_ERROR);
1916                 /* TODO: can't we just isolate the device in this case? */
1917                 success = FALSE;
1918             }
1919             g_ptr_array_free(data_extents, TRUE);
1920             amfree(constructed_parity);
1921         } else { /* do nothing. */ }
1922     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1923         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1924         /* We are in degraded mode. What's missing? */
1925         if (self->private->failed == parity_child) {
1926             /* do nothing. */
1927         } else if (num_children >= 2) {
1928             /* Reconstruct failed block from parity block. */
1929             GPtrArray * data_extents = g_ptr_array_new();            
1930
1931             for (i = 0; i < data_children; i ++) {
1932                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1933                 if (!extract_boolean_read_block_op_data(op))
1934                     continue;
1935                 g_ptr_array_add(data_extents, op->buffer);
1936             }
1937
1938             /* Conveniently, the reconstruction is the same procedure
1939                as the parity generation. This even works if there is
1940                only one remaining device! */
1941             make_parity_block_extents(data_extents,
1942                                       (char *)buf + (child_blocksize *
1943                                              self->private->failed),
1944                                       child_blocksize);
1945
1946             /* The array members belong to our ops argument. */
1947             g_ptr_array_free(data_extents, TRUE);
1948         } else {
1949             g_assert_not_reached();
1950         }
1951     } else {
1952         /* device is already in FAILED state -- we shouldn't even be here */
1953         success = FALSE;
1954     }
1955     return success;
1956 }
1957
1958 static int
1959 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1960     GPtrArray * ops;
1961     guint i;
1962     gboolean success;
1963     guint num_children, data_children;
1964     gsize blocksize = dself->block_size;
1965     gsize child_blocksize;
1966
1967     RaitDevice * self = RAIT_DEVICE(dself);
1968
1969     if (rait_device_in_error(self)) return -1;
1970
1971     find_simple_params(self, &num_children, &data_children);
1972
1973     /* tell caller they haven't given us a big enough buffer */
1974     if (blocksize > (gsize)*size) {
1975         g_assert(blocksize < INT_MAX);
1976         *size = (int)blocksize;
1977         return 0;
1978     }
1979
1980     g_assert(blocksize % data_children == 0); /* see find_block_size */
1981     child_blocksize = blocksize / data_children;
1982
1983     ops = g_ptr_array_sized_new(num_children);
1984     for (i = 0; i < num_children; i ++) {
1985         ReadBlockOp * op;
1986         if ((int)i == self->private->failed)
1987             continue; /* This device is broken. */
1988         op = g_new(ReadBlockOp, 1);
1989         op->base.child = g_ptr_array_index(self->private->children, i);
1990         op->base.child_index = i;
1991         op->buffer = g_malloc(child_blocksize);
1992         op->desired_read_size = op->read_size = child_blocksize;
1993         g_ptr_array_add(ops, op);
1994     }
1995     
1996     do_rait_child_ops(self, read_block_do_op, ops);
1997
1998     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1999         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
2000                                      ops,
2001                                      extract_boolean_read_block_op_data)) {
2002             /* TODO: be more specific */
2003             device_set_error(dself,
2004                 stralloc(_("Error occurred combining blocks from child devices")),
2005                 DEVICE_STATUS_DEVICE_ERROR);
2006             success = FALSE;
2007         } else {
2008             /* raid_block_reconstruction sets the error status if necessary */
2009             success = raid_block_reconstruction(RAIT_DEVICE(self),
2010                                                 ops, buf, (size_t)*size);
2011         }
2012     } else {
2013         success = FALSE;
2014         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
2015                                      ops,
2016                                      extract_boolean_read_block_op_eof)) {
2017             device_set_error(dself,
2018                 stralloc(_("EOF")),
2019                 DEVICE_STATUS_SUCCESS);
2020             dself->is_eof = TRUE;
2021             dself->in_file = FALSE;
2022         } else {
2023             device_set_error(dself,
2024                 stralloc(_("All child devices failed to read, but not all are at eof")),
2025                 DEVICE_STATUS_DEVICE_ERROR);
2026         }
2027     }
2028
2029     for (i = 0; i < ops->len; i ++) {
2030         ReadBlockOp * op = g_ptr_array_index(ops, i);
2031         amfree(op->buffer);
2032     }
2033     g_ptr_array_free_full(ops);
2034
2035     if (success) {
2036         dself->block++;
2037         *size = blocksize;
2038         return blocksize;
2039     } else {
2040         return -1;
2041     }
2042 }
2043
2044 /* property utility functions */
2045
2046 typedef struct {
2047     GenericOp base;
2048     DevicePropertyId id;   /* IN */
2049     GValue value;          /* IN/OUT */
2050     PropertySurety surety; /* IN (for set) */
2051     PropertySource source; /* IN (for set) */
2052 } PropertyOp;
2053
2054 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2055 static GPtrArray * make_property_op_array(RaitDevice * self,
2056                                           DevicePropertyId id,
2057                                           GValue * value,
2058                                           PropertySurety surety,
2059                                           PropertySource source) {
2060     guint i;
2061     GPtrArray * ops;
2062     ops = g_ptr_array_sized_new(self->private->children->len);
2063     for (i = 0; i < self->private->children->len; i ++) {
2064         PropertyOp * op;
2065
2066         if ((signed)i == self->private->failed) {
2067             continue;
2068         }
2069
2070         op = g_new(PropertyOp, 1);
2071         op->base.child = g_ptr_array_index(self->private->children, i);
2072         op->id = id;
2073         bzero(&(op->value), sizeof(op->value));
2074         if (value != NULL) {
2075             g_value_unset_copy(value, &(op->value));
2076         }
2077         op->surety = surety;
2078         op->source = source;
2079         g_ptr_array_add(ops, op);
2080     }
2081
2082     return ops;
2083 }
2084
2085 /* A GFunc. */
2086 static void property_get_do_op(gpointer data,
2087                                gpointer user_data G_GNUC_UNUSED) {
2088     PropertyOp * op = data;
2089
2090     bzero(&(op->value), sizeof(op->value));
2091     op->base.result =
2092         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2093                                             &(op->value)));
2094 }
2095
2096 /* A GFunc. */
2097 static void property_set_do_op(gpointer data,
2098                                gpointer user_data G_GNUC_UNUSED) {
2099     PropertyOp * op = data;
2100
2101     op->base.result =
2102         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2103                                                &(op->value), op->surety,
2104                                                op->source));
2105     g_value_unset(&(op->value));
2106 }
2107
2108 /* PropertyGetFns and PropertySetFns */
2109
2110 static gboolean
2111 property_get_block_size_fn(Device *dself,
2112     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2113     PropertySurety *surety, PropertySource *source)
2114 {
2115     RaitDevice *self = RAIT_DEVICE(dself);
2116     gsize my_block_size;
2117
2118     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2119         my_block_size = dself->block_size;
2120
2121         if (surety)
2122             *surety = dself->block_size_surety;
2123     } else {
2124         gsize child_block_size;
2125         child_block_size = calculate_block_size_from_children(self,
2126                                                     &my_block_size);
2127         if (child_block_size == 0)
2128             return FALSE;
2129
2130         if (surety)
2131             *surety = PROPERTY_SURETY_BAD; /* may still change */
2132     }
2133
2134     if (val) {
2135         g_value_unset_init(val, G_TYPE_INT);
2136         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2137         g_value_set_int(val, (gint)my_block_size);
2138     }
2139
2140     if (source)
2141         *source = dself->block_size_source;
2142
2143     return TRUE;
2144 }
2145
2146 static gboolean
2147 property_set_block_size_fn(Device *dself,
2148     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2149     PropertySurety surety, PropertySource source)
2150 {
2151     RaitDevice *self = RAIT_DEVICE(dself);
2152     gint my_block_size = g_value_get_int(val);
2153     guint data_children;
2154
2155     find_simple_params(self, NULL, &data_children);
2156     if ((my_block_size % data_children) != 0) {
2157         device_set_error(dself,
2158             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2159             DEVICE_STATUS_DEVICE_ERROR);
2160         return FALSE;
2161     }
2162
2163     dself->block_size = my_block_size;
2164     dself->block_size_source = source;
2165     dself->block_size_surety = surety;
2166
2167     if (!fix_block_size(self))
2168         return FALSE;
2169
2170     return TRUE;
2171 }
2172
2173 static gboolean
2174 property_get_canonical_name_fn(Device *dself,
2175     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2176     PropertySurety *surety, PropertySource *source)
2177 {
2178     RaitDevice *self = RAIT_DEVICE(dself);
2179     char *canonical = child_device_names_to_rait_name(self);
2180
2181     if (val) {
2182         g_value_unset_init(val, G_TYPE_STRING);
2183         g_value_set_string(val, canonical);
2184         g_free(canonical);
2185     }
2186
2187     if (surety)
2188         *surety = PROPERTY_SURETY_GOOD;
2189
2190     if (source)
2191         *source = PROPERTY_SOURCE_DETECTED;
2192
2193     return TRUE;
2194 }
2195
2196 static gboolean
2197 property_get_concurrency_fn(Device *dself,
2198     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2199     PropertySurety *surety, PropertySource *source)
2200 {
2201     RaitDevice *self = RAIT_DEVICE(dself);
2202     ConcurrencyParadigm result;
2203     guint i;
2204     GPtrArray * ops;
2205     gboolean success;
2206
2207     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2208     do_rait_child_ops(self, property_get_do_op, ops);
2209
2210     /* find the most restrictive paradigm acceptable to all
2211      * child devices */
2212     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2213     success = TRUE;
2214     for (i = 0; i < ops->len; i ++) {
2215         ConcurrencyParadigm cur;
2216         PropertyOp * op = g_ptr_array_index(ops, i);
2217
2218         if (!op->base.result
2219             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2220             success = FALSE;
2221             break;
2222         }
2223
2224         cur = g_value_get_enum(&(op->value));
2225         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2226             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2227             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2228         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2229                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2230             result = CONCURRENCY_PARADIGM_SHARED_READ;
2231         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2232                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2233             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2234         } else {
2235             success = FALSE;
2236             break;
2237         }
2238     }
2239
2240     g_ptr_array_free_full(ops);
2241
2242     if (success) {
2243         if (val) {
2244             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2245             g_value_set_enum(val, result);
2246         }
2247
2248         if (surety)
2249             *surety = PROPERTY_SURETY_GOOD;
2250
2251         if (source)
2252             *source = PROPERTY_SOURCE_DETECTED;
2253     }
2254
2255     return success;
2256 }
2257
2258 static gboolean
2259 property_get_streaming_fn(Device *dself,
2260     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2261     PropertySurety *surety, PropertySource *source)
2262 {
2263     RaitDevice *self = RAIT_DEVICE(dself);
2264     StreamingRequirement result;
2265     guint i;
2266     GPtrArray * ops;
2267     gboolean success;
2268
2269     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2270     do_rait_child_ops(self, property_get_do_op, ops);
2271
2272     /* combine the child streaming requirements, selecting the strongest
2273      * requirement of the bunch. */
2274     result = STREAMING_REQUIREMENT_NONE;
2275     success = TRUE;
2276     for (i = 0; i < ops->len; i ++) {
2277         StreamingRequirement cur;
2278         PropertyOp * op = g_ptr_array_index(ops, i);
2279
2280         if (!op->base.result
2281             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2282             success = FALSE;
2283             break;
2284         }
2285
2286         cur = g_value_get_enum(&(op->value));
2287         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2288             cur == STREAMING_REQUIREMENT_REQUIRED) {
2289             result = STREAMING_REQUIREMENT_REQUIRED;
2290         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2291                    cur == STREAMING_REQUIREMENT_DESIRED) {
2292             result = STREAMING_REQUIREMENT_DESIRED;
2293         } else if (result == STREAMING_REQUIREMENT_NONE &&
2294                    cur == STREAMING_REQUIREMENT_NONE) {
2295             result = STREAMING_REQUIREMENT_NONE;
2296         } else {
2297             success = FALSE;
2298             break;
2299         }
2300     }
2301
2302     g_ptr_array_free_full(ops);
2303
2304     if (success) {
2305         if (val) {
2306             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2307             g_value_set_enum(val, result);
2308         }
2309
2310         if (surety)
2311             *surety = PROPERTY_SURETY_GOOD;
2312
2313         if (source)
2314             *source = PROPERTY_SOURCE_DETECTED;
2315     }
2316
2317     return success;
2318 }
2319
2320 static gboolean
2321 property_get_boolean_and_fn(Device *dself,
2322     DevicePropertyBase *base, GValue *val,
2323     PropertySurety *surety, PropertySource *source)
2324 {
2325     RaitDevice *self = RAIT_DEVICE(dself);
2326     gboolean result;
2327     guint i;
2328     GPtrArray * ops;
2329     gboolean success;
2330
2331     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2332     do_rait_child_ops(self, property_get_do_op, ops);
2333
2334     /* combine the child values, applying a simple AND */
2335     result = TRUE;
2336     success = TRUE;
2337     for (i = 0; i < ops->len; i ++) {
2338         PropertyOp * op = g_ptr_array_index(ops, i);
2339
2340         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2341             success = FALSE;
2342             break;
2343         }
2344
2345         if (!g_value_get_boolean(&(op->value))) {
2346             result = FALSE;
2347             break;
2348         }
2349     }
2350
2351     g_ptr_array_free_full(ops);
2352
2353     if (success) {
2354         if (val) {
2355             g_value_unset_init(val, G_TYPE_BOOLEAN);
2356             g_value_set_boolean(val, result);
2357         }
2358
2359         if (surety)
2360             *surety = PROPERTY_SURETY_GOOD;
2361
2362         if (source)
2363             *source = PROPERTY_SOURCE_DETECTED;
2364     }
2365
2366     return success;
2367 }
2368
2369 static gboolean
2370 property_get_medium_access_type_fn(Device *dself,
2371     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2372     PropertySurety *surety, PropertySource *source)
2373 {
2374     RaitDevice *self = RAIT_DEVICE(dself);
2375     MediaAccessMode result;
2376     guint i;
2377     GPtrArray * ops;
2378     gboolean success;
2379
2380     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2381     do_rait_child_ops(self, property_get_do_op, ops);
2382
2383     /* combine the modes as best we can */
2384     result = 0;
2385     success = TRUE;
2386     for (i = 0; i < ops->len; i ++) {
2387         MediaAccessMode cur;
2388         PropertyOp * op = g_ptr_array_index(ops, i);
2389
2390         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2391             success = FALSE;
2392             break;
2393         }
2394
2395         cur = g_value_get_enum(&(op->value));
2396
2397         if (i == 0) {
2398             result = cur;
2399         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2400                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2401                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2402                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2403             /* Invalid combination; one device can only read, other
2404                can only write. */
2405             success = FALSE;
2406             break;
2407         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2408                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2409             result = MEDIA_ACCESS_MODE_READ_ONLY;
2410         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2411                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2412             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2413         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2414                    cur == MEDIA_ACCESS_MODE_WORM) {
2415             result = MEDIA_ACCESS_MODE_WORM;
2416         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2417                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2418             result = MEDIA_ACCESS_MODE_READ_WRITE;
2419         } else {
2420             success = FALSE;
2421             break;
2422         }
2423     }
2424
2425     g_ptr_array_free_full(ops);
2426
2427     if (success) {
2428         if (val) {
2429             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2430             g_value_set_enum(val, result);
2431         }
2432
2433         if (surety)
2434             *surety = PROPERTY_SURETY_GOOD;
2435
2436         if (source)
2437             *source = PROPERTY_SOURCE_DETECTED;
2438     }
2439
2440     return success;
2441 }
2442
2443 static gboolean
2444 property_get_free_space_fn(Device *dself,
2445     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2446     PropertySurety *surety, PropertySource *source)
2447 {
2448     RaitDevice *self = RAIT_DEVICE(dself);
2449     QualifiedSize result;
2450     guint i;
2451     GPtrArray * ops;
2452     guint data_children;
2453
2454     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2455     do_rait_child_ops(self, property_get_do_op, ops);
2456
2457     /* Find the minimal available space of any child, with some funny business
2458      * to deal with varying degrees of accuracy. */
2459     result.accuracy = SIZE_ACCURACY_UNKNOWN;
2460     result.bytes = 0;
2461     for (i = 0; i < ops->len; i ++) {
2462         QualifiedSize cur;
2463         PropertyOp * op = g_ptr_array_index(ops, i);
2464
2465         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != QUALIFIED_SIZE_TYPE) {
2466             /* maybe this child can't tell us .. so this is just an estimate */
2467             if (result.accuracy == SIZE_ACCURACY_REAL)
2468                 result.accuracy = SIZE_ACCURACY_ESTIMATE;
2469
2470             continue;
2471         }
2472
2473         cur = *(QualifiedSize*)(g_value_get_boxed(&(op->value)));
2474
2475         if (result.accuracy != cur.accuracy) {
2476             result.accuracy = SIZE_ACCURACY_ESTIMATE;
2477         }
2478
2479         if (result.accuracy == SIZE_ACCURACY_UNKNOWN &&
2480             cur.accuracy != SIZE_ACCURACY_UNKNOWN) {
2481             result.bytes = cur.bytes;
2482         } else if (result.accuracy != SIZE_ACCURACY_UNKNOWN &&
2483                    cur.accuracy == SIZE_ACCURACY_UNKNOWN) {
2484             /* result.bytes unchanged. */
2485         } else {
2486             result.bytes = MIN(result.bytes, cur.bytes);
2487         }
2488     }
2489
2490     g_ptr_array_free_full(ops);
2491
2492     /* result contains the minimum size available on any child.  We
2493      * can use that space on each of our data children, so the total
2494      * is larger */
2495     find_simple_params(self, NULL, &data_children);
2496     result.bytes *= data_children;
2497
2498     if (val) {
2499         g_value_unset_init(val, QUALIFIED_SIZE_TYPE);
2500         g_value_set_boxed(val, &result);
2501     }
2502
2503     if (surety)
2504         *surety = (result.accuracy == SIZE_ACCURACY_UNKNOWN)?
2505                     PROPERTY_SURETY_BAD : PROPERTY_SURETY_GOOD;
2506
2507     if (source)
2508         *source = PROPERTY_SOURCE_DETECTED;
2509
2510     return TRUE;
2511 }
2512
2513 static gboolean
2514 property_get_max_volume_usage_fn(Device *dself,
2515     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2516     PropertySurety *surety, PropertySource *source)
2517 {
2518     RaitDevice *self = RAIT_DEVICE(dself);
2519     guint64 result;
2520     guint i;
2521     GPtrArray * ops;
2522     guint data_children;
2523
2524     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2525     do_rait_child_ops(self, property_get_do_op, ops);
2526
2527     /* look for the smallest value that is set */
2528     result = 0;
2529     for (i = 0; i < ops->len; i ++) {
2530         guint64 cur;
2531         PropertyOp * op = g_ptr_array_index(ops, i);
2532
2533         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2534             continue; /* ignore children without this property */
2535         }
2536
2537         cur = g_value_get_uint64(&(op->value));
2538
2539         result = MIN(cur, result);
2540     }
2541
2542     g_ptr_array_free_full(ops);
2543
2544     if (result) {
2545         /* result contains the minimum usage on any child.  We can use that space
2546          * on each of our data children, so the total is larger */
2547         find_simple_params(self, NULL, &data_children);
2548         result *= data_children;
2549
2550         if (val) {
2551             g_value_unset_init(val, G_TYPE_UINT64);
2552             g_value_set_uint64(val, result);
2553         }
2554
2555         if (surety)
2556             *surety = PROPERTY_SURETY_GOOD;
2557
2558         if (source)
2559             *source = PROPERTY_SOURCE_DETECTED;
2560
2561         return TRUE;
2562     } else {
2563         /* no result from any children, so we effectively don't have this property */
2564         return FALSE;
2565     }
2566 }
2567
2568 static gboolean
2569 property_set_max_volume_usage_fn(Device *dself,
2570     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2571     PropertySurety surety, PropertySource source)
2572 {
2573     RaitDevice *self = RAIT_DEVICE(dself);
2574     guint64 parent_usage;
2575     guint64 child_usage;
2576     GValue child_val;
2577     guint i;
2578     gboolean success;
2579     GPtrArray * ops;
2580     guint data_children;
2581
2582     parent_usage = g_value_get_uint64(val);
2583     find_simple_params(self, NULL, &data_children);
2584
2585     child_usage = parent_usage / data_children;
2586
2587     bzero(&child_val, sizeof(child_val));
2588     g_value_init(&child_val, G_TYPE_UINT64);
2589     g_value_set_uint64(&child_val, child_usage);
2590
2591     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2592                                 &child_val, surety, source);
2593     do_rait_child_ops(self, property_set_do_op, ops);
2594
2595     /* if any of the kids succeeded, then we did too */
2596     success = FALSE;
2597     for (i = 0; i < ops->len; i ++) {
2598         PropertyOp * op = g_ptr_array_index(ops, i);
2599
2600         if (op->base.result) {
2601             success = TRUE;
2602             break;
2603         }
2604     }
2605
2606     g_ptr_array_free_full(ops);
2607
2608     return success;
2609 }
2610
2611 typedef struct {
2612     GenericOp base;
2613     guint filenum;
2614 } RecycleFileOp;
2615
2616 /* A GFunc */
2617 static void recycle_file_do_op(gpointer data,
2618                                gpointer user_data G_GNUC_UNUSED) {
2619     RecycleFileOp * op = data;
2620     op->base.result =
2621         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2622 }
2623
2624 static gboolean 
2625 rait_device_recycle_file (Device * dself, guint filenum) {
2626     GPtrArray * ops;
2627     guint i;
2628     gboolean success;
2629
2630     RaitDevice * self = RAIT_DEVICE(dself);
2631
2632     if (rait_device_in_error(self)) return FALSE;
2633
2634     ops = g_ptr_array_sized_new(self->private->children->len);
2635     for (i = 0; i < self->private->children->len; i ++) {
2636         RecycleFileOp * op;
2637         op = g_new(RecycleFileOp, 1);
2638         op->base.child = g_ptr_array_index(self->private->children, i);
2639         op->filenum = filenum;
2640         g_ptr_array_add(ops, op);
2641     }
2642     
2643     do_rait_child_ops(self, recycle_file_do_op, ops);
2644
2645     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2646
2647     g_ptr_array_free_full(ops);
2648
2649     if (!success) {
2650         /* TODO: be more specific here */
2651         device_set_error(dself,
2652             stralloc(_("One or more devices failed to recycle_file")),
2653             DEVICE_STATUS_DEVICE_ERROR);
2654         return FALSE;
2655     }
2656     return TRUE;
2657 }
2658
2659 /* GFunc */
2660 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2661     GenericOp * op = data;
2662     op->result = GINT_TO_POINTER(device_finish(op->child));
2663 }
2664
2665 static gboolean 
2666 rait_device_finish (Device * self) {
2667     GPtrArray * ops;
2668     gboolean success;
2669
2670     if (rait_device_in_error(self)) return FALSE;
2671
2672     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2673     
2674     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2675
2676     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2677
2678     g_ptr_array_free_full(ops);
2679
2680     self->access_mode = ACCESS_NULL;
2681
2682     if (!success)
2683         return FALSE;
2684
2685     return TRUE;
2686 }
2687
2688 static Device *
2689 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2690     Device * rval;
2691     g_assert(0 == strcmp(device_type, "rait"));
2692     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2693     device_open_device(rval, device_name, device_type, device_node);
2694     return rval;
2695 }
2696
2697 void 
2698 rait_device_register (void) {
2699     static const char * device_prefix_list[] = {"rait", NULL};
2700     register_device(rait_device_factory, device_prefix_list);
2701 }