Imported Upstream version 2.6.1
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2005-2008 Zmanda, Inc.  All Rights Reserved.
3  * 
4  * This library is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU Lesser General Public License version 2.1 as 
6  * published by the Free Software Foundation.
7  * 
8  * This library is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
11  * License for more details.
12  * 
13  * You should have received a copy of the GNU Lesser General Public License
14  * along with this library; if not, write to the Free Software Foundation,
15  * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
16  * 
17  * Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
18  * Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include <amanda.h>
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "semaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_RAIT_DEVICE        (rait_device_get_type())
57 #define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
58 #define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
59 #define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
60 #define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())
61
62 #define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
63 static GType    rait_device_get_type    (void);
64
65 /*
66  * Main object structure
67  */
68 typedef struct RaitDevice_s {
69     Device __parent__;
70
71     struct RaitDevicePrivate_s * private;
72 } RaitDevice;
73
74 /*
75  * Class definition
76  */
77 typedef struct _RaitDeviceClass RaitDeviceClass;
78 struct _RaitDeviceClass {
79     DeviceClass __parent__;
80 };
81
82 typedef enum {
83     RAIT_STATUS_COMPLETE, /* All subdevices OK. */
84     RAIT_STATUS_DEGRADED, /* One subdevice failed. */
85     RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
86 } RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
91  * This implementation assumes that threads are used for parallelizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
97 #if !(GLIB_CHECK_VERSION(2,10,0))
98 #define USE_INTERNAL_THREADPOOL
99 #endif
100
101 typedef struct RaitDevicePrivate_s {
102     GPtrArray * children;
103     /* These flags are only relevant for reading. */
104     RaitStatus status;
105     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
106        failed node. It holds a negative number otherwise. */
107     int failed;
108
109     /* the child block size */
110     gsize child_block_size;
111
112 #ifdef USE_INTERNAL_THREADPOOL
113     /* array of ThreadInfo for performing parallel operations */
114     GArray *threads;
115
116     /* value of this semaphore is the number of threaded operations
117      * in progress */
118     semaphore_t *threads_sem;
119 #endif
120 } RaitDevicePrivate;
121
122 #ifdef USE_INTERNAL_THREADPOOL
123 typedef struct ThreadInfo {
124     GThread *thread;
125
126     /* struct fields below are protected by this mutex and condition variable */
127     GMutex *mutex;
128     GCond *cond;
129
130     gboolean die;
131     GFunc func;
132     gpointer data;
133
134     /* give threads access to the owning device's private data (threads_sem) */
135     struct RaitDevicePrivate_s *private;
136 } ThreadInfo;
137 #endif
138
139
140 #define PRIVATE(o) (o->private)
141
142 #define rait_device_in_error(dev) \
143     (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
144
145 void rait_device_register (void);
146
147 /* here are local prototypes */
148 static void rait_device_init (RaitDevice * o);
149 static void rait_device_class_init (RaitDeviceClass * c);
150 static void rait_device_base_init (RaitDeviceClass * c);
151 static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
152 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
153                                    char * label, char * timestamp);
154 static gboolean rait_device_configure(Device * self, gboolean use_global_config);
155 static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
156 static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
157 static gboolean rait_device_finish_file (Device * self);
158 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
159 static gboolean rait_device_seek_block (Device * self, guint64 block);
160 static int      rait_device_read_block (Device * self, gpointer buf,
161                                         int * size);
162 static gboolean rait_device_recycle_file (Device * self, guint filenum);
163 static gboolean rait_device_finish (Device * self);
164 static DeviceStatusFlags rait_device_read_label(Device * dself);
165 static void find_simple_params(RaitDevice * self, guint * num_children,
166                                guint * data_children);
167
168 /* property handlers */
169
170 static gboolean property_get_block_size_fn(Device *self,
171     DevicePropertyBase *base, GValue *val,
172     PropertySurety *surety, PropertySource *source);
173
174 static gboolean property_set_block_size_fn(Device *self,
175     DevicePropertyBase *base, GValue *val,
176     PropertySurety surety, PropertySource source);
177
178 static gboolean property_get_canonical_name_fn(Device *self,
179     DevicePropertyBase *base, GValue *val,
180     PropertySurety *surety, PropertySource *source);
181
182 static gboolean property_get_concurrency_fn(Device *self,
183     DevicePropertyBase *base, GValue *val,
184     PropertySurety *surety, PropertySource *source);
185
186 static gboolean property_get_streaming_fn(Device *self,
187     DevicePropertyBase *base, GValue *val,
188     PropertySurety *surety, PropertySource *source);
189
190 static gboolean property_get_boolean_and_fn(Device *self,
191     DevicePropertyBase *base, GValue *val,
192     PropertySurety *surety, PropertySource *source);
193
194 static gboolean property_get_medium_access_type_fn(Device *self,
195     DevicePropertyBase *base, GValue *val,
196     PropertySurety *surety, PropertySource *source);
197
198 static gboolean property_get_free_space_fn(Device *self,
199     DevicePropertyBase *base, GValue *val,
200     PropertySurety *surety, PropertySource *source);
201
202 static gboolean property_get_max_volume_usage_fn(Device *self,
203     DevicePropertyBase *base, GValue *val,
204     PropertySurety *surety, PropertySource *source);
205
206 static gboolean property_set_max_volume_usage_fn(Device *self,
207     DevicePropertyBase *base, GValue *val,
208     PropertySurety surety, PropertySource source);
209
210
211 /* pointer to the class of our parent */
212 static DeviceClass *parent_class = NULL;
213
214 static GType
215 rait_device_get_type (void)
216 {
217     static GType type = 0;
218
219     if G_UNLIKELY(type == 0) {
220         static const GTypeInfo info = {
221             sizeof (RaitDeviceClass),
222             (GBaseInitFunc) rait_device_base_init,
223             (GBaseFinalizeFunc) NULL,
224             (GClassInitFunc) rait_device_class_init,
225             (GClassFinalizeFunc) NULL,
226             NULL /* class_data */,
227             sizeof (RaitDevice),
228             0 /* n_preallocs */,
229             (GInstanceInitFunc) rait_device_init,
230             NULL
231         };
232         
233         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
234                                        (GTypeFlags)0);
235         }
236     
237     return type;
238 }
239
240 static void g_object_unref_foreach(gpointer data,
241                                    gpointer user_data G_GNUC_UNUSED) {
242     if (data != NULL && G_IS_OBJECT(data)) {
243         g_object_unref(data);
244     }
245 }
246
247 static void
248 rait_device_finalize(GObject *obj_self)
249 {
250     RaitDevice *self = RAIT_DEVICE (obj_self);
251     if(G_OBJECT_CLASS(parent_class)->finalize) \
252            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
253     if(self->private->children) {
254         g_ptr_array_foreach(self->private->children,
255                             g_object_unref_foreach, NULL);
256         g_ptr_array_free (self->private->children, TRUE);
257         self->private->children = NULL;
258     }
259 #ifdef USE_INTERNAL_THREADPOOL
260     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
261
262     if (PRIVATE(self)->threads) {
263         guint i;
264
265         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
266             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
267             if (inf->thread) {
268                 /* NOTE: the thread is waiting on this condition right now, not
269                  * executing an operation. */
270
271                 /* ask the thread to die */
272                 g_mutex_lock(inf->mutex);
273                 inf->die = TRUE;
274                 g_cond_signal(inf->cond);
275                 g_mutex_unlock(inf->mutex);
276
277                 /* and wait for it to die, which should happen soon */
278                 g_thread_join(inf->thread);
279             }
280
281             if (inf->mutex)
282                 g_mutex_free(inf->mutex);
283             if (inf->cond)
284                 g_cond_free(inf->cond);
285         }
286     }
287
288     if (PRIVATE(self)->threads_sem)
289         semaphore_free(PRIVATE(self)->threads_sem);
290 #endif
291     amfree(self->private);
292 }
293
294 static void 
295 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
296 {
297     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
298     PRIVATE(o)->children = g_ptr_array_new();
299     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
300     PRIVATE(o)->failed = -1;
301 #ifdef USE_INTERNAL_THREADPOOL
302     PRIVATE(o)->threads = NULL;
303     PRIVATE(o)->threads_sem = NULL;
304 #endif
305 }
306
307 static void 
308 rait_device_class_init (RaitDeviceClass * c)
309 {
310     GObjectClass *g_object_class = (GObjectClass*) c;
311     DeviceClass *device_class = (DeviceClass *)c;
312
313     parent_class = g_type_class_ref (TYPE_DEVICE);
314
315     device_class->open_device = rait_device_open_device;
316     device_class->configure = rait_device_configure;
317     device_class->start = rait_device_start;
318     device_class->start_file = rait_device_start_file;
319     device_class->write_block = rait_device_write_block;
320     device_class->finish_file = rait_device_finish_file;
321     device_class->seek_file = rait_device_seek_file;
322     device_class->seek_block = rait_device_seek_block;
323     device_class->read_block = rait_device_read_block;
324     device_class->recycle_file = rait_device_recycle_file; 
325     device_class->finish = rait_device_finish;
326     device_class->read_label = rait_device_read_label;
327
328     g_object_class->finalize = rait_device_finalize;
329
330 #ifndef USE_INTERNAL_THREADPOOL
331 #if !GLIB_CHECK_VERSION(2,10,2)
332     /* Versions of glib before 2.10.2 crash if
333      * g_thread_pool_set_max_unused_threads is called before the first
334      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
335      * start any threads in it, and free it */
336     {
337         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
338         g_thread_pool_free(pool, TRUE, FALSE);
339     }
340 #endif
341
342     g_thread_pool_set_max_unused_threads(-1);
343 #endif
344 }
345
346 static void
347 rait_device_base_init (RaitDeviceClass * c)
348 {
349     DeviceClass *device_class = (DeviceClass *)c;
350
351     /* the RAIT device overrides most of the standard properties, so that it
352      * can calculate them by querying the same property on the children */
353     device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
354             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
355             property_get_block_size_fn,
356             property_set_block_size_fn);
357
358     device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
359             PROPERTY_ACCESS_GET_MASK,
360             property_get_canonical_name_fn, NULL);
361
362     device_class_register_property(device_class, PROPERTY_CONCURRENCY,
363             PROPERTY_ACCESS_GET_MASK,
364             property_get_concurrency_fn, NULL);
365
366     device_class_register_property(device_class, PROPERTY_STREAMING,
367             PROPERTY_ACCESS_GET_MASK,
368             property_get_streaming_fn, NULL);
369
370     device_class_register_property(device_class, PROPERTY_APPENDABLE,
371             PROPERTY_ACCESS_GET_MASK,
372             property_get_boolean_and_fn, NULL);
373
374     device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
375             PROPERTY_ACCESS_GET_MASK,
376             property_get_boolean_and_fn, NULL);
377
378     device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
379             PROPERTY_ACCESS_GET_MASK,
380             property_get_medium_access_type_fn, NULL);
381
382     device_class_register_property(device_class, PROPERTY_FREE_SPACE,
383             PROPERTY_ACCESS_GET_MASK,
384             property_get_free_space_fn, NULL);
385
386     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
387             PROPERTY_ACCESS_GET_MASK,
388             property_get_max_volume_usage_fn,
389             property_set_max_volume_usage_fn);
390 }
391
392 /* This function does something a little clever and a little
393  * complicated. It takes an array of operations and runs the given
394  * function on each element in the array. The trick is that it runs them
395  * all in parallel, in different threads. This is more efficient than it
396  * sounds because we use a GThreadPool, which means calling this function
397  * will probably not start any new threads at all, but rather use
398  * existing ones. The func is called with two gpointer arguments: The
399  * first from the array, the second is the data argument.
400  * 
401  * When it returns, all the operations have been successfully
402  * executed. If you want results from your operations, do it yourself
403  * through the array.
404  */
405
406 #ifdef USE_INTERNAL_THREADPOOL
407 static gpointer rait_thread_pool_func(gpointer data) {
408     ThreadInfo *inf = data;
409
410     g_mutex_lock(inf->mutex);
411     while (TRUE) {
412         while (!inf->die && !inf->func)
413             g_cond_wait(inf->cond, inf->mutex);
414
415         if (inf->die)
416             break;
417
418         if (inf->func) {
419             /* invoke the function */
420             inf->func(inf->data, NULL);
421             inf->func = NULL;
422             inf->data = NULL;
423
424             /* indicate that we're finished; will not block */
425             semaphore_down(inf->private->threads_sem);
426         }
427     }
428     g_mutex_unlock(inf->mutex);
429     return NULL;
430 }
431
432 static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
433     guint i;
434
435     if (PRIVATE(self)->threads_sem == NULL)
436         PRIVATE(self)->threads_sem = semaphore_new_with_value(0);
437
438     if (PRIVATE(self)->threads == NULL)
439         PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
440                                             sizeof(ThreadInfo), ops->len);
441
442     g_assert(PRIVATE(self)->threads_sem->value == 0);
443
444     if (PRIVATE(self)->threads->len < ops->len)
445         g_array_set_size(PRIVATE(self)->threads, ops->len);
446
447     /* the semaphore will hit zero when each thread has decremented it */
448     semaphore_force_set(PRIVATE(self)->threads_sem, ops->len);
449
450     for (i = 0; i < ops->len; i++) {
451         ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
452         if (!inf->thread) {
453             inf->mutex = g_mutex_new();
454             inf->cond = g_cond_new();
455             inf->private = PRIVATE(self);
456             inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
457         }
458
459         /* set up the info the thread needs and trigger it to start */
460         g_mutex_lock(inf->mutex);
461         inf->data = g_ptr_array_index(ops, i);
462         inf->func = func;
463         g_cond_signal(inf->cond);
464         g_mutex_unlock(inf->mutex);
465     }
466
467     /* wait until semaphore hits zero */
468     semaphore_wait_empty(PRIVATE(self)->threads_sem);
469 }
470
471 #else /* USE_INTERNAL_THREADPOOL */
472
473 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
474     GThreadPool * pool;
475     guint i;
476
477     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
478     for (i = 0; i < ops->len; i ++) {
479         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
480     }
481
482     g_thread_pool_free(pool, FALSE, TRUE);
483 }
484
485 #endif /* USE_INTERNAL_THREADPOOL */
486
487 /* This does the above, in a serial fashion (and without using threads) */
488 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
489     guint i;
490
491     for (i = 0; i < ops->len; i ++) {
492         func(g_ptr_array_index(ops, i), NULL);
493     }
494 }
495
496 /* This is the one that code below should call. It switches
497    automatically between do_thread_pool_op and do_unthreaded_ops,
498    depending on g_thread_supported(). */
499 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
500     if (g_thread_supported()) {
501         do_thread_pool_op(self, func, ops);
502     } else {
503         do_unthreaded_ops(self, func, ops);
504     }
505 }
506
507 /* Helper for parse_device_name; returns a list of un-escaped strings for
508  * the first "component" of str, where a component is a plain string or a
509  * brace-enclosed set of alternatives.  str is pointing to the first character
510  * of the next component on return. */
511 static GPtrArray *
512 parse_device_name_component(char **str)
513 {
514     GPtrArray *result = g_ptr_array_new();
515
516     if (**str == '{') {
517         char *p = (*str)+1;
518         char *local = g_malloc(strlen(*str)+1);
519         char *current = local;
520         char *c = current;
521
522         while (1) {
523             if (*p == '\0' || *p == '{') {
524                 /* unterminated { .. } or extra '{' */
525                 amfree(local);
526                 g_ptr_array_free(result, TRUE);
527                 return NULL;
528             }
529
530             if (*p == '}' || *p == ',') {
531                 *c = '\0';
532                 g_ptr_array_add(result, g_strdup(current));
533                 current = ++c;
534
535                 if (*p == '}')
536                     break;
537                 else
538                     p++;
539             }
540
541             if (*p == '\\') {
542                 if (*(p+1) == '{' || *(p+1) == '}' || *(p+1) == '\\' || *(p+1) == ',')
543                     p++;
544             }
545             *(c++) = *(p++);
546         }
547
548         amfree(local);
549
550         if (*p)
551             *str = p+1;
552         else
553             *str = p;
554     } else {
555         /* no braces -- just un-escape a plain string */
556         char *local = g_malloc(strlen(*str)+1);
557         char *r = local;
558         char *p = *str;
559
560         while (*p && *p != '{') {
561             if (*p == '\\') {
562                 if (*(p+1) == '{' || *(p+1) == '}' || *(p+1) == '\\' || *(p+1) == ',')
563                     p++;
564             }
565             *(r++) = *(p++);
566         }
567         *r = '\0';
568         g_ptr_array_add(result, local);
569         *str = p;
570     }
571
572     return result;
573 }
574
575 /* Take a text string user_name, and break it out into an argv-style
576    array of strings, using a {..,..} syntax similar to shell brace expansion.
577    For example:
578
579      "{foo,bar,bat}" -> [ "foo", "bar", "bat" ]
580      "foo{1,2}bar" -> [ "foo1bar", "foo2bar" ]
581      "foo{1\,2,3}bar" -> [ "foo1,2bar", "foo3bar" ]
582      "{a,b}-{1,2}" -> [ "a-1", "a-2", "b-1", "b-2" ]
583
584    Note that nested braces are not processed.  Braces, commas, and backslashes
585    may be escaped with backslashes.  Returns NULL on invalid strings.
586    */
587
588 static GPtrArray *
589 parse_device_name(char * user_name)
590 {
591     GPtrArray *rval = g_ptr_array_new();
592
593     g_ptr_array_add(rval, g_strdup(""));
594
595     while (*user_name) {
596         GPtrArray *new_components;
597         GPtrArray *new_rval;
598         guint i, j;
599
600         new_components = parse_device_name_component(&user_name);
601         if (!new_components) {
602             /* parse error */
603             g_ptr_array_free(rval, TRUE);
604             return NULL;
605         }
606
607         new_rval = g_ptr_array_new();
608
609         /* do a cartesian join of rval and new_components */
610         for (i = 0; i < rval->len; i++) {
611             for (j = 0; j < new_components->len; j++) {
612                 g_ptr_array_add(new_rval, g_strconcat(
613                     g_ptr_array_index(rval, i),
614                     g_ptr_array_index(new_components, j),
615                     NULL));
616             }
617         }
618
619         g_ptr_array_free(rval, TRUE);
620         g_ptr_array_free(new_components, TRUE);
621         rval = new_rval;
622     }
623
624     return rval;
625 }
626
627 static char *
628 child_device_names_to_rait_name(RaitDevice * self) {
629     GString *rait_name = NULL;
630     guint i;
631
632     rait_name = g_string_new("rait:{");
633
634     for (i = 0; i < self->private->children->len; i ++) {
635         Device *child = g_ptr_array_index(self->private->children, i);
636         const char *child_name = NULL;
637         GValue val;
638         gboolean got_prop = FALSE;
639
640         bzero(&val, sizeof(val));
641
642         if ((signed)i != self->private->failed) {
643             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
644                 child_name = g_value_get_string(&val);
645                 got_prop = TRUE;
646             }
647         }
648
649         if (!got_prop)
650             child_name = "MISSING";
651
652         g_string_append_printf(rait_name, "%s%s", child_name,
653                 (i < self->private->children->len-1)? "," : "");
654
655         if (got_prop)
656             g_value_unset(&val);
657     }
658
659     g_string_append(rait_name, "}");
660     return g_string_free(rait_name, FALSE);
661 }
662
663 /* Find a workable child block size, based on the block size ranges of our
664  * child devices.
665  *
666  * The algorithm is to construct the intersection of all child devices'
667  * [min,max] block size ranges, and then pick the block size closest to 32k
668  * that is in the resulting range.  This avoids picking ridiculously small (1
669  * byte) or large (INT_MAX) block sizes when using devices with wide-open block
670  * size ranges.
671
672  * This function returns the calculated child block size directly, and the RAIT
673  * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
674  * a single child device, but sets the device's error status and returns 0 if
675  * it cannot determine an agreeable block size.
676  */
677 static gsize
678 calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
679 {
680     gsize min = 0;
681     gsize max = SIZE_MAX;
682     gboolean found_one = FALSE;
683     gsize result;
684     guint i;
685
686     for (i = 0; i < self->private->children->len; i ++) {
687         gsize child_min = SIZE_MAX, child_max = 0;
688         Device *child;
689         GValue property_result;
690         PropertySource source;
691
692         bzero(&property_result, sizeof(property_result));
693
694         if ((signed)i == self->private->failed)
695             continue;
696
697         child = g_ptr_array_index(self->private->children, i);
698         if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
699                                  &property_result, NULL, &source)) {
700             g_warning("Error getting BLOCK_SIZE from %s: %s",
701                     child->device_name, device_error_or_status(child));
702             continue;
703         }
704
705         /* if the block size has been set explicitly, then we need to use that blocksize;
706          * otherwise (even if it was DETECTED), override it. */
707         if (source == PROPERTY_SOURCE_USER) {
708             child_min = child_max = g_value_get_int(&property_result);
709         } else {
710             if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
711                                      &property_result)) {
712                 g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
713                         child->device_name, device_error_or_status(child));
714                 continue;
715             }
716             child_min = g_value_get_uint(&property_result);
717
718             if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
719                                      &property_result)) {
720                 g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
721                         child->device_name, device_error_or_status(child));
722                 continue;
723             }
724             child_max = g_value_get_uint(&property_result);
725
726             if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
727                 g_warning("Invalid min, max block sizes from %s", child->device_name);
728                 continue;
729             }
730         }
731
732         found_one = TRUE;
733         min = MAX(min, child_min);
734         max = MIN(max, child_max);
735     }
736
737     if (!found_one) {
738         device_set_error((Device*)self,
739             stralloc(_("Could not find any child devices' block size ranges")),
740             DEVICE_STATUS_DEVICE_ERROR);
741         return 0;
742     }
743
744     if (min > max) {
745         device_set_error((Device*)self,
746             stralloc(_("No block size is acceptable to all child devices")),
747             DEVICE_STATUS_DEVICE_ERROR);
748         return 0;
749     }
750
751     /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
752      * the nearest acceptable size. */
753     result = CLAMP(32768, min, max);
754
755     if (rait_size) {
756         guint data_children;
757         find_simple_params(self, NULL, &data_children);
758         *rait_size = result * data_children;
759     }
760
761     return result;
762 }
763
764 /* Set BLOCK_SIZE on all children */
765 static gboolean
766 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
767 {
768     GValue val;
769     guint i;
770     PropertySource source;
771
772     bzero(&val, sizeof(val));
773
774     g_assert(child_block_size < INT_MAX);
775     g_value_init(&val, G_TYPE_INT);
776     g_value_set_int(&val, (gint)child_block_size);
777
778     for (i = 0; i < self->private->children->len; i ++) {
779         Device *child;
780         GValue property_result;
781
782         bzero(&property_result, sizeof(property_result));
783
784         if ((signed)i == self->private->failed)
785             continue;
786
787         child = g_ptr_array_index(self->private->children, i);
788
789         /* first, make sure the block size is at its default, or is already
790          * correct */
791         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
792                                  &property_result, NULL, &source)) {
793             gsize from_child = g_value_get_int(&property_result);
794             if (source != PROPERTY_SOURCE_DEFAULT
795                     && from_child != child_block_size) {
796                 device_set_error((Device *)self,
797                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
798                                 child->device_name, from_child, child_block_size),
799                     DEVICE_STATUS_DEVICE_ERROR);
800                 return FALSE;
801             }
802         } else {
803             /* failing to get the block size isn't necessarily fatal.. */
804             g_warning("Error getting BLOCK_SIZE from %s: %s",
805                     child->device_name, device_error_or_status(child));
806         }
807         g_value_unset(&property_result);
808
809         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
810             device_set_error((Device *)self,
811                 vstrallocf(_("Error setting block size on %s"), child->device_name),
812                 DEVICE_STATUS_DEVICE_ERROR);
813             return FALSE;
814         }
815     }
816
817     return TRUE;
818 }
819
820 /* The time for users to specify block sizes has ended; set this device's
821  * block-size attributes for easy access by other RAIT functions.  Returns
822  * FALSE on error, with the device's error status already set. */
823 static gboolean
824 fix_block_size(RaitDevice *self)
825 {
826     Device *dself = (Device *)self;
827     gsize my_block_size, child_block_size;
828
829     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
830         child_block_size = calculate_block_size_from_children(self, &my_block_size);
831         if (child_block_size == 0)
832             return FALSE;
833
834         self->private->child_block_size = child_block_size;
835         dself->block_size = my_block_size;
836         dself->block_size_surety = PROPERTY_SURETY_GOOD;
837         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
838     } else {
839         guint data_children;
840
841         find_simple_params(self, NULL, &data_children);
842         g_assert((dself->block_size % data_children) == 0);
843         child_block_size = dself->block_size / data_children;
844     }
845
846     /* now tell the children we mean it */
847     if (!set_block_size_on_children(self, child_block_size))
848         return FALSE;
849
850     return TRUE;
851 }
852
853 /* This structure contains common fields for many operations. Not all
854    operations use all fields, however. */
855 typedef struct {
856     gpointer result; /* May be a pointer; may be an integer or boolean
857                         stored with GINT_TO_POINTER. */
858     Device * child;  /* The device in question. Used by all
859                         operations. */
860     guint child_index; /* For recoverable operations (read-related
861                           operations), this field provides the number
862                           of this child in the self->private->children
863                           array. */
864 } GenericOp;
865
866 typedef gboolean (*BooleanExtractor)(gpointer data);
867
868 /* A BooleanExtractor */
869 static gboolean extract_boolean_generic_op(gpointer data) {
870     GenericOp * op = data;
871     return GPOINTER_TO_INT(op->result);
872 }
873
874 /* A BooleanExtractor */
875 static gboolean extract_boolean_pointer_op(gpointer data) {
876     GenericOp * op = data;
877     return op->result != NULL;
878 }
879
880 /* Does the equivalent of this perl command:
881      ! (first { !extractor($_) } @_
882    That is, calls extractor on each element of the array, and returns
883    TRUE if and only if all calls to extractor return TRUE. This function
884    stops as soon as an extractor returns false, so it's best if extractor
885    functions have no side effects.
886 */
887 static gboolean g_ptr_array_and(GPtrArray * array,
888                                 BooleanExtractor extractor) {
889     guint i;
890     if (array == NULL || array->len <= 0)
891         return FALSE;
892
893     for (i = 0; i < array->len; i ++) {
894         if (!extractor(g_ptr_array_index(array, i)))
895             return FALSE;
896     }
897
898     return TRUE;
899 }
900
901 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
902 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
903     GPtrArray * rval;
904     guint i;
905
906     rval = g_ptr_array_sized_new(self->private->children->len);
907     for (i = 0; i < self->private->children->len; i ++) {
908         GenericOp * op;
909
910         if ((signed)i == self->private->failed) {
911             continue;
912         }
913
914         op = g_new(GenericOp, 1);
915         op->child = g_ptr_array_index(self->private->children, i);
916         op->child_index = i;
917         g_ptr_array_add(rval, op);
918     }
919
920     return rval;
921 }
922
923 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
924    all the proper handling for the result of operations that allow
925    device isolation. Returns FALSE only if an unrecoverable error
926    occured. */
927 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
928                                          BooleanExtractor extractor) {
929     int nfailed = 0;
930     int lastfailed = 0;
931     guint i;
932
933     /* We found one or more failed elements.  See which elements failed, and
934      * isolate them*/
935     for (i = 0; i < ops->len; i ++) {
936         GenericOp * op = g_ptr_array_index(ops, i);
937         if (!extractor(op)) {
938             self->private->failed = op->child_index;
939             g_warning("RAIT array %s isolated device %s: %s",
940                     DEVICE(self)->device_name,
941                     op->child->device_name,
942                     device_error(op->child));
943             nfailed++;
944             lastfailed = i;
945         }
946     }
947
948     /* no failures? great! */
949     if (nfailed == 0)
950         return TRUE;
951
952     /* a single failure in COMPLETE just puts us in DEGRADED mode */
953     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
954         self->private->status = RAIT_STATUS_DEGRADED;
955         self->private->failed = lastfailed;
956         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
957         return TRUE;
958     } else {
959         self->private->status = RAIT_STATUS_FAILED;
960         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
961         return FALSE;
962     }
963 }
964
965 typedef struct {
966     RaitDevice * self;
967     char *rait_name;
968     char * device_name; /* IN */
969     Device * result;    /* OUT */
970 } OpenDeviceOp;
971
972 /* A GFunc. */
973 static void device_open_do_op(gpointer data,
974                               gpointer user_data G_GNUC_UNUSED) {
975     OpenDeviceOp * op = data;
976
977     if (strcmp(op->device_name, "ERROR") == 0 ||
978         strcmp(op->device_name, "MISSING") == 0 ||
979         strcmp(op->device_name, "DEGRADED") == 0) {
980         g_warning("RAIT device %s contains a missing element, attempting "
981                   "degraded mode.\n", op->rait_name);
982         op->result = NULL;
983     } else {
984         op->result = device_open(op->device_name);
985     }
986 }
987
988 /* Returns TRUE if and only if the volume label and time are equal. */
989 static gboolean compare_volume_results(Device * a, Device * b) {
990     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
991          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
992 }
993
994 /* Stickes new_message at the end of *old_message; frees new_message and
995  * may change *old_message. */
996 static void append_message(char ** old_message, char * new_message) {
997     char * rval;
998     if (*old_message == NULL || **old_message == '\0') {
999         rval = new_message;
1000     } else {
1001         rval = g_strdup_printf("%s; %s", *old_message, new_message);
1002         amfree(new_message);
1003     }
1004     amfree(*old_message);
1005     *old_message = rval;
1006 }
1007
1008 static void
1009 rait_device_open_device (Device * dself, char * device_name,
1010             char * device_type G_GNUC_UNUSED, char * device_node) {
1011     GPtrArray *device_names;
1012     GPtrArray * device_open_ops;
1013     guint i;
1014     gboolean failure;
1015     char *failure_errmsgs;
1016     DeviceStatusFlags failure_flags;
1017     RaitDevice * self;
1018
1019     self = RAIT_DEVICE(dself);
1020
1021     device_names = parse_device_name(device_node);
1022
1023     if (device_names == NULL) {
1024         device_set_error(dself,
1025             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
1026             DEVICE_STATUS_DEVICE_ERROR);
1027         return;
1028     }
1029
1030     /* Open devices in a separate thread, in case they have to rewind etc. */
1031     device_open_ops = g_ptr_array_new();
1032
1033     for (i = 0; i < device_names->len; i++) {
1034         OpenDeviceOp *op;
1035         char *name = g_ptr_array_index(device_names, i);
1036
1037         op = g_new(OpenDeviceOp, 1);
1038         op->device_name = name;
1039         op->result = NULL;
1040         op->self = self;
1041         op->rait_name = device_name;
1042         g_ptr_array_add(device_open_ops, op);
1043     }
1044
1045     g_ptr_array_free(device_names, TRUE);
1046     do_rait_child_ops(self, device_open_do_op, device_open_ops);
1047
1048     failure = FALSE;
1049     failure_errmsgs = NULL;
1050     failure_flags = 0;
1051
1052     /* Check results of opening devices. */
1053     for (i = 0; i < device_open_ops->len; i ++) {
1054         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
1055
1056         if (op->result != NULL &&
1057             op->result->status == DEVICE_STATUS_SUCCESS) {
1058             g_ptr_array_add(self->private->children, op->result);
1059         } else {
1060             char * this_failure_errmsg =
1061                 g_strdup_printf("%s: %s", op->device_name,
1062                                 device_error_or_status(op->result));
1063             DeviceStatusFlags status =
1064                 op->result == NULL ?
1065                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
1066             append_message(&failure_errmsgs,
1067                            strdup(this_failure_errmsg));
1068             failure_flags |= status;
1069             if (self->private->status == RAIT_STATUS_COMPLETE) {
1070                 /* The first failure just puts us in degraded mode. */
1071                 g_warning("%s: %s",
1072                           device_name, this_failure_errmsg);
1073                 g_warning("%s: %s failed, entering degraded mode.",
1074                           device_name, op->device_name);
1075                 g_ptr_array_add(self->private->children, op->result);
1076                 self->private->status = RAIT_STATUS_DEGRADED;
1077                 self->private->failed = i;
1078             } else {
1079                 /* The second and further failures are fatal. */
1080                 failure = TRUE;
1081             }
1082         }
1083         amfree(op->device_name);
1084     }
1085
1086     g_ptr_array_free_full(device_open_ops);
1087
1088     if (failure) {
1089         self->private->status = RAIT_STATUS_FAILED;
1090         device_set_error(dself, failure_errmsgs, failure_flags);
1091         return;
1092     }
1093
1094     /* Chain up. */
1095     if (parent_class->open_device) {
1096         parent_class->open_device(dself, device_name, device_type, device_node);
1097     }
1098     return;
1099 }
1100
1101 /* A GFunc. */
1102 static void read_label_do_op(gpointer data,
1103                              gpointer user_data G_GNUC_UNUSED) {
1104     GenericOp * op = data;
1105     op->result = GINT_TO_POINTER(device_read_label(op->child));
1106 }
1107
1108 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1109     RaitDevice * self;
1110     GPtrArray * ops;
1111     DeviceStatusFlags failed_result = 0;
1112     char *failed_errmsg = NULL;
1113     unsigned int i;
1114     Device * first_success = NULL;
1115
1116     self = RAIT_DEVICE(dself);
1117
1118     amfree(dself->volume_time);
1119     amfree(dself->volume_label);
1120     amfree(dself->volume_header);
1121
1122     if (rait_device_in_error(self))
1123         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1124
1125     /* nail down our block size, if we haven't already */
1126     if (!fix_block_size(self))
1127         return FALSE;
1128
1129     ops = make_generic_boolean_op_array(self);
1130     
1131     do_rait_child_ops(self, read_label_do_op, ops);
1132     
1133     for (i = 0; i < ops->len; i ++) {
1134         GenericOp * op = g_ptr_array_index(ops, i);
1135         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1136         if (op->result == DEVICE_STATUS_SUCCESS) {
1137             if (first_success == NULL) {
1138                 /* This is the first successful device. */
1139                 first_success = op->child;
1140             } else if (!compare_volume_results(first_success, op->child)) {
1141                 /* Doesn't match. :-( */
1142                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1143                         "Got %s/%s on %s against %s/%s on %s.",
1144                         first_success->volume_label,
1145                         first_success->volume_time,
1146                         first_success->device_name,
1147                         op->child->volume_label,
1148                         op->child->volume_time,
1149                         op->child->device_name);
1150                 g_warning("%s", failed_errmsg);
1151                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1152             }
1153         } else {
1154             failed_result |= result;
1155         }
1156     }
1157
1158     if (failed_result != DEVICE_STATUS_SUCCESS) {
1159         /* We had multiple failures or an inconsistency. */
1160         device_set_error(dself, failed_errmsg, failed_result);
1161     } else {
1162         /* Everything peachy. */
1163         amfree(failed_errmsg);
1164
1165         g_assert(first_success != NULL);
1166         if (first_success->volume_label != NULL) {
1167             dself->volume_label = g_strdup(first_success->volume_label);
1168         }
1169         if (first_success->volume_time != NULL) {
1170             dself->volume_time = g_strdup(first_success->volume_time);
1171         }
1172         if (first_success->volume_header != NULL) {
1173             dself->volume_header = dumpfile_copy(first_success->volume_header);
1174         }
1175     }
1176     
1177     g_ptr_array_free_full(ops);
1178
1179     return dself->status;
1180 }
1181
1182 typedef struct {
1183     GenericOp base;
1184     DeviceAccessMode mode; /* IN */
1185     char * label;          /* IN */
1186     char * timestamp;      /* IN */
1187 } StartOp;
1188
1189 /* A GFunc. */
1190 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1191     DeviceClass *klass;
1192     StartOp * param = data;
1193     
1194     klass = DEVICE_GET_CLASS(param->base.child);
1195     if (klass->start) {
1196         param->base.result =
1197             GINT_TO_POINTER((klass->start)(param->base.child,
1198                                             param->mode, param->label,
1199                                             param->timestamp));
1200     } else {
1201         param->base.result = FALSE;
1202     }
1203 }
1204
1205 static gboolean
1206 rait_device_configure(Device * dself, gboolean use_global_config)
1207 {
1208     RaitDevice *self = RAIT_DEVICE(dself);
1209     guint i;
1210
1211     for (i = 0; i < self->private->children->len; i ++) {
1212         Device *child;
1213
1214         if ((signed)i == self->private->failed)
1215             continue;
1216
1217         child = g_ptr_array_index(self->private->children, i);
1218         /* unconditionally configure the child without the global
1219          * configuration */
1220         if (!device_configure(child, FALSE))
1221             return FALSE;
1222     }
1223
1224     if (parent_class->configure) {
1225         return parent_class->configure(dself, use_global_config);
1226     }
1227
1228     return TRUE;
1229 }
1230
1231 static gboolean 
1232 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1233                    char * timestamp) {
1234     GPtrArray * ops;
1235     guint i;
1236     gboolean success;
1237     RaitDevice * self;
1238     DeviceStatusFlags total_status;
1239     char *failure_errmsgs = NULL;
1240     char * label_from_device = NULL;
1241
1242     self = RAIT_DEVICE(dself);
1243
1244     if (rait_device_in_error(self)) return FALSE;
1245
1246     /* No starting in degraded mode. */
1247     if (self->private->status != RAIT_STATUS_COMPLETE &&
1248         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1249         device_set_error(dself,
1250                          g_strdup_printf(_("RAIT device %s is read-only "
1251                                            "because it is in degraded mode.\n"),
1252                                          dself->device_name),
1253                          DEVICE_STATUS_DEVICE_ERROR);
1254         return FALSE;
1255     }
1256
1257     /* nail down our block size, if we haven't already */
1258     if (!fix_block_size(self))
1259         return FALSE;
1260
1261     dself->access_mode = mode;
1262     dself->in_file = FALSE;
1263
1264     ops = g_ptr_array_sized_new(self->private->children->len);
1265     for (i = 0; i < self->private->children->len; i ++) {
1266         StartOp * op;
1267         
1268         if ((signed)i == self->private->failed) {
1269             continue;
1270         }
1271
1272         op = g_new(StartOp, 1);
1273         op->base.child = g_ptr_array_index(self->private->children, i);
1274         op->mode = mode;
1275         op->label = g_strdup(label);
1276         op->timestamp = g_strdup(timestamp);
1277         g_ptr_array_add(ops, op);
1278     }
1279     
1280     do_rait_child_ops(self, start_do_op, ops);
1281
1282     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1283
1284     /* Check results of starting devices; this is mostly about the
1285      * VOLUME_UNLABELED flag. */
1286     total_status = 0;
1287     for (i = 0; i < ops->len; i ++) {
1288         StartOp * op = g_ptr_array_index(ops, i);
1289         Device *child = op->base.child;
1290
1291         total_status |= child->status;
1292         if (child->status != DEVICE_STATUS_SUCCESS) {
1293             /* record the error message and move on. */
1294             append_message(&failure_errmsgs,
1295                            g_strdup_printf("%s: %s",
1296                                            child->device_name,
1297                                            device_error_or_status(child)));
1298         } else {
1299             if (child->volume_label != NULL && child->volume_time != NULL) {
1300                 if (dself->volume_label != NULL && dself->volume_time != NULL) {
1301                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1302                         strcmp(child->volume_time, dself->volume_time) != 0) {
1303                         /* Mismatch! (Two devices provided different labels) */
1304                         char * this_message =
1305                             g_strdup_printf("%s: Label (%s/%s) is different "
1306                                             "from label (%s/%s) found at "
1307                                             "device %s",
1308                                             child->device_name,
1309                                             child->volume_label,
1310                                             child->volume_time,
1311                                             dself->volume_label,
1312                                             dself->volume_time,
1313                                             label_from_device);
1314                         append_message(&failure_errmsgs, this_message);
1315                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1316                     }
1317                 } else {
1318                     /* First device with a volume. */
1319                     dself->volume_label = g_strdup(child->volume_label);
1320                     dself->volume_time = g_strdup(child->volume_time);
1321                     label_from_device = g_strdup(child->device_name);
1322                 }
1323             } else {
1324                 /* Device problem, it says it succeeded but sets no label? */
1325                 char * this_message =
1326                     g_strdup_printf("%s: Says label read, but no volume "
1327                                      "label found.", child->device_name);
1328                 append_message(&failure_errmsgs, this_message);
1329                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1330             }
1331         }
1332     }
1333
1334     amfree(label_from_device);
1335     g_ptr_array_free_full(ops);
1336
1337     dself->status = total_status;
1338
1339     if (!success) {
1340         device_set_error(dself, failure_errmsgs, total_status);
1341         return FALSE;
1342     }
1343
1344     amfree(failure_errmsgs);
1345     return TRUE;
1346 }
1347
1348 typedef struct {
1349     GenericOp base;
1350     dumpfile_t * info; /* IN */
1351     int fileno;
1352 } StartFileOp;
1353
1354 /* a GFunc */
1355 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1356     StartFileOp * op = data;
1357     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1358                                                         op->info));
1359     op->fileno = op->base.child->file;
1360     if (op->fileno < 1) {
1361         op->base.result = FALSE;
1362     }
1363 }
1364
1365 static gboolean
1366 rait_device_start_file (Device * dself, dumpfile_t * info) {
1367     GPtrArray * ops;
1368     guint i;
1369     gboolean success;
1370     RaitDevice * self;
1371     int actual_file = -1;
1372
1373     self = RAIT_DEVICE(dself);
1374
1375     if (rait_device_in_error(self)) return FALSE;
1376     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1377
1378     ops = g_ptr_array_sized_new(self->private->children->len);
1379     for (i = 0; i < self->private->children->len; i ++) {
1380         StartFileOp * op;
1381         op = g_new(StartFileOp, 1);
1382         op->base.child = g_ptr_array_index(self->private->children, i);
1383         /* each child gets its own copy of the header, to munge as it
1384          * likes (setting blocksize, at least) */
1385         op->info = dumpfile_copy(info);
1386         g_ptr_array_add(ops, op);
1387     }
1388     
1389     do_rait_child_ops(self, start_file_do_op, ops);
1390
1391     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1392     
1393     for (i = 0; i < self->private->children->len && success; i ++) {
1394         StartFileOp * op = g_ptr_array_index(ops, i);
1395         if (!op->base.result)
1396             continue;
1397         g_assert(op->fileno >= 1);
1398         if (actual_file < 1) {
1399             actual_file = op->fileno;
1400         }
1401         if (actual_file != op->fileno) {
1402             /* File number mismatch! Aah, my hair is on fire! */
1403             device_set_error(dself,
1404                              g_strdup_printf("File number mismatch in "
1405                                              "rait_device_start_file(): "
1406                                              "Child %s reported file number "
1407                                              "%d, another child reported "
1408                                              "file number %d.",
1409                                              op->base.child->device_name,
1410                                              op->fileno, actual_file),
1411                              DEVICE_STATUS_DEVICE_ERROR);
1412             success = FALSE;
1413             op->base.result = FALSE;
1414         }
1415     }
1416
1417     for (i = 0; i < ops->len && success; i ++) {
1418         StartFileOp * op = g_ptr_array_index(ops, i);
1419         if (op->info) dumpfile_free(op->info);
1420     }
1421     g_ptr_array_free_full(ops);
1422
1423     if (!success) {
1424         if (!device_in_error(dself)) {
1425             device_set_error(dself, stralloc("One or more devices "
1426                                              "failed to start_file"),
1427                              DEVICE_STATUS_DEVICE_ERROR);
1428         }
1429         return FALSE;
1430     }
1431
1432     dself->in_file = TRUE;
1433     g_assert(actual_file >= 1);
1434     dself->file = actual_file;
1435
1436     return TRUE;
1437 }
1438
1439 static void find_simple_params(RaitDevice * self,
1440                                guint * num_children,
1441                                guint * data_children) {
1442     int num, data;
1443     
1444     num = self->private->children->len;
1445     if (num > 1)
1446         data = num - 1;
1447     else
1448         data = num;
1449     if (num_children != NULL)
1450         *num_children = num;
1451     if (data_children != NULL)
1452         *data_children = data;
1453 }
1454
1455 typedef struct {
1456     GenericOp base;
1457     guint size;           /* IN */
1458     gpointer data;        /* IN */
1459     gboolean data_needs_free; /* bookkeeping */
1460 } WriteBlockOp;
1461
1462 /* a GFunc. */
1463 static void write_block_do_op(gpointer data,
1464                               gpointer user_data G_GNUC_UNUSED) {
1465     WriteBlockOp * op = data;
1466
1467     op->base.result =
1468         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1469 }
1470
1471 /* Parity block generation. Performance of this function can be improved
1472    considerably by using larger-sized integers or
1473    assembly-coded vector instructions. Parameters are:
1474    % data       - All data chunks in series (chunk_size * num_chunks bytes)
1475    % parity     - Allocated space for parity block (chunk_size bytes)
1476  */
1477 static void make_parity_block(char * data, char * parity,
1478                               guint chunk_size, guint num_chunks) {
1479     guint i;
1480     bzero(parity, chunk_size);
1481     for (i = 0; i < num_chunks - 1; i ++) {
1482         guint j;
1483         for (j = 0; j < chunk_size; j ++) {
1484             parity[j] ^= data[chunk_size*i + j];
1485         }
1486     }
1487 }
1488
1489 /* Does the same thing as make_parity_block, but instead of using a
1490    single memory chunk holding all chunks, it takes a GPtrArray of
1491    chunks. */
1492 static void make_parity_block_extents(GPtrArray * data, char * parity,
1493                                       guint chunk_size) {
1494     guint i;
1495     bzero(parity, chunk_size);
1496     for (i = 0; i < data->len; i ++) {
1497         guint j;
1498         char * data_chunk;
1499         data_chunk = g_ptr_array_index(data, i);
1500         for (j = 0; j < chunk_size; j ++) {
1501             parity[j] ^= data_chunk[j];
1502         }
1503     }
1504 }
1505
1506 /* Does the parity creation algorithm. Allocates and returns a single
1507    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1508 static char * extract_data_block(char * data, guint size,
1509                                  guint chunks, guint chunk) {
1510     char * rval;
1511     guint chunk_size;
1512
1513     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1514     g_assert(data != NULL);
1515     g_assert(size > 0 && size % (chunks - 1) == 0);
1516
1517     chunk_size = size / (chunks - 1);
1518     rval = g_malloc(chunk_size);
1519     if (chunks != chunk) {
1520         /* data block. */
1521         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1522     } else {
1523         make_parity_block(data, rval, chunk_size, chunks);
1524     }
1525     
1526     return rval;
1527 }
1528
1529 static gboolean 
1530 rait_device_write_block (Device * dself, guint size, gpointer data) {
1531     GPtrArray * ops;
1532     guint i;
1533     gboolean success;
1534     guint data_children, num_children;
1535     gsize blocksize = dself->block_size;
1536     RaitDevice * self;
1537     gboolean last_block = (size < blocksize);
1538
1539     self = RAIT_DEVICE(dself);
1540
1541     if (rait_device_in_error(self)) return FALSE;
1542     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1543
1544     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1545     num_children = self->private->children->len;
1546     if (num_children != 1)
1547         data_children = num_children - 1;
1548     else
1549         data_children = num_children;
1550     
1551     g_assert(size % data_children == 0 || last_block);
1552
1553     /* zero out to the end of a short block -- tape devices only write
1554      * whole blocks. */
1555     if (last_block) {
1556         char *new_data;
1557
1558         new_data = g_malloc(blocksize);
1559         memcpy(new_data, data, size);
1560         bzero(new_data + size, blocksize - size);
1561
1562         data = new_data;
1563         size = blocksize;
1564     }
1565
1566     ops = g_ptr_array_sized_new(num_children);
1567     for (i = 0; i < self->private->children->len; i ++) {
1568         WriteBlockOp * op;
1569         op = g_malloc(sizeof(*op));
1570         op->base.child = g_ptr_array_index(self->private->children, i);
1571         op->size = size / data_children;
1572         if (num_children <= 2) {
1573             op->data = data;
1574             op->data_needs_free = FALSE;
1575         } else {
1576             op->data_needs_free = TRUE;
1577             op->data = extract_data_block(data, size, num_children, i + 1);
1578         }
1579         g_ptr_array_add(ops, op);
1580     }
1581
1582     do_rait_child_ops(self, write_block_do_op, ops);
1583
1584     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1585
1586     for (i = 0; i < self->private->children->len; i ++) {
1587         WriteBlockOp * op = g_ptr_array_index(ops, i);
1588         if (op->data_needs_free)
1589             free(op->data);
1590     }
1591
1592     if (last_block) {
1593         amfree(data);
1594     }
1595     
1596     g_ptr_array_free_full(ops);
1597
1598     if (!success) {
1599         /* TODO be more specific here */
1600         /* TODO: handle EOF here -- if one or more (or two or more??)
1601          * children have is_eof* set, then reflect that in our error
1602          * status, and finish_file all of the non-EOF children. What's
1603          * more fun is when one device fails and must be isolated at
1604          * the same time another hits EOF. */
1605         device_set_error(dself,
1606             stralloc("One or more devices failed to write_block"),
1607             DEVICE_STATUS_DEVICE_ERROR);
1608         return FALSE;
1609     } else {
1610         dself->block ++;
1611
1612         return TRUE;
1613     }
1614 }
1615
1616 /* A GFunc */
1617 static void finish_file_do_op(gpointer data,
1618                               gpointer user_data G_GNUC_UNUSED) {
1619     GenericOp * op = data;
1620     if (op->child) {
1621         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1622     } else {
1623         op->result = FALSE;
1624     }
1625 }
1626
1627 static gboolean 
1628 rait_device_finish_file (Device * dself) {
1629     GPtrArray * ops;
1630     gboolean success;
1631     RaitDevice * self = RAIT_DEVICE(dself);
1632
1633     g_assert(self != NULL);
1634     if (rait_device_in_error(dself)) return FALSE;
1635     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1636
1637     ops = make_generic_boolean_op_array(self);
1638     
1639     do_rait_child_ops(self, finish_file_do_op, ops);
1640
1641     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1642
1643     g_ptr_array_free_full(ops);
1644
1645     if (!success) {
1646         /* TODO: be more specific here */
1647         device_set_error(dself,
1648                          g_strdup("One or more devices failed to finish_file"),
1649             DEVICE_STATUS_DEVICE_ERROR);
1650         return FALSE;
1651     }
1652
1653     dself->in_file = FALSE;
1654     return TRUE;
1655 }
1656
1657 typedef struct {
1658     GenericOp base;
1659     guint requested_file;                /* IN */
1660     guint actual_file;                   /* OUT */
1661 } SeekFileOp;
1662
1663 /* a GFunc. */
1664 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1665     SeekFileOp * op = data;
1666     op->base.result = device_seek_file(op->base.child, op->requested_file);
1667     op->actual_file = op->base.child->file;
1668 }
1669
1670 static dumpfile_t * 
1671 rait_device_seek_file (Device * dself, guint file) {
1672     GPtrArray * ops;
1673     guint i;
1674     gboolean success;
1675     dumpfile_t * rval;
1676     RaitDevice * self = RAIT_DEVICE(dself);
1677     guint actual_file = 0;
1678     gboolean in_file = FALSE;
1679
1680     if (rait_device_in_error(self)) return NULL;
1681
1682     dself->in_file = FALSE;
1683     dself->is_eof = FALSE;
1684     dself->block = 0;
1685
1686     ops = g_ptr_array_sized_new(self->private->children->len);
1687     for (i = 0; i < self->private->children->len; i ++) {
1688         SeekFileOp * op;
1689         if ((int)i == self->private->failed)
1690             continue; /* This device is broken. */
1691         op = g_new(SeekFileOp, 1);
1692         op->base.child = g_ptr_array_index(self->private->children, i);
1693         op->base.child_index = i;
1694         op->requested_file = file;
1695         g_ptr_array_add(ops, op);
1696     }
1697     
1698     do_rait_child_ops(self, seek_file_do_op, ops);
1699
1700     /* This checks for NULL values, but we still have to check for
1701        consistant headers. */
1702     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1703                                        ops, extract_boolean_pointer_op);
1704
1705     rval = NULL;
1706     for (i = 0; i < ops->len; i ++) {
1707         SeekFileOp * this_op;
1708         dumpfile_t * this_result;
1709         guint this_actual_file;
1710         gboolean this_in_file;
1711         
1712         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1713
1714         if ((signed)this_op->base.child_index == self->private->failed)
1715             continue;
1716
1717         this_result = this_op->base.result;
1718         this_actual_file = this_op->actual_file;
1719         this_in_file = this_op->base.child->in_file;
1720
1721         if (rval == NULL) {
1722             rval = this_result;
1723             actual_file = this_actual_file;
1724             in_file = this_in_file;
1725         } else {
1726             if (headers_are_equal(rval, this_result) &&
1727                 actual_file == this_actual_file &&
1728                 in_file == this_in_file) {
1729                 /* Do nothing. */
1730             } else {
1731                 success = FALSE;
1732             }
1733             free(this_result);
1734         }
1735     }
1736
1737     g_ptr_array_free_full(ops);
1738
1739     if (!success) {
1740         amfree(rval);
1741         /* TODO: be more specific here */
1742         device_set_error(dself,
1743                          g_strdup("One or more devices failed to seek_file"),
1744             DEVICE_STATUS_DEVICE_ERROR);
1745         return NULL;
1746     }
1747
1748     /* update our state */
1749     dself->in_file = in_file;
1750     dself->file = actual_file;
1751
1752     return rval;
1753 }
1754
1755 typedef struct {
1756     GenericOp base;
1757     guint64 block; /* IN */
1758 } SeekBlockOp;
1759
1760 /* a GFunc. */
1761 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1762     SeekBlockOp * op = data;
1763     op->base.result =
1764         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1765 }
1766
1767 static gboolean 
1768 rait_device_seek_block (Device * dself, guint64 block) {
1769     GPtrArray * ops;
1770     guint i;
1771     gboolean success;
1772
1773     RaitDevice * self = RAIT_DEVICE(dself);
1774
1775     if (rait_device_in_error(self)) return FALSE;
1776
1777     ops = g_ptr_array_sized_new(self->private->children->len);
1778     for (i = 0; i < self->private->children->len; i ++) {
1779         SeekBlockOp * op;
1780         if ((int)i == self->private->failed)
1781             continue; /* This device is broken. */
1782         op = g_new(SeekBlockOp, 1);
1783         op->base.child = g_ptr_array_index(self->private->children, i);
1784         op->base.child_index = i;
1785         op->block = block;
1786         g_ptr_array_add(ops, op);
1787     }
1788     
1789     do_rait_child_ops(self, seek_block_do_op, ops);
1790
1791     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1792                                        ops, extract_boolean_generic_op);
1793
1794     g_ptr_array_free_full(ops);
1795
1796     if (!success) {
1797         /* TODO: be more specific here */
1798         device_set_error(dself,
1799             stralloc("One or more devices failed to seek_block"),
1800             DEVICE_STATUS_DEVICE_ERROR);
1801         return FALSE;
1802     }
1803
1804     dself->block = block;
1805     return TRUE;
1806 }
1807
1808 typedef struct {
1809     GenericOp base;
1810     gpointer buffer; /* IN */
1811     int read_size;      /* IN/OUT -- note not a pointer */
1812     int desired_read_size; /* bookkeeping */
1813 } ReadBlockOp;
1814
1815 /* a GFunc. */
1816 static void read_block_do_op(gpointer data,
1817                              gpointer user_data G_GNUC_UNUSED) {
1818     ReadBlockOp * op = data;
1819     op->base.result =
1820         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1821                                           &(op->read_size)));
1822     if (op->read_size > op->desired_read_size) {
1823         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1824                   op->base.child->device_name);
1825     }
1826 }
1827
1828 /* A BooleanExtractor. This one checks for a successful read. */
1829 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1830     ReadBlockOp * op = data;
1831     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1832 }
1833
1834 /* A BooleanExtractor. This one checks for EOF. */
1835 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1836     ReadBlockOp * op = data;
1837     return op->base.child->is_eof;
1838 }
1839
1840 /* Counts the number of elements in an array matching a given proposition. */
1841 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1842     int rval;
1843     unsigned int i;
1844     rval = 0;
1845     for (i = 0; i < array->len ; i++) {
1846         if (filter(g_ptr_array_index(array, i)))
1847             rval ++;
1848     }
1849     return rval;
1850 }
1851
1852 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1853                                       gpointer buf, size_t bufsize) {
1854     guint num_children, data_children;
1855     gsize blocksize;
1856     gsize child_blocksize;
1857     guint i;
1858     int parity_child;
1859     gpointer parity_block = NULL;
1860     gboolean success;
1861
1862     success = TRUE;
1863
1864     blocksize = DEVICE(self)->block_size;
1865     find_simple_params(self, &num_children, &data_children);
1866
1867     if (num_children > 1)
1868         parity_child = num_children - 1;
1869     else
1870         parity_child = -1;
1871
1872     child_blocksize = blocksize / data_children;
1873
1874     for (i = 0; i < ops->len; i ++) {
1875         ReadBlockOp * op = g_ptr_array_index(ops, i);
1876         if (!extract_boolean_read_block_op_data(op))
1877             continue;
1878         if ((int)(op->base.child_index) == parity_child) {
1879             parity_block = op->buffer;
1880         } else {
1881             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1882             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1883                    child_blocksize);
1884         }
1885     }
1886     g_assert(parity_block != NULL); /* should have found parity_child */
1887
1888     if (self->private->status == RAIT_STATUS_COMPLETE) {
1889         if (num_children >= 2) {
1890             /* Verify the parity block. This code is inefficient but
1891                does the job for the 2-device case, too. */
1892             gpointer constructed_parity;
1893             GPtrArray * data_extents;
1894             
1895             constructed_parity = g_malloc(child_blocksize);
1896             data_extents = g_ptr_array_sized_new(data_children);
1897             for (i = 0; i < data_children; i ++) {
1898                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1899                 g_assert(extract_boolean_read_block_op_data(op));
1900                 if ((int)op->base.child_index == parity_child)
1901                     continue;
1902                 g_ptr_array_add(data_extents, op->buffer);
1903             }
1904             make_parity_block_extents(data_extents, constructed_parity,
1905                                       child_blocksize);
1906             
1907             if (0 != memcmp(parity_block, constructed_parity,
1908                             child_blocksize)) {
1909                 device_set_error(DEVICE(self),
1910                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1911                     DEVICE_STATUS_DEVICE_ERROR);
1912                 /* TODO: can't we just isolate the device in this case? */
1913                 success = FALSE;
1914             }
1915             g_ptr_array_free(data_extents, TRUE);
1916             amfree(constructed_parity);
1917         } else { /* do nothing. */ }
1918     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1919         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1920         /* We are in degraded mode. What's missing? */
1921         if (self->private->failed == parity_child) {
1922             /* do nothing. */
1923         } else if (num_children >= 2) {
1924             /* Reconstruct failed block from parity block. */
1925             GPtrArray * data_extents = g_ptr_array_new();            
1926
1927             for (i = 0; i < data_children; i ++) {
1928                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1929                 if (!extract_boolean_read_block_op_data(op))
1930                     continue;
1931                 g_ptr_array_add(data_extents, op->buffer);
1932             }
1933
1934             /* Conveniently, the reconstruction is the same procedure
1935                as the parity generation. This even works if there is
1936                only one remaining device! */
1937             make_parity_block_extents(data_extents,
1938                                       (char *)buf + (child_blocksize *
1939                                              self->private->failed),
1940                                       child_blocksize);
1941
1942             /* The array members belong to our ops argument. */
1943             g_ptr_array_free(data_extents, TRUE);
1944         } else {
1945             g_assert_not_reached();
1946         }
1947     } else {
1948         /* device is already in FAILED state -- we shouldn't even be here */
1949         success = FALSE;
1950     }
1951     return success;
1952 }
1953
1954 static int
1955 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1956     GPtrArray * ops;
1957     guint i;
1958     gboolean success;
1959     guint num_children, data_children;
1960     gsize blocksize = dself->block_size;
1961     gsize child_blocksize;
1962
1963     RaitDevice * self = RAIT_DEVICE(dself);
1964
1965     if (rait_device_in_error(self)) return -1;
1966
1967     find_simple_params(self, &num_children, &data_children);
1968
1969     /* tell caller they haven't given us a big enough buffer */
1970     if (blocksize > (gsize)*size) {
1971         g_assert(blocksize < INT_MAX);
1972         *size = (int)blocksize;
1973         return 0;
1974     }
1975
1976     g_assert(blocksize % data_children == 0); /* see find_block_size */
1977     child_blocksize = blocksize / data_children;
1978
1979     ops = g_ptr_array_sized_new(num_children);
1980     for (i = 0; i < num_children; i ++) {
1981         ReadBlockOp * op;
1982         if ((int)i == self->private->failed)
1983             continue; /* This device is broken. */
1984         op = g_new(ReadBlockOp, 1);
1985         op->base.child = g_ptr_array_index(self->private->children, i);
1986         op->base.child_index = i;
1987         op->buffer = g_malloc(child_blocksize);
1988         op->desired_read_size = op->read_size = child_blocksize;
1989         g_ptr_array_add(ops, op);
1990     }
1991     
1992     do_rait_child_ops(self, read_block_do_op, ops);
1993
1994     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1995         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1996                                      ops,
1997                                      extract_boolean_read_block_op_data)) {
1998             /* TODO: be more specific */
1999             device_set_error(dself,
2000                 stralloc(_("Error occurred combining blocks from child devices")),
2001                 DEVICE_STATUS_DEVICE_ERROR);
2002             success = FALSE;
2003         } else {
2004             /* raid_block_reconstruction sets the error status if necessary */
2005             success = raid_block_reconstruction(RAIT_DEVICE(self),
2006                                                 ops, buf, (size_t)*size);
2007         }
2008     } else {
2009         success = FALSE;
2010         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
2011                                      ops,
2012                                      extract_boolean_read_block_op_eof)) {
2013             device_set_error(dself,
2014                 stralloc(_("EOF")),
2015                 DEVICE_STATUS_SUCCESS);
2016             dself->is_eof = TRUE;
2017             dself->in_file = FALSE;
2018         } else {
2019             device_set_error(dself,
2020                 stralloc(_("All child devices failed to read, but not all are at eof")),
2021                 DEVICE_STATUS_DEVICE_ERROR);
2022         }
2023     }
2024
2025     for (i = 0; i < ops->len; i ++) {
2026         ReadBlockOp * op = g_ptr_array_index(ops, i);
2027         amfree(op->buffer);
2028     }
2029     g_ptr_array_free_full(ops);
2030
2031     if (success) {
2032         dself->block++;
2033         *size = blocksize;
2034         return blocksize;
2035     } else {
2036         return -1;
2037     }
2038 }
2039
2040 /* property utility functions */
2041
2042 typedef struct {
2043     GenericOp base;
2044     DevicePropertyId id;   /* IN */
2045     GValue value;          /* IN/OUT */
2046     PropertySurety surety; /* IN (for set) */
2047     PropertySource source; /* IN (for set) */
2048 } PropertyOp;
2049
2050 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2051 static GPtrArray * make_property_op_array(RaitDevice * self,
2052                                           DevicePropertyId id,
2053                                           GValue * value,
2054                                           PropertySurety surety,
2055                                           PropertySource source) {
2056     guint i;
2057     GPtrArray * ops;
2058     ops = g_ptr_array_sized_new(self->private->children->len);
2059     for (i = 0; i < self->private->children->len; i ++) {
2060         PropertyOp * op;
2061
2062         if ((signed)i == self->private->failed) {
2063             continue;
2064         }
2065
2066         op = g_new(PropertyOp, 1);
2067         op->base.child = g_ptr_array_index(self->private->children, i);
2068         op->id = id;
2069         bzero(&(op->value), sizeof(op->value));
2070         if (value != NULL) {
2071             g_value_unset_copy(value, &(op->value));
2072         }
2073         op->surety = surety;
2074         op->source = source;
2075         g_ptr_array_add(ops, op);
2076     }
2077
2078     return ops;
2079 }
2080
2081 /* A GFunc. */
2082 static void property_get_do_op(gpointer data,
2083                                gpointer user_data G_GNUC_UNUSED) {
2084     PropertyOp * op = data;
2085
2086     bzero(&(op->value), sizeof(op->value));
2087     op->base.result =
2088         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2089                                             &(op->value)));
2090 }
2091
2092 /* A GFunc. */
2093 static void property_set_do_op(gpointer data,
2094                                gpointer user_data G_GNUC_UNUSED) {
2095     PropertyOp * op = data;
2096
2097     op->base.result =
2098         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2099                                                &(op->value), op->surety,
2100                                                op->source));
2101     g_value_unset(&(op->value));
2102 }
2103
2104 /* PropertyGetFns and PropertySetFns */
2105
2106 static gboolean
2107 property_get_block_size_fn(Device *dself,
2108     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2109     PropertySurety *surety, PropertySource *source)
2110 {
2111     RaitDevice *self = RAIT_DEVICE(dself);
2112     gsize my_block_size;
2113
2114     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2115         my_block_size = dself->block_size;
2116
2117         if (surety)
2118             *surety = dself->block_size_surety;
2119     } else {
2120         gsize child_block_size;
2121         child_block_size = calculate_block_size_from_children(self,
2122                                                     &my_block_size);
2123         if (child_block_size == 0)
2124             return FALSE;
2125
2126         if (surety)
2127             *surety = PROPERTY_SURETY_BAD; /* may still change */
2128     }
2129
2130     if (val) {
2131         g_value_unset_init(val, G_TYPE_INT);
2132         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2133         g_value_set_int(val, (gint)my_block_size);
2134     }
2135
2136     if (source)
2137         *source = dself->block_size_source;
2138
2139     return TRUE;
2140 }
2141
2142 static gboolean
2143 property_set_block_size_fn(Device *dself,
2144     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2145     PropertySurety surety, PropertySource source)
2146 {
2147     RaitDevice *self = RAIT_DEVICE(dself);
2148     gint my_block_size = g_value_get_int(val);
2149     guint data_children;
2150
2151     find_simple_params(self, NULL, &data_children);
2152     if ((my_block_size % data_children) != 0) {
2153         device_set_error(dself,
2154             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2155             DEVICE_STATUS_DEVICE_ERROR);
2156         return FALSE;
2157     }
2158
2159     dself->block_size = my_block_size;
2160     dself->block_size_source = source;
2161     dself->block_size_surety = surety;
2162
2163     if (!fix_block_size(self))
2164         return FALSE;
2165
2166     return TRUE;
2167 }
2168
2169 static gboolean
2170 property_get_canonical_name_fn(Device *dself,
2171     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2172     PropertySurety *surety, PropertySource *source)
2173 {
2174     RaitDevice *self = RAIT_DEVICE(dself);
2175     char *canonical = child_device_names_to_rait_name(self);
2176
2177     if (val) {
2178         g_value_unset_init(val, G_TYPE_STRING);
2179         g_value_set_string(val, canonical);
2180         g_free(canonical);
2181     }
2182
2183     if (surety)
2184         *surety = PROPERTY_SURETY_GOOD;
2185
2186     if (source)
2187         *source = PROPERTY_SOURCE_DETECTED;
2188
2189     return TRUE;
2190 }
2191
2192 static gboolean
2193 property_get_concurrency_fn(Device *dself,
2194     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2195     PropertySurety *surety, PropertySource *source)
2196 {
2197     RaitDevice *self = RAIT_DEVICE(dself);
2198     ConcurrencyParadigm result;
2199     guint i;
2200     GPtrArray * ops;
2201     gboolean success;
2202
2203     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2204     do_rait_child_ops(self, property_get_do_op, ops);
2205
2206     /* find the most restrictive paradigm acceptable to all
2207      * child devices */
2208     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2209     success = TRUE;
2210     for (i = 0; i < ops->len; i ++) {
2211         ConcurrencyParadigm cur;
2212         PropertyOp * op = g_ptr_array_index(ops, i);
2213
2214         if (!op->base.result
2215             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2216             success = FALSE;
2217             break;
2218         }
2219
2220         cur = g_value_get_enum(&(op->value));
2221         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2222             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2223             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2224         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2225                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2226             result = CONCURRENCY_PARADIGM_SHARED_READ;
2227         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2228                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2229             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2230         } else {
2231             success = FALSE;
2232             break;
2233         }
2234     }
2235
2236     g_ptr_array_free_full(ops);
2237
2238     if (success) {
2239         if (val) {
2240             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2241             g_value_set_enum(val, result);
2242         }
2243
2244         if (surety)
2245             *surety = PROPERTY_SURETY_GOOD;
2246
2247         if (source)
2248             *source = PROPERTY_SOURCE_DETECTED;
2249     }
2250
2251     return success;
2252 }
2253
2254 static gboolean
2255 property_get_streaming_fn(Device *dself,
2256     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2257     PropertySurety *surety, PropertySource *source)
2258 {
2259     RaitDevice *self = RAIT_DEVICE(dself);
2260     StreamingRequirement result;
2261     guint i;
2262     GPtrArray * ops;
2263     gboolean success;
2264
2265     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2266     do_rait_child_ops(self, property_get_do_op, ops);
2267
2268     /* combine the child streaming requirements, selecting the strongest
2269      * requirement of the bunch. */
2270     result = STREAMING_REQUIREMENT_NONE;
2271     success = TRUE;
2272     for (i = 0; i < ops->len; i ++) {
2273         StreamingRequirement cur;
2274         PropertyOp * op = g_ptr_array_index(ops, i);
2275
2276         if (!op->base.result
2277             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2278             success = FALSE;
2279             break;
2280         }
2281
2282         cur = g_value_get_enum(&(op->value));
2283         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2284             cur == STREAMING_REQUIREMENT_REQUIRED) {
2285             result = STREAMING_REQUIREMENT_REQUIRED;
2286         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2287                    cur == STREAMING_REQUIREMENT_DESIRED) {
2288             result = STREAMING_REQUIREMENT_DESIRED;
2289         } else if (result == STREAMING_REQUIREMENT_NONE &&
2290                    cur == STREAMING_REQUIREMENT_NONE) {
2291             result = STREAMING_REQUIREMENT_NONE;
2292         } else {
2293             success = FALSE;
2294             break;
2295         }
2296     }
2297
2298     g_ptr_array_free_full(ops);
2299
2300     if (success) {
2301         if (val) {
2302             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2303             g_value_set_enum(val, result);
2304         }
2305
2306         if (surety)
2307             *surety = PROPERTY_SURETY_GOOD;
2308
2309         if (source)
2310             *source = PROPERTY_SOURCE_DETECTED;
2311     }
2312
2313     return success;
2314 }
2315
2316 static gboolean
2317 property_get_boolean_and_fn(Device *dself,
2318     DevicePropertyBase *base, GValue *val,
2319     PropertySurety *surety, PropertySource *source)
2320 {
2321     RaitDevice *self = RAIT_DEVICE(dself);
2322     gboolean result;
2323     guint i;
2324     GPtrArray * ops;
2325     gboolean success;
2326
2327     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2328     do_rait_child_ops(self, property_get_do_op, ops);
2329
2330     /* combine the child values, applying a simple AND */
2331     result = TRUE;
2332     success = TRUE;
2333     for (i = 0; i < ops->len; i ++) {
2334         PropertyOp * op = g_ptr_array_index(ops, i);
2335
2336         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2337             success = FALSE;
2338             break;
2339         }
2340
2341         if (!g_value_get_boolean(&(op->value))) {
2342             result = FALSE;
2343             break;
2344         }
2345     }
2346
2347     g_ptr_array_free_full(ops);
2348
2349     if (success) {
2350         if (val) {
2351             g_value_unset_init(val, G_TYPE_BOOLEAN);
2352             g_value_set_boolean(val, result);
2353         }
2354
2355         if (surety)
2356             *surety = PROPERTY_SURETY_GOOD;
2357
2358         if (source)
2359             *source = PROPERTY_SOURCE_DETECTED;
2360     }
2361
2362     return success;
2363 }
2364
2365 static gboolean
2366 property_get_medium_access_type_fn(Device *dself,
2367     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2368     PropertySurety *surety, PropertySource *source)
2369 {
2370     RaitDevice *self = RAIT_DEVICE(dself);
2371     MediaAccessMode result;
2372     guint i;
2373     GPtrArray * ops;
2374     gboolean success;
2375
2376     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2377     do_rait_child_ops(self, property_get_do_op, ops);
2378
2379     /* combine the modes as best we can */
2380     result = 0;
2381     success = TRUE;
2382     for (i = 0; i < ops->len; i ++) {
2383         MediaAccessMode cur;
2384         PropertyOp * op = g_ptr_array_index(ops, i);
2385
2386         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2387             success = FALSE;
2388             break;
2389         }
2390
2391         cur = g_value_get_enum(&(op->value));
2392
2393         if (i == 0) {
2394             result = cur;
2395         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2396                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2397                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2398                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2399             /* Invalid combination; one device can only read, other
2400                can only write. */
2401             success = FALSE;
2402             break;
2403         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2404                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2405             result = MEDIA_ACCESS_MODE_READ_ONLY;
2406         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2407                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2408             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2409         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2410                    cur == MEDIA_ACCESS_MODE_WORM) {
2411             result = MEDIA_ACCESS_MODE_WORM;
2412         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2413                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2414             result = MEDIA_ACCESS_MODE_READ_WRITE;
2415         } else {
2416             success = FALSE;
2417             break;
2418         }
2419     }
2420
2421     g_ptr_array_free_full(ops);
2422
2423     if (success) {
2424         if (val) {
2425             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2426             g_value_set_enum(val, result);
2427         }
2428
2429         if (surety)
2430             *surety = PROPERTY_SURETY_GOOD;
2431
2432         if (source)
2433             *source = PROPERTY_SOURCE_DETECTED;
2434     }
2435
2436     return success;
2437 }
2438
2439 static gboolean
2440 property_get_free_space_fn(Device *dself,
2441     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2442     PropertySurety *surety, PropertySource *source)
2443 {
2444     RaitDevice *self = RAIT_DEVICE(dself);
2445     QualifiedSize result;
2446     guint i;
2447     GPtrArray * ops;
2448     guint data_children;
2449
2450     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2451     do_rait_child_ops(self, property_get_do_op, ops);
2452
2453     /* Find the minimal available space of any child, with some funny business
2454      * to deal with varying degrees of accuracy. */
2455     result.accuracy = SIZE_ACCURACY_UNKNOWN;
2456     result.bytes = 0;
2457     for (i = 0; i < ops->len; i ++) {
2458         QualifiedSize cur;
2459         PropertyOp * op = g_ptr_array_index(ops, i);
2460
2461         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != QUALIFIED_SIZE_TYPE) {
2462             /* maybe this child can't tell us .. so this is just an estimate */
2463             if (result.accuracy == SIZE_ACCURACY_REAL)
2464                 result.accuracy = SIZE_ACCURACY_ESTIMATE;
2465
2466             continue;
2467         }
2468
2469         cur = *(QualifiedSize*)(g_value_get_boxed(&(op->value)));
2470
2471         if (result.accuracy != cur.accuracy) {
2472             result.accuracy = SIZE_ACCURACY_ESTIMATE;
2473         }
2474
2475         if (result.accuracy == SIZE_ACCURACY_UNKNOWN &&
2476             cur.accuracy != SIZE_ACCURACY_UNKNOWN) {
2477             result.bytes = cur.bytes;
2478         } else if (result.accuracy != SIZE_ACCURACY_UNKNOWN &&
2479                    cur.accuracy == SIZE_ACCURACY_UNKNOWN) {
2480             /* result.bytes unchanged. */
2481         } else {
2482             result.bytes = MIN(result.bytes, cur.bytes);
2483         }
2484     }
2485
2486     g_ptr_array_free_full(ops);
2487
2488     /* result contains the minimum size available on any child.  We
2489      * can use that space on each of our data children, so the total
2490      * is larger */
2491     find_simple_params(self, NULL, &data_children);
2492     result.bytes *= data_children;
2493
2494     if (val) {
2495         g_value_unset_init(val, QUALIFIED_SIZE_TYPE);
2496         g_value_set_boxed(val, &result);
2497     }
2498
2499     if (surety)
2500         *surety = (result.accuracy == SIZE_ACCURACY_UNKNOWN)?
2501                     PROPERTY_SURETY_BAD : PROPERTY_SURETY_GOOD;
2502
2503     if (source)
2504         *source = PROPERTY_SOURCE_DETECTED;
2505
2506     return TRUE;
2507 }
2508
2509 static gboolean
2510 property_get_max_volume_usage_fn(Device *dself,
2511     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2512     PropertySurety *surety, PropertySource *source)
2513 {
2514     RaitDevice *self = RAIT_DEVICE(dself);
2515     guint64 result;
2516     guint i;
2517     GPtrArray * ops;
2518     guint data_children;
2519
2520     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2521     do_rait_child_ops(self, property_get_do_op, ops);
2522
2523     /* look for the smallest value that is set */
2524     result = 0;
2525     for (i = 0; i < ops->len; i ++) {
2526         guint64 cur;
2527         PropertyOp * op = g_ptr_array_index(ops, i);
2528
2529         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2530             continue; /* ignore children without this property */
2531         }
2532
2533         cur = g_value_get_uint64(&(op->value));
2534
2535         result = MIN(cur, result);
2536     }
2537
2538     g_ptr_array_free_full(ops);
2539
2540     if (result) {
2541         /* result contains the minimum usage on any child.  We can use that space
2542          * on each of our data children, so the total is larger */
2543         find_simple_params(self, NULL, &data_children);
2544         result *= data_children;
2545
2546         if (val) {
2547             g_value_unset_init(val, G_TYPE_UINT64);
2548             g_value_set_uint64(val, result);
2549         }
2550
2551         if (surety)
2552             *surety = PROPERTY_SURETY_GOOD;
2553
2554         if (source)
2555             *source = PROPERTY_SOURCE_DETECTED;
2556
2557         return TRUE;
2558     } else {
2559         /* no result from any children, so we effectively don't have this property */
2560         return FALSE;
2561     }
2562 }
2563
2564 static gboolean
2565 property_set_max_volume_usage_fn(Device *dself,
2566     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2567     PropertySurety surety, PropertySource source)
2568 {
2569     RaitDevice *self = RAIT_DEVICE(dself);
2570     guint64 parent_usage;
2571     guint64 child_usage;
2572     GValue child_val;
2573     guint i;
2574     gboolean success;
2575     GPtrArray * ops;
2576     guint data_children;
2577
2578     parent_usage = g_value_get_uint64(val);
2579     find_simple_params(self, NULL, &data_children);
2580
2581     child_usage = parent_usage / data_children;
2582
2583     bzero(&child_val, sizeof(child_val));
2584     g_value_init(&child_val, G_TYPE_UINT64);
2585     g_value_set_uint64(&child_val, child_usage);
2586
2587     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2588                                 &child_val, surety, source);
2589     do_rait_child_ops(self, property_set_do_op, ops);
2590
2591     /* if any of the kids succeeded, then we did too */
2592     success = FALSE;
2593     for (i = 0; i < ops->len; i ++) {
2594         PropertyOp * op = g_ptr_array_index(ops, i);
2595
2596         if (op->base.result) {
2597             success = TRUE;
2598             break;
2599         }
2600     }
2601
2602     g_ptr_array_free_full(ops);
2603
2604     return success;
2605 }
2606
/* Per-child operation record used by rait_device_recycle_file: one of
 * these is allocated for each child device and handed to
 * recycle_file_do_op. */
typedef struct {
    GenericOp base;     /* supplies the child device and the result slot */
    guint filenum;      /* file number passed to device_recycle_file() */
} RecycleFileOp;
2611
2612 /* A GFunc */
2613 static void recycle_file_do_op(gpointer data,
2614                                gpointer user_data G_GNUC_UNUSED) {
2615     RecycleFileOp * op = data;
2616     op->base.result =
2617         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2618 }
2619
2620 static gboolean 
2621 rait_device_recycle_file (Device * dself, guint filenum) {
2622     GPtrArray * ops;
2623     guint i;
2624     gboolean success;
2625
2626     RaitDevice * self = RAIT_DEVICE(dself);
2627
2628     if (rait_device_in_error(self)) return FALSE;
2629
2630     ops = g_ptr_array_sized_new(self->private->children->len);
2631     for (i = 0; i < self->private->children->len; i ++) {
2632         RecycleFileOp * op;
2633         op = g_new(RecycleFileOp, 1);
2634         op->base.child = g_ptr_array_index(self->private->children, i);
2635         op->filenum = filenum;
2636         g_ptr_array_add(ops, op);
2637     }
2638     
2639     do_rait_child_ops(self, recycle_file_do_op, ops);
2640
2641     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2642
2643     g_ptr_array_free_full(ops);
2644
2645     if (!success) {
2646         /* TODO: be more specific here */
2647         device_set_error(dself,
2648             stralloc(_("One or more devices failed to recycle_file")),
2649             DEVICE_STATUS_DEVICE_ERROR);
2650         return FALSE;
2651     }
2652     return TRUE;
2653 }
2654
2655 /* GFunc */
2656 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2657     GenericOp * op = data;
2658     op->result = GINT_TO_POINTER(device_finish(op->child));
2659 }
2660
2661 static gboolean 
2662 rait_device_finish (Device * self) {
2663     GPtrArray * ops;
2664     gboolean success;
2665
2666     if (rait_device_in_error(self)) return FALSE;
2667
2668     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2669     
2670     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2671
2672     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2673
2674     g_ptr_array_free_full(ops);
2675
2676     self->access_mode = ACCESS_NULL;
2677
2678     if (!success)
2679         return FALSE;
2680
2681     return TRUE;
2682 }
2683
2684 static Device *
2685 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2686     Device * rval;
2687     g_assert(0 == strcmp(device_type, "rait"));
2688     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2689     device_open_device(rval, device_name, device_type, device_node);
2690     return rval;
2691 }
2692
2693 void 
2694 rait_device_register (void) {
2695     static const char * device_prefix_list[] = {"rait", NULL};
2696     register_device(rait_device_factory, device_prefix_list);
2697 }