daeb5c8855f09c7a1b33870df25e610f453e590d
[debian/amanda] / device-src / rait-device.c
1 /*
2  * Copyright (c) 2007,2008,2009 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* The RAIT device encapsulates some number of other devices into a single
22  * redundant device. */
23
24 #include <amanda.h>
25 #include "property.h"
26 #include "util.h"
27 #include <glib.h>
28 #include "glib-util.h"
29 #include "device.h"
30 #include "fileheader.h"
31 #include "semaphore.h"
32
33 /* Just a note about the failure mode of different operations:
34    - Recovers from a failure (enters degraded mode)
35      open_device()
36      seek_file() -- explodes if headers don't match.
37      seek_block() -- explodes if headers don't match.
38      read_block() -- explodes if data doesn't match.
39
40    - Operates in degraded mode (but dies if a new problem shows up)
41      read_label() -- but dies on label mismatch.
42      start() -- but dies when writing in degraded mode.
43      property functions
44      finish()
45
46    - Dies in degraded mode (even if remaining devices are OK)
47      start_file()
48      write_block()
49      finish_file()
50      recycle_file()
51 */
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_RAIT_DEVICE        (rait_device_get_type())
57 #define RAIT_DEVICE(obj)        G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice)
58 #define RAIT_DEVICE_CONST(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), rait_device_get_type(), RaitDevice const)
59 #define RAIT_DEVICE_CLASS(klass)        G_TYPE_CHECK_CLASS_CAST((klass), rait_device_get_type(), RaitDeviceClass)
60 #define IS_RAIT_DEVICE(obj)     G_TYPE_CHECK_INSTANCE_TYPE((obj), rait_device_get_type ())
61
62 #define RAIT_DEVICE_GET_CLASS(obj)      G_TYPE_INSTANCE_GET_CLASS((obj), rait_device_get_type(), RaitDeviceClass)
63 static GType    rait_device_get_type    (void);
64
65 /*
66  * Main object structure
67  */
68 typedef struct RaitDevice_s {
69     Device __parent__;
70
71     struct RaitDevicePrivate_s * private;
72 } RaitDevice;
73
74 /*
75  * Class definition
76  */
77 typedef struct _RaitDeviceClass RaitDeviceClass;
78 struct _RaitDeviceClass {
79     DeviceClass __parent__;
80 };
81
82 typedef enum {
83     RAIT_STATUS_COMPLETE, /* All subdevices OK. */
84     RAIT_STATUS_DEGRADED, /* One subdevice failed. */
85     RAIT_STATUS_FAILED    /* Two or more subdevices failed. */
86 } RaitStatus;
87
88 /* Older versions of glib have a deadlock in their thread pool implementations,
89  * so we include a simple thread-pool implementation here to replace it.
90  *
91  * This implementation assumes that threads are used for paralellizing a single
92  * operation, so all threads run a function to completion before the main thread
93  * continues.  This simplifies some of the locking semantics, and in particular
94  * there is no need to wait for stray threads to finish an operation when
95  * finalizing the RaitDevice object or when beginning a new operation.
96  */
97 #if !(GLIB_CHECK_VERSION(2,10,0))
98 #define USE_INTERNAL_THREADPOOL
99 #endif
100
101 typedef struct RaitDevicePrivate_s {
102     GPtrArray * children;
103     /* These flags are only relevant for reading. */
104     RaitStatus status;
105     /* If status == RAIT_STATUS_DEGRADED, this holds the index of the
106        failed node. It holds a negative number otherwise. */
107     int failed;
108
109     /* the child block size */
110     gsize child_block_size;
111
112 #ifdef USE_INTERNAL_THREADPOOL
113     /* array of ThreadInfo for performing parallel operations */
114     GArray *threads;
115
116     /* value of this semaphore is the number of threaded operations
117      * in progress */
118     semaphore_t *threads_sem;
119 #endif
120 } RaitDevicePrivate;
121
122 #ifdef USE_INTERNAL_THREADPOOL
123 typedef struct ThreadInfo {
124     GThread *thread;
125
126     /* struct fields below are protected by this mutex and condition variable */
127     GMutex *mutex;
128     GCond *cond;
129
130     gboolean die;
131     GFunc func;
132     gpointer data;
133
134     /* give threads access to active_threads and its mutex/cond */
135     struct RaitDevicePrivate_s *private;
136 } ThreadInfo;
137 #endif
138
139 /* This device uses a special sentinel node to indicate that the child devices
140  * will be set later (in rait_device_open).  It contains a control character to
141  * make it difficult to enter accidentally in an Amanda config. */
142 #define DEFER_CHILDREN_SENTINEL "DEFER\1"
143
144 #define PRIVATE(o) (o->private)
145
146 #define rait_device_in_error(dev) \
147     (device_in_error((dev)) || PRIVATE(RAIT_DEVICE((dev)))->status == RAIT_STATUS_FAILED)
148
149 void rait_device_register (void);
150
151 /* here are local prototypes */
152 static void rait_device_init (RaitDevice * o);
153 static void rait_device_class_init (RaitDeviceClass * c);
154 static void rait_device_base_init (RaitDeviceClass * c);
155 static void rait_device_open_device (Device * self, char * device_name, char * device_type, char * device_node);
156 static gboolean rait_device_start (Device * self, DeviceAccessMode mode,
157                                    char * label, char * timestamp);
158 static gboolean rait_device_configure(Device * self, gboolean use_global_config);
159 static gboolean rait_device_start_file(Device * self, dumpfile_t * info);
160 static gboolean rait_device_write_block (Device * self, guint size, gpointer data);
161 static gboolean rait_device_finish_file (Device * self);
162 static dumpfile_t * rait_device_seek_file (Device * self, guint file);
163 static gboolean rait_device_seek_block (Device * self, guint64 block);
164 static int      rait_device_read_block (Device * self, gpointer buf,
165                                         int * size);
166 static gboolean rait_device_recycle_file (Device * self, guint filenum);
167 static gboolean rait_device_finish (Device * self);
168 static DeviceStatusFlags rait_device_read_label(Device * dself);
169 static void find_simple_params(RaitDevice * self, guint * num_children,
170                                guint * data_children);
171
172 /* property handlers */
173
174 static gboolean property_get_block_size_fn(Device *self,
175     DevicePropertyBase *base, GValue *val,
176     PropertySurety *surety, PropertySource *source);
177
178 static gboolean property_set_block_size_fn(Device *self,
179     DevicePropertyBase *base, GValue *val,
180     PropertySurety surety, PropertySource source);
181
182 static gboolean property_get_canonical_name_fn(Device *self,
183     DevicePropertyBase *base, GValue *val,
184     PropertySurety *surety, PropertySource *source);
185
186 static gboolean property_get_concurrency_fn(Device *self,
187     DevicePropertyBase *base, GValue *val,
188     PropertySurety *surety, PropertySource *source);
189
190 static gboolean property_get_streaming_fn(Device *self,
191     DevicePropertyBase *base, GValue *val,
192     PropertySurety *surety, PropertySource *source);
193
194 static gboolean property_get_boolean_and_fn(Device *self,
195     DevicePropertyBase *base, GValue *val,
196     PropertySurety *surety, PropertySource *source);
197
198 static gboolean property_get_medium_access_type_fn(Device *self,
199     DevicePropertyBase *base, GValue *val,
200     PropertySurety *surety, PropertySource *source);
201
202 static gboolean property_get_free_space_fn(Device *self,
203     DevicePropertyBase *base, GValue *val,
204     PropertySurety *surety, PropertySource *source);
205
206 static gboolean property_get_max_volume_usage_fn(Device *self,
207     DevicePropertyBase *base, GValue *val,
208     PropertySurety *surety, PropertySource *source);
209
210 static gboolean property_set_max_volume_usage_fn(Device *self,
211     DevicePropertyBase *base, GValue *val,
212     PropertySurety surety, PropertySource source);
213
214
215 /* pointer to the class of our parent */
216 static DeviceClass *parent_class = NULL;
217
218 static GType
219 rait_device_get_type (void)
220 {
221     static GType type = 0;
222
223     if G_UNLIKELY(type == 0) {
224         static const GTypeInfo info = {
225             sizeof (RaitDeviceClass),
226             (GBaseInitFunc) rait_device_base_init,
227             (GBaseFinalizeFunc) NULL,
228             (GClassInitFunc) rait_device_class_init,
229             (GClassFinalizeFunc) NULL,
230             NULL /* class_data */,
231             sizeof (RaitDevice),
232             0 /* n_preallocs */,
233             (GInstanceInitFunc) rait_device_init,
234             NULL
235         };
236
237         type = g_type_register_static (TYPE_DEVICE, "RaitDevice", &info,
238                                        (GTypeFlags)0);
239         }
240
241     return type;
242 }
243
244 static void g_object_unref_foreach(gpointer data,
245                                    gpointer user_data G_GNUC_UNUSED) {
246     if (data != NULL && G_IS_OBJECT(data)) {
247         g_object_unref(data);
248     }
249 }
250
251 static void
252 rait_device_finalize(GObject *obj_self)
253 {
254     RaitDevice *self = RAIT_DEVICE (obj_self);
255     if(G_OBJECT_CLASS(parent_class)->finalize) \
256            (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
257     if(self->private->children) {
258         g_ptr_array_foreach(self->private->children,
259                             g_object_unref_foreach, NULL);
260         g_ptr_array_free (self->private->children, TRUE);
261         self->private->children = NULL;
262     }
263 #ifdef USE_INTERNAL_THREADPOOL
264     g_assert(PRIVATE(self)->threads_sem == NULL || PRIVATE(self)->threads_sem->value == 0);
265
266     if (PRIVATE(self)->threads) {
267         guint i;
268
269         for (i = 0; i < PRIVATE(self)->threads->len; i++) {
270             ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
271             if (inf->thread) {
272                 /* NOTE: the thread is waiting on this condition right now, not
273                  * executing an operation. */
274
275                 /* ask the thread to die */
276                 g_mutex_lock(inf->mutex);
277                 inf->die = TRUE;
278                 g_cond_signal(inf->cond);
279                 g_mutex_unlock(inf->mutex);
280
281                 /* and wait for it to die, which should happen soon */
282                 g_thread_join(inf->thread);
283             }
284
285             if (inf->mutex)
286                 g_mutex_free(inf->mutex);
287             if (inf->cond)
288                 g_cond_free(inf->cond);
289         }
290     }
291
292     if (PRIVATE(self)->threads_sem)
293         semaphore_free(PRIVATE(self)->threads_sem);
294 #endif
295     amfree(self->private);
296 }
297
298 static void
299 rait_device_init (RaitDevice * o G_GNUC_UNUSED)
300 {
301     PRIVATE(o) = g_new(RaitDevicePrivate, 1);
302     PRIVATE(o)->children = g_ptr_array_new();
303     PRIVATE(o)->status = RAIT_STATUS_COMPLETE;
304     PRIVATE(o)->failed = -1;
305 #ifdef USE_INTERNAL_THREADPOOL
306     PRIVATE(o)->threads = NULL;
307     PRIVATE(o)->threads_sem = NULL;
308 #endif
309 }
310
311 static void
312 rait_device_class_init (RaitDeviceClass * c)
313 {
314     GObjectClass *g_object_class = (GObjectClass*) c;
315     DeviceClass *device_class = (DeviceClass *)c;
316
317     parent_class = g_type_class_ref (TYPE_DEVICE);
318
319     device_class->open_device = rait_device_open_device;
320     device_class->configure = rait_device_configure;
321     device_class->start = rait_device_start;
322     device_class->start_file = rait_device_start_file;
323     device_class->write_block = rait_device_write_block;
324     device_class->finish_file = rait_device_finish_file;
325     device_class->seek_file = rait_device_seek_file;
326     device_class->seek_block = rait_device_seek_block;
327     device_class->read_block = rait_device_read_block;
328     device_class->recycle_file = rait_device_recycle_file;
329     device_class->finish = rait_device_finish;
330     device_class->read_label = rait_device_read_label;
331
332     g_object_class->finalize = rait_device_finalize;
333
334 #ifndef USE_INTERNAL_THREADPOOL
335 #if !GLIB_CHECK_VERSION(2,10,2)
336     /* Versions of glib before 2.10.2 crash if
337      * g_thread_pool_set_max_unused_threads is called before the first
338      * invocation of g_thread_pool_new.  So we make up a thread pool, but don't
339      * start any threads in it, and free it */
340     {
341         GThreadPool *pool = g_thread_pool_new((GFunc)-1, NULL, -1, FALSE, NULL);
342         g_thread_pool_free(pool, TRUE, FALSE);
343     }
344 #endif
345
346     g_thread_pool_set_max_unused_threads(-1);
347 #endif
348 }
349
350 static void
351 rait_device_base_init (RaitDeviceClass * c)
352 {
353     DeviceClass *device_class = (DeviceClass *)c;
354
355     /* the RAIT device overrides most of the standard properties, so that it
356      * can calculate them by querying the same property on the children */
357     device_class_register_property(device_class, PROPERTY_BLOCK_SIZE,
358             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
359             property_get_block_size_fn,
360             property_set_block_size_fn);
361
362     device_class_register_property(device_class, PROPERTY_CANONICAL_NAME,
363             PROPERTY_ACCESS_GET_MASK,
364             property_get_canonical_name_fn, NULL);
365
366     device_class_register_property(device_class, PROPERTY_CONCURRENCY,
367             PROPERTY_ACCESS_GET_MASK,
368             property_get_concurrency_fn, NULL);
369
370     device_class_register_property(device_class, PROPERTY_STREAMING,
371             PROPERTY_ACCESS_GET_MASK,
372             property_get_streaming_fn, NULL);
373
374     device_class_register_property(device_class, PROPERTY_APPENDABLE,
375             PROPERTY_ACCESS_GET_MASK,
376             property_get_boolean_and_fn, NULL);
377
378     device_class_register_property(device_class, PROPERTY_PARTIAL_DELETION,
379             PROPERTY_ACCESS_GET_MASK,
380             property_get_boolean_and_fn, NULL);
381
382     device_class_register_property(device_class, PROPERTY_FULL_DELETION,
383             PROPERTY_ACCESS_GET_MASK,
384             property_get_boolean_and_fn, NULL);
385
386     device_class_register_property(device_class, PROPERTY_MEDIUM_ACCESS_TYPE,
387             PROPERTY_ACCESS_GET_MASK,
388             property_get_medium_access_type_fn, NULL);
389
390     device_class_register_property(device_class, PROPERTY_FREE_SPACE,
391             PROPERTY_ACCESS_GET_MASK,
392             property_get_free_space_fn, NULL);
393
394     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
395             PROPERTY_ACCESS_GET_MASK,
396             property_get_max_volume_usage_fn,
397             property_set_max_volume_usage_fn);
398 }
399
400 /* This function does something a little clever and a little
401  * complicated. It takes an array of operations and runs the given
402  * function on each element in the array. The trick is that it runs them
403  * all in parallel, in different threads. This is more efficient than it
404  * sounds because we use a GThreadPool, which means calling this function
405  * will probably not start any new threads at all, but rather use
406  * existing ones. The func is called with two gpointer arguments: The
407  * first from the array, the second is the data argument.
408  *
409  * When it returns, all the operations have been successfully
410  * executed. If you want results from your operations, do it yourself
411  * through the array.
412  */
413
414 #ifdef USE_INTERNAL_THREADPOOL
415 static gpointer rait_thread_pool_func(gpointer data) {
416     ThreadInfo *inf = data;
417
418     g_mutex_lock(inf->mutex);
419     while (TRUE) {
420         while (!inf->die && !inf->func)
421             g_cond_wait(inf->cond, inf->mutex);
422
423         if (inf->die)
424             break;
425
426         if (inf->func) {
427             /* invoke the function */
428             inf->func(inf->data, NULL);
429             inf->func = NULL;
430             inf->data = NULL;
431
432             /* indicate that we're finished; will not block */
433             semaphore_down(inf->private->threads_sem);
434         }
435     }
436     g_mutex_unlock(inf->mutex);
437     return NULL;
438 }
439
440 static void do_thread_pool_op(RaitDevice *self, GFunc func, GPtrArray * ops) {
441     guint i;
442
443     if (PRIVATE(self)->threads_sem == NULL)
444         PRIVATE(self)->threads_sem = semaphore_new_with_value(0);
445
446     if (PRIVATE(self)->threads == NULL)
447         PRIVATE(self)->threads = g_array_sized_new(FALSE, TRUE,
448                                             sizeof(ThreadInfo), ops->len);
449
450     g_assert(PRIVATE(self)->threads_sem->value == 0);
451
452     if (PRIVATE(self)->threads->len < ops->len)
453         g_array_set_size(PRIVATE(self)->threads, ops->len);
454
455     /* the semaphore will hit zero when each thread has decremented it */
456     semaphore_force_set(PRIVATE(self)->threads_sem, ops->len);
457
458     for (i = 0; i < ops->len; i++) {
459         ThreadInfo *inf = &g_array_index(PRIVATE(self)->threads, ThreadInfo, i);
460         if (!inf->thread) {
461             inf->mutex = g_mutex_new();
462             inf->cond = g_cond_new();
463             inf->private = PRIVATE(self);
464             inf->thread = g_thread_create(rait_thread_pool_func, inf, TRUE, NULL);
465         }
466
467         /* set up the info the thread needs and trigger it to start */
468         g_mutex_lock(inf->mutex);
469         inf->data = g_ptr_array_index(ops, i);
470         inf->func = func;
471         g_cond_signal(inf->cond);
472         g_mutex_unlock(inf->mutex);
473     }
474
475     /* wait until semaphore hits zero */
476     semaphore_wait_empty(PRIVATE(self)->threads_sem);
477 }
478
479 #else /* USE_INTERNAL_THREADPOOL */
480
481 static void do_thread_pool_op(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
482     GThreadPool * pool;
483     guint i;
484
485     pool = g_thread_pool_new(func, NULL, -1, FALSE, NULL);
486     for (i = 0; i < ops->len; i ++) {
487         g_thread_pool_push(pool, g_ptr_array_index(ops, i), NULL);
488     }
489
490     g_thread_pool_free(pool, FALSE, TRUE);
491 }
492
493 #endif /* USE_INTERNAL_THREADPOOL */
494
495 /* This does the above, in a serial fashion (and without using threads) */
496 static void do_unthreaded_ops(RaitDevice *self G_GNUC_UNUSED, GFunc func, GPtrArray * ops) {
497     guint i;
498
499     for (i = 0; i < ops->len; i ++) {
500         func(g_ptr_array_index(ops, i), NULL);
501     }
502 }
503
504 /* This is the one that code below should call. It switches
505    automatically between do_thread_pool_op and do_unthreaded_ops,
506    depending on g_thread_supported(). */
507 static void do_rait_child_ops(RaitDevice *self, GFunc func, GPtrArray * ops) {
508     if (g_thread_supported()) {
509         do_thread_pool_op(self, func, ops);
510     } else {
511         do_unthreaded_ops(self, func, ops);
512     }
513 }
514
515 static char *
516 child_device_names_to_rait_name(RaitDevice * self) {
517     GPtrArray *kids;
518     char *braced, *result;
519     guint i;
520
521     kids = g_ptr_array_sized_new(self->private->children->len);
522     for (i = 0; i < self->private->children->len; i ++) {
523         Device *child = g_ptr_array_index(self->private->children, i);
524         const char *child_name = NULL;
525         GValue val;
526         gboolean got_prop = FALSE;
527
528         bzero(&val, sizeof(val));
529
530         if ((signed)i != self->private->failed) {
531             if (device_property_get(child, PROPERTY_CANONICAL_NAME, &val)) {
532                 child_name = g_value_get_string(&val);
533                 got_prop = TRUE;
534             }
535         }
536
537         if (!got_prop)
538             child_name = "MISSING";
539
540         g_ptr_array_add(kids, g_strdup(child_name));
541
542         if (got_prop)
543             g_value_unset(&val);
544     }
545
546     braced = collapse_braced_alternates(kids);
547     result = g_strdup_printf("rait:%s", braced);
548     g_free(braced);
549
550     return result;
551 }
552
553 /* Find a workable child block size, based on the block size ranges of our
554  * child devices.
555  *
556  * The algorithm is to construct the intersection of all child devices'
557  * [min,max] block size ranges, and then pick the block size closest to 32k
558  * that is in the resulting range.  This avoids picking ridiculously small (1
559  * byte) or large (INT_MAX) block sizes when using devices with wide-open block
560  * size ranges.
561
562  * This function returns the calculated child block size directly, and the RAIT
563  * device's blocksize via rait_size, if not NULL.  It is resilient to errors in
564  * a single child device, but sets the device's error status and returns 0 if
565  * it cannot determine an agreeable block size.
566  */
567 static gsize
568 calculate_block_size_from_children(RaitDevice * self, gsize *rait_size)
569 {
570     gsize min = 0;
571     gsize max = SIZE_MAX;
572     gboolean found_one = FALSE;
573     gsize result;
574     guint i;
575
576     for (i = 0; i < self->private->children->len; i ++) {
577         gsize child_min = SIZE_MAX, child_max = 0;
578         Device *child;
579         GValue property_result;
580         PropertySource source;
581
582         bzero(&property_result, sizeof(property_result));
583
584         if ((signed)i == self->private->failed)
585             continue;
586
587         child = g_ptr_array_index(self->private->children, i);
588         if (!device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
589                                  &property_result, NULL, &source)) {
590             g_warning("Error getting BLOCK_SIZE from %s: %s",
591                     child->device_name, device_error_or_status(child));
592             continue;
593         }
594
595         /* if the block size has been set explicitly, then we need to use that blocksize;
596          * otherwise (even if it was DETECTED), override it. */
597         if (source == PROPERTY_SOURCE_USER) {
598             child_min = child_max = g_value_get_int(&property_result);
599         } else {
600             if (!device_property_get(child, PROPERTY_MIN_BLOCK_SIZE,
601                                      &property_result)) {
602                 g_warning("Error getting MIN_BLOCK_SIZE from %s: %s",
603                         child->device_name, device_error_or_status(child));
604                 continue;
605             }
606             child_min = g_value_get_uint(&property_result);
607
608             if (!device_property_get(child, PROPERTY_MAX_BLOCK_SIZE,
609                                      &property_result)) {
610                 g_warning("Error getting MAX_BLOCK_SIZE from %s: %s",
611                         child->device_name, device_error_or_status(child));
612                 continue;
613             }
614             child_max = g_value_get_uint(&property_result);
615
616             if (child_min == 0 || child_max == 0 || (child_min > child_max)) {
617                 g_warning("Invalid min, max block sizes from %s", child->device_name);
618                 continue;
619             }
620         }
621
622         found_one = TRUE;
623         min = MAX(min, child_min);
624         max = MIN(max, child_max);
625     }
626
627     if (!found_one) {
628         device_set_error((Device*)self,
629             stralloc(_("Could not find any child devices' block size ranges")),
630             DEVICE_STATUS_DEVICE_ERROR);
631         return 0;
632     }
633
634     if (min > max) {
635         device_set_error((Device*)self,
636             stralloc(_("No block size is acceptable to all child devices")),
637             DEVICE_STATUS_DEVICE_ERROR);
638         return 0;
639     }
640
641     /* Now pick a number.  If 32k is in range, we use that; otherwise, we use
642      * the nearest acceptable size. */
643     result = CLAMP(32768, min, max);
644
645     if (rait_size) {
646         guint data_children;
647         find_simple_params(self, NULL, &data_children);
648         *rait_size = result * data_children;
649     }
650
651     return result;
652 }
653
654 /* Set BLOCK_SIZE on all children */
655 static gboolean
656 set_block_size_on_children(RaitDevice *self, gsize child_block_size)
657 {
658     GValue val;
659     guint i;
660     PropertySource source;
661
662     bzero(&val, sizeof(val));
663
664     g_assert(child_block_size < INT_MAX);
665     g_value_init(&val, G_TYPE_INT);
666     g_value_set_int(&val, (gint)child_block_size);
667
668     for (i = 0; i < self->private->children->len; i ++) {
669         Device *child;
670         GValue property_result;
671
672         bzero(&property_result, sizeof(property_result));
673
674         if ((signed)i == self->private->failed)
675             continue;
676
677         child = g_ptr_array_index(self->private->children, i);
678
679         /* first, make sure the block size is at its default, or is already
680          * correct */
681         if (device_property_get_ex(child, PROPERTY_BLOCK_SIZE,
682                                  &property_result, NULL, &source)) {
683             gsize from_child = g_value_get_int(&property_result);
684             g_value_unset(&property_result);
685             if (source != PROPERTY_SOURCE_DEFAULT
686                     && from_child != child_block_size) {
687                 device_set_error((Device *)self,
688                     vstrallocf(_("Child device %s already has its block size set to %zd, not %zd"),
689                                 child->device_name, from_child, child_block_size),
690                     DEVICE_STATUS_DEVICE_ERROR);
691                 return FALSE;
692             }
693         } else {
694             /* failing to get the block size isn't necessarily fatal.. */
695             g_warning("Error getting BLOCK_SIZE from %s: %s",
696                     child->device_name, device_error_or_status(child));
697         }
698
699         if (!device_property_set(child, PROPERTY_BLOCK_SIZE, &val)) {
700             device_set_error((Device *)self,
701                 vstrallocf(_("Error setting block size on %s"), child->device_name),
702                 DEVICE_STATUS_DEVICE_ERROR);
703             return FALSE;
704         }
705     }
706
707     return TRUE;
708 }
709
710 /* The time for users to specify block sizes has ended; set this device's
711  * block-size attributes for easy access by other RAIT functions.  Returns
712  * FALSE on error, with the device's error status already set. */
713 static gboolean
714 fix_block_size(RaitDevice *self)
715 {
716     Device *dself = (Device *)self;
717     gsize my_block_size, child_block_size;
718
719     if (dself->block_size_source == PROPERTY_SOURCE_DEFAULT) {
720         child_block_size = calculate_block_size_from_children(self, &my_block_size);
721         if (child_block_size == 0)
722             return FALSE;
723
724         self->private->child_block_size = child_block_size;
725         dself->block_size = my_block_size;
726         dself->block_size_surety = PROPERTY_SURETY_GOOD;
727         dself->block_size_source = PROPERTY_SOURCE_DETECTED;
728     } else {
729         guint data_children;
730
731         find_simple_params(self, NULL, &data_children);
732         g_assert((dself->block_size % data_children) == 0);
733         child_block_size = dself->block_size / data_children;
734     }
735
736     /* now tell the children we mean it */
737     if (!set_block_size_on_children(self, child_block_size))
738         return FALSE;
739
740     return TRUE;
741 }
742
743 /* This structure contains common fields for many operations. Not all
744    operations use all fields, however. */
745 typedef struct {
746     gpointer result; /* May be a pointer; may be an integer or boolean
747                         stored with GINT_TO_POINTER. */
748     Device * child;  /* The device in question. Used by all
749                         operations. */
750     guint child_index; /* For recoverable operations (read-related
751                           operations), this field provides the number
752                           of this child in the self->private->children
753                           array. */
754 } GenericOp;
755
756 typedef gboolean (*BooleanExtractor)(gpointer data);
757
758 /* A BooleanExtractor */
759 static gboolean extract_boolean_generic_op(gpointer data) {
760     GenericOp * op = data;
761     return GPOINTER_TO_INT(op->result);
762 }
763
764 /* A BooleanExtractor */
765 static gboolean extract_boolean_pointer_op(gpointer data) {
766     GenericOp * op = data;
767     return op->result != NULL;
768 }
769
770 /* Does the equivalent of this perl command:
771      ! (first { !extractor($_) } @_
772    That is, calls extractor on each element of the array, and returns
773    TRUE if and only if all calls to extractor return TRUE. This function
774    stops as soon as an extractor returns false, so it's best if extractor
775    functions have no side effects.
776 */
777 static gboolean g_ptr_array_and(GPtrArray * array,
778                                 BooleanExtractor extractor) {
779     guint i;
780     if (array == NULL || array->len <= 0)
781         return FALSE;
782
783     for (i = 0; i < array->len; i ++) {
784         if (!extractor(g_ptr_array_index(array, i)))
785             return FALSE;
786     }
787
788     return TRUE;
789 }
790
791 /* Takes a RaitDevice, and makes a GPtrArray of GenericOp. */
792 static GPtrArray * make_generic_boolean_op_array(RaitDevice* self) {
793     GPtrArray * rval;
794     guint i;
795
796     rval = g_ptr_array_sized_new(self->private->children->len);
797     for (i = 0; i < self->private->children->len; i ++) {
798         GenericOp * op;
799
800         if ((signed)i == self->private->failed) {
801             continue;
802         }
803
804         op = g_new(GenericOp, 1);
805         op->child = g_ptr_array_index(self->private->children, i);
806         op->child_index = i;
807         g_ptr_array_add(rval, op);
808     }
809
810     return rval;
811 }
812
813 /* Takes a GPtrArray of GenericOp, and a BooleanExtractor, and does
814    all the proper handling for the result of operations that allow
815    device isolation. Returns FALSE only if an unrecoverable error
816    occured. */
817 static gboolean g_ptr_array_union_robust(RaitDevice * self, GPtrArray * ops,
818                                          BooleanExtractor extractor) {
819     int nfailed = 0;
820     int lastfailed = 0;
821     guint i;
822
823     /* We found one or more failed elements.  See which elements failed, and
824      * isolate them*/
825     for (i = 0; i < ops->len; i ++) {
826         GenericOp * op = g_ptr_array_index(ops, i);
827         if (!extractor(op)) {
828             self->private->failed = op->child_index;
829             g_warning("RAIT array %s isolated device %s: %s",
830                     DEVICE(self)->device_name,
831                     op->child->device_name,
832                     device_error(op->child));
833             nfailed++;
834             lastfailed = i;
835         }
836     }
837
838     /* no failures? great! */
839     if (nfailed == 0)
840         return TRUE;
841
842     /* a single failure in COMPLETE just puts us in DEGRADED mode */
843     if (self->private->status == RAIT_STATUS_COMPLETE && nfailed == 1) {
844         self->private->status = RAIT_STATUS_DEGRADED;
845         self->private->failed = lastfailed;
846         g_warning("RAIT array %s DEGRADED", DEVICE(self)->device_name);
847         return TRUE;
848     } else {
849         self->private->status = RAIT_STATUS_FAILED;
850         g_warning("RAIT array %s FAILED", DEVICE(self)->device_name);
851         return FALSE;
852     }
853 }
854
855 typedef struct {
856     RaitDevice * self;
857     char *rait_name;
858     char * device_name; /* IN */
859     Device * result;    /* OUT */
860 } OpenDeviceOp;
861
862 /* A GFunc. */
863 static void device_open_do_op(gpointer data,
864                               gpointer user_data G_GNUC_UNUSED) {
865     OpenDeviceOp * op = data;
866
867     if (strcmp(op->device_name, "ERROR") == 0 ||
868         strcmp(op->device_name, "MISSING") == 0 ||
869         strcmp(op->device_name, "DEGRADED") == 0) {
870         g_warning("RAIT device %s contains a missing element, attempting "
871                   "degraded mode.\n", op->rait_name);
872         op->result = NULL;
873     } else {
874         op->result = device_open(op->device_name);
875     }
876 }
877
878 /* Returns TRUE if and only if the volume label and time are equal. */
879 static gboolean compare_volume_results(Device * a, Device * b) {
880     return (0 == compare_possibly_null_strings(a->volume_time, b->volume_time)
881          && 0 == compare_possibly_null_strings(a->volume_label, b->volume_label));
882 }
883
884 /* Stickes new_message at the end of *old_message; frees new_message and
885  * may change *old_message. */
886 static void append_message(char ** old_message, char * new_message) {
887     char * rval;
888     if (*old_message == NULL || **old_message == '\0') {
889         rval = new_message;
890     } else {
891         rval = g_strdup_printf("%s; %s", *old_message, new_message);
892         amfree(new_message);
893     }
894     amfree(*old_message);
895     *old_message = rval;
896 }
897
898 static gboolean
899 open_child_devices (Device * dself, char * device_name,
900             char * device_node) {
901     GPtrArray *device_names;
902     GPtrArray * device_open_ops;
903     guint i;
904     gboolean failure;
905     char *failure_errmsgs;
906     DeviceStatusFlags failure_flags;
907     RaitDevice * self;
908
909     self = RAIT_DEVICE(dself);
910
911     device_names = expand_braced_alternates(device_node);
912
913     if (device_names == NULL) {
914         device_set_error(dself,
915             vstrallocf(_("Invalid RAIT device name '%s'"), device_name),
916             DEVICE_STATUS_DEVICE_ERROR);
917         return FALSE;
918     }
919
920     /* Open devices in a separate thread, in case they have to rewind etc. */
921     device_open_ops = g_ptr_array_new();
922
923     for (i = 0; i < device_names->len; i++) {
924         OpenDeviceOp *op;
925         char *name = g_ptr_array_index(device_names, i);
926
927         op = g_new(OpenDeviceOp, 1);
928         op->device_name = name;
929         op->result = NULL;
930         op->self = self;
931         op->rait_name = device_name;
932         g_ptr_array_add(device_open_ops, op);
933     }
934
935     g_ptr_array_free(device_names, TRUE);
936     do_rait_child_ops(self, device_open_do_op, device_open_ops);
937
938     failure = FALSE;
939     failure_errmsgs = NULL;
940     failure_flags = 0;
941
942     /* Check results of opening devices. */
943     for (i = 0; i < device_open_ops->len; i ++) {
944         OpenDeviceOp *op = g_ptr_array_index(device_open_ops, i);
945
946         if (op->result != NULL &&
947             op->result->status == DEVICE_STATUS_SUCCESS) {
948             g_ptr_array_add(self->private->children, op->result);
949         } else {
950             char * this_failure_errmsg =
951                 g_strdup_printf("%s: %s", op->device_name,
952                                 device_error_or_status(op->result));
953             DeviceStatusFlags status =
954                 op->result == NULL ?
955                     DEVICE_STATUS_DEVICE_ERROR : op->result->status;
956             append_message(&failure_errmsgs,
957                            strdup(this_failure_errmsg));
958             failure_flags |= status;
959             if (self->private->status == RAIT_STATUS_COMPLETE) {
960                 /* The first failure just puts us in degraded mode. */
961                 g_warning("%s: %s",
962                           device_name, this_failure_errmsg);
963                 g_warning("%s: %s failed, entering degraded mode.",
964                           device_name, op->device_name);
965                 g_ptr_array_add(self->private->children, op->result);
966                 self->private->status = RAIT_STATUS_DEGRADED;
967                 self->private->failed = i;
968             } else {
969                 /* The second and further failures are fatal. */
970                 failure = TRUE;
971             }
972         }
973         amfree(op->device_name);
974     }
975
976     g_ptr_array_free_full(device_open_ops);
977
978     if (failure) {
979         self->private->status = RAIT_STATUS_FAILED;
980         device_set_error(dself, failure_errmsgs, failure_flags);
981         return FALSE;
982     }
983
984     return TRUE;
985 }
986
987 static void
988 rait_device_open_device (Device * dself, char * device_name,
989             char * device_type G_GNUC_UNUSED, char * device_node) {
990
991     if (0 != strcmp(device_node, DEFER_CHILDREN_SENTINEL)) {
992         if (!open_child_devices(dself, device_name, device_node))
993             return;
994
995         /* Chain up. */
996         if (parent_class->open_device) {
997             parent_class->open_device(dself, device_name, device_type, device_node);
998         }
999     }
1000 }
1001
1002 Device *
1003 rait_device_open_from_children (GSList *child_devices) {
1004     Device *dself;
1005     RaitDevice *self;
1006     GSList *iter;
1007     char *device_name;
1008     int nfailures;
1009     int i;
1010
1011     /* first, open a RAIT device using the DEFER_CHILDREN_SENTINEL */
1012     dself = device_open("rait:" DEFER_CHILDREN_SENTINEL);
1013     if (!IS_RAIT_DEVICE(dself)) {
1014         return dself;
1015     }
1016
1017     /* set its children */
1018     self = RAIT_DEVICE(dself);
1019     nfailures = 0;
1020     for (i=0, iter = child_devices; iter; i++, iter = iter->next) {
1021         Device *kid = iter->data;
1022
1023         /* a NULL kid is OK -- it opens the device in degraded mode */
1024         if (!kid) {
1025             nfailures++;
1026             self->private->failed = i;
1027         } else {
1028             g_assert(IS_DEVICE(kid));
1029             g_object_ref((GObject *)kid);
1030         }
1031
1032         g_ptr_array_add(self->private->children, kid);
1033     }
1034
1035     /* and set the status based on the children */
1036     switch (nfailures) {
1037         case 0:
1038             self->private->status = RAIT_STATUS_COMPLETE;
1039             break;
1040
1041         case 1:
1042             self->private->status = RAIT_STATUS_DEGRADED;
1043             break;
1044
1045         default:
1046             self->private->status = RAIT_STATUS_FAILED;
1047             device_set_error(dself,
1048                     _("more than one child device is missing"),
1049                     DEVICE_STATUS_DEVICE_ERROR);
1050             break;
1051     }
1052
1053     /* create a name from the children's names and use it to chain up
1054      * to open_device (we skipped this step in rait_device_open_device) */
1055     device_name = child_device_names_to_rait_name(self);
1056
1057     if (parent_class->open_device) {
1058         parent_class->open_device(dself,
1059             device_name, "rait",
1060             device_name+5); /* (+5 skips "rait:") */
1061     }
1062
1063     return dself;
1064 }
1065
1066 /* A GFunc. */
1067 static void read_label_do_op(gpointer data,
1068                              gpointer user_data G_GNUC_UNUSED) {
1069     GenericOp * op = data;
1070     op->result = GINT_TO_POINTER(device_read_label(op->child));
1071 }
1072
1073 static DeviceStatusFlags rait_device_read_label(Device * dself) {
1074     RaitDevice * self;
1075     GPtrArray * ops;
1076     DeviceStatusFlags failed_result = 0;
1077     char *failed_errmsg = NULL;
1078     unsigned int i;
1079     Device * first_success = NULL;
1080
1081     self = RAIT_DEVICE(dself);
1082
1083     amfree(dself->volume_time);
1084     amfree(dself->volume_label);
1085     dumpfile_free(dself->volume_header);
1086     dself->volume_header = NULL;
1087
1088     if (rait_device_in_error(self))
1089         return dself->status | DEVICE_STATUS_DEVICE_ERROR;
1090
1091     /* nail down our block size, if we haven't already */
1092     if (!fix_block_size(self))
1093         return FALSE;
1094
1095     ops = make_generic_boolean_op_array(self);
1096
1097     do_rait_child_ops(self, read_label_do_op, ops);
1098
1099     for (i = 0; i < ops->len; i ++) {
1100         GenericOp * op = g_ptr_array_index(ops, i);
1101         DeviceStatusFlags result = GPOINTER_TO_INT(op->result);
1102         if (op->result == DEVICE_STATUS_SUCCESS) {
1103             if (first_success == NULL) {
1104                 /* This is the first successful device. */
1105                 first_success = op->child;
1106             } else if (!compare_volume_results(first_success, op->child)) {
1107                 /* Doesn't match. :-( */
1108                 failed_errmsg = vstrallocf("Inconsistent volume labels/datestamps: "
1109                         "Got %s/%s on %s against %s/%s on %s.",
1110                         first_success->volume_label,
1111                         first_success->volume_time,
1112                         first_success->device_name,
1113                         op->child->volume_label,
1114                         op->child->volume_time,
1115                         op->child->device_name);
1116                 g_warning("%s", failed_errmsg);
1117                 failed_result |= DEVICE_STATUS_VOLUME_ERROR;
1118             }
1119         } else {
1120             failed_result |= result;
1121         }
1122     }
1123
1124     if (failed_result != DEVICE_STATUS_SUCCESS) {
1125         /* We had multiple failures or an inconsistency. */
1126         device_set_error(dself, failed_errmsg, failed_result);
1127     } else {
1128         /* Everything peachy. */
1129         amfree(failed_errmsg);
1130
1131         g_assert(first_success != NULL);
1132         if (first_success->volume_label != NULL) {
1133             dself->volume_label = g_strdup(first_success->volume_label);
1134         }
1135         if (first_success->volume_time != NULL) {
1136             dself->volume_time = g_strdup(first_success->volume_time);
1137         }
1138         if (first_success->volume_header != NULL) {
1139             dself->volume_header = dumpfile_copy(first_success->volume_header);
1140         }
1141     }
1142
1143     g_ptr_array_free_full(ops);
1144
1145     return dself->status;
1146 }
1147
1148 typedef struct {
1149     GenericOp base;
1150     DeviceAccessMode mode; /* IN */
1151     char * label;          /* IN */
1152     char * timestamp;      /* IN */
1153 } StartOp;
1154
1155 /* A GFunc. */
1156 static void start_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1157     DeviceClass *klass;
1158     StartOp * param = data;
1159
1160     klass = DEVICE_GET_CLASS(param->base.child);
1161     if (klass->start) {
1162         param->base.result =
1163             GINT_TO_POINTER((klass->start)(param->base.child,
1164                                             param->mode, param->label,
1165                                             param->timestamp));
1166     } else {
1167         param->base.result = FALSE;
1168     }
1169 }
1170
1171 static gboolean
1172 rait_device_configure(Device * dself, gboolean use_global_config)
1173 {
1174     RaitDevice *self = RAIT_DEVICE(dself);
1175     guint i;
1176
1177     for (i = 0; i < self->private->children->len; i ++) {
1178         Device *child;
1179
1180         if ((signed)i == self->private->failed)
1181             continue;
1182
1183         child = g_ptr_array_index(self->private->children, i);
1184         /* unconditionally configure the child without the global
1185          * configuration */
1186         if (!device_configure(child, FALSE))
1187             return FALSE;
1188     }
1189
1190     if (parent_class->configure) {
1191         return parent_class->configure(dself, use_global_config);
1192     }
1193
1194     return TRUE;
1195 }
1196
1197 static gboolean
1198 rait_device_start (Device * dself, DeviceAccessMode mode, char * label,
1199                    char * timestamp) {
1200     GPtrArray * ops;
1201     guint i;
1202     gboolean success;
1203     RaitDevice * self;
1204     DeviceStatusFlags total_status;
1205     char *failure_errmsgs = NULL;
1206     char * label_from_device = NULL;
1207
1208     self = RAIT_DEVICE(dself);
1209
1210     if (rait_device_in_error(self)) return FALSE;
1211
1212     /* No starting in degraded mode. */
1213     if (self->private->status != RAIT_STATUS_COMPLETE &&
1214         (mode == ACCESS_WRITE || mode == ACCESS_APPEND)) {
1215         device_set_error(dself,
1216                          g_strdup_printf(_("RAIT device %s is read-only "
1217                                            "because it is in degraded mode.\n"),
1218                                          dself->device_name),
1219                          DEVICE_STATUS_DEVICE_ERROR);
1220         return FALSE;
1221     }
1222
1223     /* nail down our block size, if we haven't already */
1224     if (!fix_block_size(self))
1225         return FALSE;
1226
1227     dself->access_mode = mode;
1228     dself->in_file = FALSE;
1229     amfree(dself->volume_label);
1230     amfree(dself->volume_time);
1231     dumpfile_free(dself->volume_header);
1232     dself->volume_header = NULL;
1233
1234     ops = g_ptr_array_sized_new(self->private->children->len);
1235     for (i = 0; i < self->private->children->len; i ++) {
1236         StartOp * op;
1237
1238         if ((signed)i == self->private->failed) {
1239             continue;
1240         }
1241
1242         op = g_new(StartOp, 1);
1243         op->base.child = g_ptr_array_index(self->private->children, i);
1244         op->mode = mode;
1245         op->label = g_strdup(label);
1246         op->timestamp = g_strdup(timestamp);
1247         g_ptr_array_add(ops, op);
1248     }
1249
1250     do_rait_child_ops(self, start_do_op, ops);
1251
1252     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1253
1254     /* Check results of starting devices; this is mostly about the
1255      * VOLUME_UNLABELED flag. */
1256     total_status = 0;
1257     for (i = 0; i < ops->len; i ++) {
1258         StartOp * op = g_ptr_array_index(ops, i);
1259         Device *child = op->base.child;
1260
1261         total_status |= child->status;
1262         if (child->status != DEVICE_STATUS_SUCCESS) {
1263             /* record the error message and move on. */
1264             append_message(&failure_errmsgs,
1265                            g_strdup_printf("%s: %s",
1266                                            child->device_name,
1267                                            device_error_or_status(child)));
1268         } else {
1269             if (child->volume_label != NULL && child->volume_time != NULL) {
1270                 if (label_from_device) {
1271                     if (strcmp(child->volume_label, dself->volume_label) != 0 ||
1272                         strcmp(child->volume_time, dself->volume_time) != 0) {
1273                         /* Mismatch! (Two devices provided different labels) */
1274                         char * this_message =
1275                             g_strdup_printf("%s: Label (%s/%s) is different "
1276                                             "from label (%s/%s) found at "
1277                                             "device %s",
1278                                             child->device_name,
1279                                             child->volume_label,
1280                                             child->volume_time,
1281                                             dself->volume_label,
1282                                             dself->volume_time,
1283                                             label_from_device);
1284                         append_message(&failure_errmsgs, this_message);
1285                         total_status |= DEVICE_STATUS_DEVICE_ERROR;
1286                         g_warning("RAIT device children have different labels or timestamps");
1287                     }
1288                 } else {
1289                     /* First device with a volume. */
1290                     dself->volume_label = g_strdup(child->volume_label);
1291                     dself->volume_time = g_strdup(child->volume_time);
1292                     dself->volume_header = dumpfile_copy(child->volume_header);
1293                     label_from_device = g_strdup(child->device_name);
1294                 }
1295             } else {
1296                 /* Device problem, it says it succeeded but sets no label? */
1297                 char * this_message =
1298                     g_strdup_printf("%s: Says label read, but no volume "
1299                                      "label found.", child->device_name);
1300                 g_warning("RAIT device child has NULL volume or label");
1301                 append_message(&failure_errmsgs, this_message);
1302                 total_status |= DEVICE_STATUS_DEVICE_ERROR;
1303             }
1304         }
1305     }
1306
1307     amfree(label_from_device);
1308     g_ptr_array_free_full(ops);
1309
1310     dself->status = total_status;
1311
1312     if (total_status != DEVICE_STATUS_SUCCESS || !success) {
1313         device_set_error(dself, failure_errmsgs, total_status);
1314         return FALSE;
1315     }
1316
1317     amfree(failure_errmsgs);
1318     return TRUE;
1319 }
1320
1321 typedef struct {
1322     GenericOp base;
1323     dumpfile_t * info; /* IN */
1324     int fileno;
1325 } StartFileOp;
1326
1327 /* a GFunc */
1328 static void start_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1329     StartFileOp * op = data;
1330     op->base.result = GINT_TO_POINTER(device_start_file(op->base.child,
1331                                                         op->info));
1332     op->fileno = op->base.child->file;
1333     if (op->fileno < 1) {
1334         op->base.result = FALSE;
1335     }
1336 }
1337
1338 static gboolean
1339 rait_device_start_file (Device * dself, dumpfile_t * info) {
1340     GPtrArray * ops;
1341     guint i;
1342     gboolean success;
1343     RaitDevice * self;
1344     int actual_file = -1;
1345
1346     self = RAIT_DEVICE(dself);
1347
1348     if (rait_device_in_error(self)) return FALSE;
1349     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1350
1351     ops = g_ptr_array_sized_new(self->private->children->len);
1352     for (i = 0; i < self->private->children->len; i ++) {
1353         StartFileOp * op;
1354         op = g_new(StartFileOp, 1);
1355         op->base.child = g_ptr_array_index(self->private->children, i);
1356         /* each child gets its own copy of the header, to munge as it
1357          * likes (setting blocksize, at least) */
1358         op->info = dumpfile_copy(info);
1359         g_ptr_array_add(ops, op);
1360     }
1361
1362     do_rait_child_ops(self, start_file_do_op, ops);
1363
1364     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1365
1366     for (i = 0; i < self->private->children->len && success; i ++) {
1367         StartFileOp * op = g_ptr_array_index(ops, i);
1368         if (!op->base.result)
1369             continue;
1370         g_assert(op->fileno >= 1);
1371         if (actual_file < 1) {
1372             actual_file = op->fileno;
1373         }
1374         if (actual_file != op->fileno) {
1375             /* File number mismatch! Aah, my hair is on fire! */
1376             device_set_error(dself,
1377                              g_strdup_printf("File number mismatch in "
1378                                              "rait_device_start_file(): "
1379                                              "Child %s reported file number "
1380                                              "%d, another child reported "
1381                                              "file number %d.",
1382                                              op->base.child->device_name,
1383                                              op->fileno, actual_file),
1384                              DEVICE_STATUS_DEVICE_ERROR);
1385             success = FALSE;
1386             op->base.result = FALSE;
1387         }
1388     }
1389
1390     for (i = 0; i < ops->len && success; i ++) {
1391         StartFileOp * op = g_ptr_array_index(ops, i);
1392         if (op->info) dumpfile_free(op->info);
1393     }
1394     g_ptr_array_free_full(ops);
1395
1396     if (!success) {
1397         if (!device_in_error(dself)) {
1398             device_set_error(dself, stralloc("One or more devices "
1399                                              "failed to start_file"),
1400                              DEVICE_STATUS_DEVICE_ERROR);
1401         }
1402         return FALSE;
1403     }
1404
1405     dself->in_file = TRUE;
1406     g_assert(actual_file >= 1);
1407     dself->file = actual_file;
1408
1409     return TRUE;
1410 }
1411
1412 static void find_simple_params(RaitDevice * self,
1413                                guint * num_children,
1414                                guint * data_children) {
1415     int num, data;
1416
1417     num = self->private->children->len;
1418     if (num > 1)
1419         data = num - 1;
1420     else
1421         data = num;
1422     if (num_children != NULL)
1423         *num_children = num;
1424     if (data_children != NULL)
1425         *data_children = data;
1426 }
1427
1428 typedef struct {
1429     GenericOp base;
1430     guint size;           /* IN */
1431     gpointer data;        /* IN */
1432     gboolean data_needs_free; /* bookkeeping */
1433 } WriteBlockOp;
1434
1435 /* a GFunc. */
1436 static void write_block_do_op(gpointer data,
1437                               gpointer user_data G_GNUC_UNUSED) {
1438     WriteBlockOp * op = data;
1439
1440     op->base.result =
1441         GINT_TO_POINTER(device_write_block(op->base.child, op->size, op->data));
1442 }
1443
1444 /* Parity block generation. Performance of this function can be improved
1445    considerably by using larger-sized integers or
1446    assembly-coded vector instructions. Parameters are:
1447    % data       - All data chunks in series (chunk_size * num_chunks bytes)
1448    % parity     - Allocated space for parity block (chunk_size bytes)
1449  */
1450 static void make_parity_block(char * data, char * parity,
1451                               guint chunk_size, guint num_chunks) {
1452     guint i;
1453     bzero(parity, chunk_size);
1454     for (i = 0; i < num_chunks - 1; i ++) {
1455         guint j;
1456         for (j = 0; j < chunk_size; j ++) {
1457             parity[j] ^= data[chunk_size*i + j];
1458         }
1459     }
1460 }
1461
1462 /* Does the same thing as make_parity_block, but instead of using a
1463    single memory chunk holding all chunks, it takes a GPtrArray of
1464    chunks. */
1465 static void make_parity_block_extents(GPtrArray * data, char * parity,
1466                                       guint chunk_size) {
1467     guint i;
1468     bzero(parity, chunk_size);
1469     for (i = 0; i < data->len; i ++) {
1470         guint j;
1471         char * data_chunk;
1472         data_chunk = g_ptr_array_index(data, i);
1473         for (j = 0; j < chunk_size; j ++) {
1474             parity[j] ^= data_chunk[j];
1475         }
1476     }
1477 }
1478
1479 /* Does the parity creation algorithm. Allocates and returns a single
1480    device block from a larger RAIT block. chunks and chunk are 1-indexed. */
1481 static char * extract_data_block(char * data, guint size,
1482                                  guint chunks, guint chunk) {
1483     char * rval;
1484     guint chunk_size;
1485
1486     g_assert(chunks > 0 && chunk > 0 && chunk <= chunks);
1487     g_assert(data != NULL);
1488     g_assert(size > 0 && size % (chunks - 1) == 0);
1489
1490     chunk_size = size / (chunks - 1);
1491     rval = g_malloc(chunk_size);
1492     if (chunks != chunk) {
1493         /* data block. */
1494         memcpy(rval, data + chunk_size * (chunk - 1), chunk_size);
1495     } else {
1496         make_parity_block(data, rval, chunk_size, chunks);
1497     }
1498
1499     return rval;
1500 }
1501
1502 static gboolean
1503 rait_device_write_block (Device * dself, guint size, gpointer data) {
1504     GPtrArray * ops;
1505     guint i;
1506     gboolean success;
1507     guint data_children, num_children;
1508     gsize blocksize = dself->block_size;
1509     RaitDevice * self;
1510     gboolean last_block = (size < blocksize);
1511
1512     self = RAIT_DEVICE(dself);
1513
1514     if (rait_device_in_error(self)) return FALSE;
1515     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1516
1517     find_simple_params(RAIT_DEVICE(self), &num_children, &data_children);
1518     num_children = self->private->children->len;
1519     if (num_children != 1)
1520         data_children = num_children - 1;
1521     else
1522         data_children = num_children;
1523
1524     g_assert(size % data_children == 0 || last_block);
1525
1526     /* zero out to the end of a short block -- tape devices only write
1527      * whole blocks. */
1528     if (last_block) {
1529         char *new_data;
1530
1531         new_data = g_malloc(blocksize);
1532         memcpy(new_data, data, size);
1533         bzero(new_data + size, blocksize - size);
1534
1535         data = new_data;
1536         size = blocksize;
1537     }
1538
1539     ops = g_ptr_array_sized_new(num_children);
1540     for (i = 0; i < self->private->children->len; i ++) {
1541         WriteBlockOp * op;
1542         op = g_malloc(sizeof(*op));
1543         op->base.child = g_ptr_array_index(self->private->children, i);
1544         op->size = size / data_children;
1545         if (num_children <= 2) {
1546             op->data = data;
1547             op->data_needs_free = FALSE;
1548         } else {
1549             op->data_needs_free = TRUE;
1550             op->data = extract_data_block(data, size, num_children, i + 1);
1551         }
1552         g_ptr_array_add(ops, op);
1553     }
1554
1555     do_rait_child_ops(self, write_block_do_op, ops);
1556
1557     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1558
1559     for (i = 0; i < self->private->children->len; i ++) {
1560         WriteBlockOp * op = g_ptr_array_index(ops, i);
1561         if (op->data_needs_free)
1562             free(op->data);
1563     }
1564
1565     if (last_block) {
1566         amfree(data);
1567     }
1568
1569     g_ptr_array_free_full(ops);
1570
1571     if (!success) {
1572         /* TODO be more specific here */
1573         /* TODO: handle EOM here -- if one or more (or two or more??)
1574          * children have is_eom set, then reflect that in our error
1575          * status. What's more fun is when one device fails and must be isolated at
1576          * the same time another hits EOF. */
1577         device_set_error(dself,
1578             stralloc("One or more devices failed to write_block"),
1579             DEVICE_STATUS_DEVICE_ERROR);
1580         /* this is EOM or an error, so call it EOM */
1581         dself->is_eom = TRUE;
1582         return FALSE;
1583     } else {
1584         dself->block ++;
1585
1586         return TRUE;
1587     }
1588 }
1589
1590 /* A GFunc */
1591 static void finish_file_do_op(gpointer data,
1592                               gpointer user_data G_GNUC_UNUSED) {
1593     GenericOp * op = data;
1594     if (op->child) {
1595         op->result = GINT_TO_POINTER(device_finish_file(op->child));
1596     } else {
1597         op->result = FALSE;
1598     }
1599 }
1600
1601 static gboolean
1602 rait_device_finish_file (Device * dself) {
1603     GPtrArray * ops;
1604     gboolean success;
1605     RaitDevice * self = RAIT_DEVICE(dself);
1606
1607     g_assert(self != NULL);
1608     if (rait_device_in_error(dself)) return FALSE;
1609     if (self->private->status != RAIT_STATUS_COMPLETE) return FALSE;
1610
1611     ops = make_generic_boolean_op_array(self);
1612
1613     do_rait_child_ops(self, finish_file_do_op, ops);
1614
1615     success = g_ptr_array_and(ops, extract_boolean_generic_op);
1616
1617     g_ptr_array_free_full(ops);
1618
1619     if (!success) {
1620         /* TODO: be more specific here */
1621         device_set_error(dself,
1622                          g_strdup("One or more devices failed to finish_file"),
1623             DEVICE_STATUS_DEVICE_ERROR);
1624         return FALSE;
1625     }
1626
1627     dself->in_file = FALSE;
1628     return TRUE;
1629 }
1630
1631 typedef struct {
1632     GenericOp base;
1633     guint requested_file;                /* IN */
1634     guint actual_file;                   /* OUT */
1635 } SeekFileOp;
1636
1637 /* a GFunc. */
1638 static void seek_file_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1639     SeekFileOp * op = data;
1640     op->base.result = device_seek_file(op->base.child, op->requested_file);
1641     op->actual_file = op->base.child->file;
1642 }
1643
1644 static dumpfile_t *
1645 rait_device_seek_file (Device * dself, guint file) {
1646     GPtrArray * ops;
1647     guint i;
1648     gboolean success;
1649     dumpfile_t * rval;
1650     RaitDevice * self = RAIT_DEVICE(dself);
1651     guint actual_file = 0;
1652     gboolean in_file = FALSE;
1653
1654     if (rait_device_in_error(self)) return NULL;
1655
1656     dself->in_file = FALSE;
1657     dself->is_eof = FALSE;
1658     dself->block = 0;
1659
1660     ops = g_ptr_array_sized_new(self->private->children->len);
1661     for (i = 0; i < self->private->children->len; i ++) {
1662         SeekFileOp * op;
1663         if ((int)i == self->private->failed)
1664             continue; /* This device is broken. */
1665         op = g_new(SeekFileOp, 1);
1666         op->base.child = g_ptr_array_index(self->private->children, i);
1667         op->base.child_index = i;
1668         op->requested_file = file;
1669         g_ptr_array_add(ops, op);
1670     }
1671
1672     do_rait_child_ops(self, seek_file_do_op, ops);
1673
1674     /* This checks for NULL values, but we still have to check for
1675        consistant headers. */
1676     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1677                                        ops, extract_boolean_pointer_op);
1678
1679     rval = NULL;
1680     for (i = 0; i < ops->len; i ++) {
1681         SeekFileOp * this_op;
1682         dumpfile_t * this_result;
1683         guint this_actual_file;
1684         gboolean this_in_file;
1685
1686         this_op = (SeekFileOp*)g_ptr_array_index(ops, i);
1687
1688         if ((signed)this_op->base.child_index == self->private->failed)
1689             continue;
1690
1691         this_result = this_op->base.result;
1692         this_actual_file = this_op->actual_file;
1693         this_in_file = this_op->base.child->in_file;
1694
1695         if (rval == NULL) {
1696             rval = this_result;
1697             actual_file = this_actual_file;
1698             in_file = this_in_file;
1699         } else {
1700             if (headers_are_equal(rval, this_result) &&
1701                 actual_file == this_actual_file &&
1702                 in_file == this_in_file) {
1703                 /* Do nothing. */
1704             } else {
1705                 success = FALSE;
1706             }
1707             free(this_result);
1708         }
1709     }
1710
1711     g_ptr_array_free_full(ops);
1712
1713     if (!success) {
1714         amfree(rval);
1715         /* TODO: be more specific here */
1716         device_set_error(dself,
1717                          g_strdup("One or more devices failed to seek_file"),
1718             DEVICE_STATUS_DEVICE_ERROR);
1719         return NULL;
1720     }
1721
1722     /* update our state */
1723     dself->in_file = in_file;
1724     dself->file = actual_file;
1725
1726     return rval;
1727 }
1728
1729 typedef struct {
1730     GenericOp base;
1731     guint64 block; /* IN */
1732 } SeekBlockOp;
1733
1734 /* a GFunc. */
1735 static void seek_block_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
1736     SeekBlockOp * op = data;
1737     op->base.result =
1738         GINT_TO_POINTER(device_seek_block(op->base.child, op->block));
1739 }
1740
1741 static gboolean
1742 rait_device_seek_block (Device * dself, guint64 block) {
1743     GPtrArray * ops;
1744     guint i;
1745     gboolean success;
1746
1747     RaitDevice * self = RAIT_DEVICE(dself);
1748
1749     if (rait_device_in_error(self)) return FALSE;
1750
1751     ops = g_ptr_array_sized_new(self->private->children->len);
1752     for (i = 0; i < self->private->children->len; i ++) {
1753         SeekBlockOp * op;
1754         if ((int)i == self->private->failed)
1755             continue; /* This device is broken. */
1756         op = g_new(SeekBlockOp, 1);
1757         op->base.child = g_ptr_array_index(self->private->children, i);
1758         op->base.child_index = i;
1759         op->block = block;
1760         g_ptr_array_add(ops, op);
1761     }
1762
1763     do_rait_child_ops(self, seek_block_do_op, ops);
1764
1765     success = g_ptr_array_union_robust(RAIT_DEVICE(self),
1766                                        ops, extract_boolean_generic_op);
1767
1768     g_ptr_array_free_full(ops);
1769
1770     if (!success) {
1771         /* TODO: be more specific here */
1772         device_set_error(dself,
1773             stralloc("One or more devices failed to seek_block"),
1774             DEVICE_STATUS_DEVICE_ERROR);
1775         return FALSE;
1776     }
1777
1778     dself->block = block;
1779     return TRUE;
1780 }
1781
1782 typedef struct {
1783     GenericOp base;
1784     gpointer buffer; /* IN */
1785     int read_size;      /* IN/OUT -- note not a pointer */
1786     int desired_read_size; /* bookkeeping */
1787 } ReadBlockOp;
1788
1789 /* a GFunc. */
1790 static void read_block_do_op(gpointer data,
1791                              gpointer user_data G_GNUC_UNUSED) {
1792     ReadBlockOp * op = data;
1793     op->base.result =
1794         GINT_TO_POINTER(device_read_block(op->base.child, op->buffer,
1795                                           &(op->read_size)));
1796     if (op->read_size > op->desired_read_size) {
1797         g_warning("child device %s tried to return an oversized block, which the RAIT device does not support",
1798                   op->base.child->device_name);
1799     }
1800 }
1801
1802 /* A BooleanExtractor. This one checks for a successful read. */
1803 static gboolean extract_boolean_read_block_op_data(gpointer data) {
1804     ReadBlockOp * op = data;
1805     return GPOINTER_TO_INT(op->base.result) == op->desired_read_size;
1806 }
1807
1808 /* A BooleanExtractor. This one checks for EOF. */
1809 static gboolean extract_boolean_read_block_op_eof(gpointer data) {
1810     ReadBlockOp * op = data;
1811     return op->base.child->is_eof;
1812 }
1813
1814 /* Counts the number of elements in an array matching a given proposition. */
1815 static int g_ptr_array_count(GPtrArray * array, BooleanExtractor filter) {
1816     int rval;
1817     unsigned int i;
1818     rval = 0;
1819     for (i = 0; i < array->len ; i++) {
1820         if (filter(g_ptr_array_index(array, i)))
1821             rval ++;
1822     }
1823     return rval;
1824 }
1825
1826 static gboolean raid_block_reconstruction(RaitDevice * self, GPtrArray * ops,
1827                                       gpointer buf, size_t bufsize) {
1828     guint num_children, data_children;
1829     gsize blocksize;
1830     gsize child_blocksize;
1831     guint i;
1832     int parity_child;
1833     gpointer parity_block = NULL;
1834     gboolean success;
1835
1836     success = TRUE;
1837
1838     blocksize = DEVICE(self)->block_size;
1839     find_simple_params(self, &num_children, &data_children);
1840
1841     if (num_children > 1)
1842         parity_child = num_children - 1;
1843     else
1844         parity_child = -1;
1845
1846     child_blocksize = blocksize / data_children;
1847
1848     for (i = 0; i < ops->len; i ++) {
1849         ReadBlockOp * op = g_ptr_array_index(ops, i);
1850         if (!extract_boolean_read_block_op_data(op))
1851             continue;
1852         if ((int)(op->base.child_index) == parity_child) {
1853             parity_block = op->buffer;
1854         } else {
1855             g_assert(child_blocksize * (op->base.child_index+1) <= bufsize);
1856             memcpy((char *)buf + child_blocksize * op->base.child_index, op->buffer,
1857                    child_blocksize);
1858         }
1859     }
1860     if (self->private->status == RAIT_STATUS_COMPLETE) {
1861         g_assert(parity_block != NULL); /* should have found parity_child */
1862
1863         if (num_children >= 2) {
1864             /* Verify the parity block. This code is inefficient but
1865                does the job for the 2-device case, too. */
1866             gpointer constructed_parity;
1867             GPtrArray * data_extents;
1868
1869             constructed_parity = g_malloc(child_blocksize);
1870             data_extents = g_ptr_array_sized_new(data_children);
1871             for (i = 0; i < data_children; i ++) {
1872                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1873                 g_assert(extract_boolean_read_block_op_data(op));
1874                 if ((int)op->base.child_index == parity_child)
1875                     continue;
1876                 g_ptr_array_add(data_extents, op->buffer);
1877             }
1878             make_parity_block_extents(data_extents, constructed_parity,
1879                                       child_blocksize);
1880
1881             if (0 != memcmp(parity_block, constructed_parity,
1882                             child_blocksize)) {
1883                 device_set_error(DEVICE(self),
1884                     stralloc(_("RAIT is inconsistent: Parity block did not match data blocks.")),
1885                     DEVICE_STATUS_DEVICE_ERROR);
1886                 /* TODO: can't we just isolate the device in this case? */
1887                 success = FALSE;
1888             }
1889             g_ptr_array_free(data_extents, TRUE);
1890             amfree(constructed_parity);
1891         } else { /* do nothing. */ }
1892     } else if (self->private->status == RAIT_STATUS_DEGRADED) {
1893         g_assert(self->private->failed >= 0 && self->private->failed < (int)num_children);
1894         /* We are in degraded mode. What's missing? */
1895         if (self->private->failed == parity_child) {
1896             /* do nothing. */
1897         } else if (num_children >= 2) {
1898             /* Reconstruct failed block from parity block. */
1899             GPtrArray * data_extents = g_ptr_array_new();
1900
1901             for (i = 0; i < data_children; i ++) {
1902                 ReadBlockOp * op = g_ptr_array_index(ops, i);
1903                 if (!extract_boolean_read_block_op_data(op))
1904                     continue;
1905                 g_ptr_array_add(data_extents, op->buffer);
1906             }
1907
1908             /* Conveniently, the reconstruction is the same procedure
1909                as the parity generation. This even works if there is
1910                only one remaining device! */
1911             make_parity_block_extents(data_extents,
1912                                       (char *)buf + (child_blocksize *
1913                                              self->private->failed),
1914                                       child_blocksize);
1915
1916             /* The array members belong to our ops argument. */
1917             g_ptr_array_free(data_extents, TRUE);
1918         } else {
1919             g_assert_not_reached();
1920         }
1921     } else {
1922         /* device is already in FAILED state -- we shouldn't even be here */
1923         success = FALSE;
1924     }
1925     return success;
1926 }
1927
1928 static int
1929 rait_device_read_block (Device * dself, gpointer buf, int * size) {
1930     GPtrArray * ops;
1931     guint i;
1932     gboolean success;
1933     guint num_children, data_children;
1934     gsize blocksize = dself->block_size;
1935     gsize child_blocksize;
1936
1937     RaitDevice * self = RAIT_DEVICE(dself);
1938
1939     if (rait_device_in_error(self)) return -1;
1940
1941     find_simple_params(self, &num_children, &data_children);
1942
1943     /* tell caller they haven't given us a big enough buffer */
1944     if (blocksize > (gsize)*size) {
1945         g_assert(blocksize < INT_MAX);
1946         *size = (int)blocksize;
1947         return 0;
1948     }
1949
1950     g_assert(blocksize % data_children == 0); /* see find_block_size */
1951     child_blocksize = blocksize / data_children;
1952
1953     ops = g_ptr_array_sized_new(num_children);
1954     for (i = 0; i < num_children; i ++) {
1955         ReadBlockOp * op;
1956         if ((int)i == self->private->failed)
1957             continue; /* This device is broken. */
1958         op = g_new(ReadBlockOp, 1);
1959         op->base.child = g_ptr_array_index(self->private->children, i);
1960         op->base.child_index = i;
1961         op->buffer = g_malloc(child_blocksize);
1962         op->desired_read_size = op->read_size = child_blocksize;
1963         g_ptr_array_add(ops, op);
1964     }
1965
1966     do_rait_child_ops(self, read_block_do_op, ops);
1967
1968     if (g_ptr_array_count(ops, extract_boolean_read_block_op_data)) {
1969         if (!g_ptr_array_union_robust(RAIT_DEVICE(self),
1970                                      ops,
1971                                      extract_boolean_read_block_op_data)) {
1972             /* TODO: be more specific */
1973             device_set_error(dself,
1974                 stralloc(_("Error occurred combining blocks from child devices")),
1975                 DEVICE_STATUS_DEVICE_ERROR);
1976             success = FALSE;
1977         } else {
1978             /* raid_block_reconstruction sets the error status if necessary */
1979             success = raid_block_reconstruction(RAIT_DEVICE(self),
1980                                                 ops, buf, (size_t)*size);
1981         }
1982     } else {
1983         success = FALSE;
1984         if (g_ptr_array_union_robust(RAIT_DEVICE(self),
1985                                      ops,
1986                                      extract_boolean_read_block_op_eof)) {
1987             device_set_error(dself,
1988                 stralloc(_("EOF")),
1989                 DEVICE_STATUS_SUCCESS);
1990             dself->is_eof = TRUE;
1991             dself->in_file = FALSE;
1992         } else {
1993             device_set_error(dself,
1994                 stralloc(_("All child devices failed to read, but not all are at eof")),
1995                 DEVICE_STATUS_DEVICE_ERROR);
1996         }
1997     }
1998
1999     for (i = 0; i < ops->len; i ++) {
2000         ReadBlockOp * op = g_ptr_array_index(ops, i);
2001         amfree(op->buffer);
2002     }
2003     g_ptr_array_free_full(ops);
2004
2005     if (success) {
2006         dself->block++;
2007         *size = blocksize;
2008         return blocksize;
2009     } else {
2010         return -1;
2011     }
2012 }
2013
2014 /* property utility functions */
2015
2016 typedef struct {
2017     GenericOp base;
2018     DevicePropertyId id;   /* IN */
2019     GValue value;          /* IN/OUT */
2020     PropertySurety surety; /* IN (for set) */
2021     PropertySource source; /* IN (for set) */
2022 } PropertyOp;
2023
2024 /* Creates a GPtrArray of PropertyOf for a get or set operation. */
2025 static GPtrArray * make_property_op_array(RaitDevice * self,
2026                                           DevicePropertyId id,
2027                                           GValue * value,
2028                                           PropertySurety surety,
2029                                           PropertySource source) {
2030     guint i;
2031     GPtrArray * ops;
2032     ops = g_ptr_array_sized_new(self->private->children->len);
2033     for (i = 0; i < self->private->children->len; i ++) {
2034         PropertyOp * op;
2035
2036         if ((signed)i == self->private->failed) {
2037             continue;
2038         }
2039
2040         op = g_new(PropertyOp, 1);
2041         op->base.child = g_ptr_array_index(self->private->children, i);
2042         op->id = id;
2043         bzero(&(op->value), sizeof(op->value));
2044         if (value != NULL) {
2045             g_value_unset_copy(value, &(op->value));
2046         }
2047         op->surety = surety;
2048         op->source = source;
2049         g_ptr_array_add(ops, op);
2050     }
2051
2052     return ops;
2053 }
2054
2055 /* A GFunc. */
2056 static void property_get_do_op(gpointer data,
2057                                gpointer user_data G_GNUC_UNUSED) {
2058     PropertyOp * op = data;
2059
2060     bzero(&(op->value), sizeof(op->value));
2061     op->base.result =
2062         GINT_TO_POINTER(device_property_get(op->base.child, op->id,
2063                                             &(op->value)));
2064 }
2065
2066 /* A GFunc. */
2067 static void property_set_do_op(gpointer data,
2068                                gpointer user_data G_GNUC_UNUSED) {
2069     PropertyOp * op = data;
2070
2071     op->base.result =
2072         GINT_TO_POINTER(device_property_set_ex(op->base.child, op->id,
2073                                                &(op->value), op->surety,
2074                                                op->source));
2075     g_value_unset(&(op->value));
2076 }
2077
2078 /* PropertyGetFns and PropertySetFns */
2079
2080 static gboolean
2081 property_get_block_size_fn(Device *dself,
2082     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2083     PropertySurety *surety, PropertySource *source)
2084 {
2085     RaitDevice *self = RAIT_DEVICE(dself);
2086     gsize my_block_size;
2087
2088     if (dself->block_size_source != PROPERTY_SOURCE_DEFAULT) {
2089         my_block_size = dself->block_size;
2090
2091         if (surety)
2092             *surety = dself->block_size_surety;
2093     } else {
2094         gsize child_block_size;
2095         child_block_size = calculate_block_size_from_children(self,
2096                                                     &my_block_size);
2097         if (child_block_size == 0)
2098             return FALSE;
2099
2100         if (surety)
2101             *surety = PROPERTY_SURETY_BAD; /* may still change */
2102     }
2103
2104     if (val) {
2105         g_value_unset_init(val, G_TYPE_INT);
2106         g_assert(my_block_size < G_MAXINT); /* gsize -> gint */
2107         g_value_set_int(val, (gint)my_block_size);
2108     }
2109
2110     if (source)
2111         *source = dself->block_size_source;
2112
2113     return TRUE;
2114 }
2115
2116 static gboolean
2117 property_set_block_size_fn(Device *dself,
2118     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2119     PropertySurety surety, PropertySource source)
2120 {
2121     RaitDevice *self = RAIT_DEVICE(dself);
2122     gint my_block_size = g_value_get_int(val);
2123     guint data_children;
2124
2125     find_simple_params(self, NULL, &data_children);
2126     if ((my_block_size % data_children) != 0) {
2127         device_set_error(dself,
2128             vstrallocf(_("Block size must be a multiple of %d"), data_children),
2129             DEVICE_STATUS_DEVICE_ERROR);
2130         return FALSE;
2131     }
2132
2133     dself->block_size = my_block_size;
2134     dself->block_size_source = source;
2135     dself->block_size_surety = surety;
2136
2137     if (!fix_block_size(self))
2138         return FALSE;
2139
2140     return TRUE;
2141 }
2142
2143 static gboolean
2144 property_get_canonical_name_fn(Device *dself,
2145     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2146     PropertySurety *surety, PropertySource *source)
2147 {
2148     RaitDevice *self = RAIT_DEVICE(dself);
2149     char *canonical = child_device_names_to_rait_name(self);
2150
2151     if (val) {
2152         g_value_unset_init(val, G_TYPE_STRING);
2153         g_value_set_string(val, canonical);
2154         g_free(canonical);
2155     }
2156
2157     if (surety)
2158         *surety = PROPERTY_SURETY_GOOD;
2159
2160     if (source)
2161         *source = PROPERTY_SOURCE_DETECTED;
2162
2163     return TRUE;
2164 }
2165
2166 static gboolean
2167 property_get_concurrency_fn(Device *dself,
2168     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2169     PropertySurety *surety, PropertySource *source)
2170 {
2171     RaitDevice *self = RAIT_DEVICE(dself);
2172     ConcurrencyParadigm result;
2173     guint i;
2174     GPtrArray * ops;
2175     gboolean success;
2176
2177     ops = make_property_op_array(self, PROPERTY_CONCURRENCY, NULL, 0, 0);
2178     do_rait_child_ops(self, property_get_do_op, ops);
2179
2180     /* find the most restrictive paradigm acceptable to all
2181      * child devices */
2182     result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2183     success = TRUE;
2184     for (i = 0; i < ops->len; i ++) {
2185         ConcurrencyParadigm cur;
2186         PropertyOp * op = g_ptr_array_index(ops, i);
2187
2188         if (!op->base.result
2189             || G_VALUE_TYPE(&(op->value)) != CONCURRENCY_PARADIGM_TYPE) {
2190             success = FALSE;
2191             break;
2192         }
2193
2194         cur = g_value_get_enum(&(op->value));
2195         if (result == CONCURRENCY_PARADIGM_EXCLUSIVE ||
2196             cur == CONCURRENCY_PARADIGM_EXCLUSIVE) {
2197             result = CONCURRENCY_PARADIGM_EXCLUSIVE;
2198         } else if (result == CONCURRENCY_PARADIGM_SHARED_READ ||
2199                    cur == CONCURRENCY_PARADIGM_SHARED_READ) {
2200             result = CONCURRENCY_PARADIGM_SHARED_READ;
2201         } else if (result == CONCURRENCY_PARADIGM_RANDOM_ACCESS &&
2202                    cur == CONCURRENCY_PARADIGM_RANDOM_ACCESS) {
2203             result = CONCURRENCY_PARADIGM_RANDOM_ACCESS;
2204         } else {
2205             success = FALSE;
2206             break;
2207         }
2208     }
2209
2210     g_ptr_array_free_full(ops);
2211
2212     if (success) {
2213         if (val) {
2214             g_value_unset_init(val, CONCURRENCY_PARADIGM_TYPE);
2215             g_value_set_enum(val, result);
2216         }
2217
2218         if (surety)
2219             *surety = PROPERTY_SURETY_GOOD;
2220
2221         if (source)
2222             *source = PROPERTY_SOURCE_DETECTED;
2223     }
2224
2225     return success;
2226 }
2227
2228 static gboolean
2229 property_get_streaming_fn(Device *dself,
2230     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2231     PropertySurety *surety, PropertySource *source)
2232 {
2233     RaitDevice *self = RAIT_DEVICE(dself);
2234     StreamingRequirement result;
2235     guint i;
2236     GPtrArray * ops;
2237     gboolean success;
2238
2239     ops = make_property_op_array(self, PROPERTY_STREAMING, NULL, 0, 0);
2240     do_rait_child_ops(self, property_get_do_op, ops);
2241
2242     /* combine the child streaming requirements, selecting the strongest
2243      * requirement of the bunch. */
2244     result = STREAMING_REQUIREMENT_NONE;
2245     success = TRUE;
2246     for (i = 0; i < ops->len; i ++) {
2247         StreamingRequirement cur;
2248         PropertyOp * op = g_ptr_array_index(ops, i);
2249
2250         if (!op->base.result
2251             || G_VALUE_TYPE(&(op->value)) != STREAMING_REQUIREMENT_TYPE) {
2252             success = FALSE;
2253             break;
2254         }
2255
2256         cur = g_value_get_enum(&(op->value));
2257         if (result == STREAMING_REQUIREMENT_REQUIRED ||
2258             cur == STREAMING_REQUIREMENT_REQUIRED) {
2259             result = STREAMING_REQUIREMENT_REQUIRED;
2260         } else if (result == STREAMING_REQUIREMENT_DESIRED ||
2261                    cur == STREAMING_REQUIREMENT_DESIRED) {
2262             result = STREAMING_REQUIREMENT_DESIRED;
2263         } else if (result == STREAMING_REQUIREMENT_NONE &&
2264                    cur == STREAMING_REQUIREMENT_NONE) {
2265             result = STREAMING_REQUIREMENT_NONE;
2266         } else {
2267             success = FALSE;
2268             break;
2269         }
2270     }
2271
2272     g_ptr_array_free_full(ops);
2273
2274     if (success) {
2275         if (val) {
2276             g_value_unset_init(val, STREAMING_REQUIREMENT_TYPE);
2277             g_value_set_enum(val, result);
2278         }
2279
2280         if (surety)
2281             *surety = PROPERTY_SURETY_GOOD;
2282
2283         if (source)
2284             *source = PROPERTY_SOURCE_DETECTED;
2285     }
2286
2287     return success;
2288 }
2289
2290 static gboolean
2291 property_get_boolean_and_fn(Device *dself,
2292     DevicePropertyBase *base, GValue *val,
2293     PropertySurety *surety, PropertySource *source)
2294 {
2295     RaitDevice *self = RAIT_DEVICE(dself);
2296     gboolean result;
2297     guint i;
2298     GPtrArray * ops;
2299     gboolean success;
2300
2301     ops = make_property_op_array(self, base->ID, NULL, 0, 0);
2302     do_rait_child_ops(self, property_get_do_op, ops);
2303
2304     /* combine the child values, applying a simple AND */
2305     result = TRUE;
2306     success = TRUE;
2307     for (i = 0; i < ops->len; i ++) {
2308         PropertyOp * op = g_ptr_array_index(ops, i);
2309
2310         if (!op->base.result || !G_VALUE_HOLDS_BOOLEAN(&(op->value))) {
2311             success = FALSE;
2312             break;
2313         }
2314
2315         if (!g_value_get_boolean(&(op->value))) {
2316             result = FALSE;
2317             break;
2318         }
2319     }
2320
2321     g_ptr_array_free_full(ops);
2322
2323     if (success) {
2324         if (val) {
2325             g_value_unset_init(val, G_TYPE_BOOLEAN);
2326             g_value_set_boolean(val, result);
2327         }
2328
2329         if (surety)
2330             *surety = PROPERTY_SURETY_GOOD;
2331
2332         if (source)
2333             *source = PROPERTY_SOURCE_DETECTED;
2334     }
2335
2336     return success;
2337 }
2338
2339 static gboolean
2340 property_get_medium_access_type_fn(Device *dself,
2341     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2342     PropertySurety *surety, PropertySource *source)
2343 {
2344     RaitDevice *self = RAIT_DEVICE(dself);
2345     MediaAccessMode result;
2346     guint i;
2347     GPtrArray * ops;
2348     gboolean success;
2349
2350     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2351     do_rait_child_ops(self, property_get_do_op, ops);
2352
2353     /* combine the modes as best we can */
2354     result = 0;
2355     success = TRUE;
2356     for (i = 0; i < ops->len; i ++) {
2357         MediaAccessMode cur;
2358         PropertyOp * op = g_ptr_array_index(ops, i);
2359
2360         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != MEDIA_ACCESS_MODE_TYPE) {
2361             success = FALSE;
2362             break;
2363         }
2364
2365         cur = g_value_get_enum(&(op->value));
2366
2367         if (i == 0) {
2368             result = cur;
2369         } else if ((result == MEDIA_ACCESS_MODE_READ_ONLY &&
2370                     cur == MEDIA_ACCESS_MODE_WRITE_ONLY) ||
2371                    (result == MEDIA_ACCESS_MODE_WRITE_ONLY &&
2372                     cur == MEDIA_ACCESS_MODE_READ_ONLY)) {
2373             /* Invalid combination; one device can only read, other
2374                can only write. */
2375             success = FALSE;
2376             break;
2377         } else if (result == MEDIA_ACCESS_MODE_READ_ONLY ||
2378                    cur == MEDIA_ACCESS_MODE_READ_ONLY) {
2379             result = MEDIA_ACCESS_MODE_READ_ONLY;
2380         } else if (result == MEDIA_ACCESS_MODE_WRITE_ONLY ||
2381                    cur == MEDIA_ACCESS_MODE_WRITE_ONLY) {
2382             result = MEDIA_ACCESS_MODE_WRITE_ONLY;
2383         } else if (result == MEDIA_ACCESS_MODE_WORM ||
2384                    cur == MEDIA_ACCESS_MODE_WORM) {
2385             result = MEDIA_ACCESS_MODE_WORM;
2386         } else if (result == MEDIA_ACCESS_MODE_READ_WRITE &&
2387                    cur == MEDIA_ACCESS_MODE_READ_WRITE) {
2388             result = MEDIA_ACCESS_MODE_READ_WRITE;
2389         } else {
2390             success = FALSE;
2391             break;
2392         }
2393     }
2394
2395     g_ptr_array_free_full(ops);
2396
2397     if (success) {
2398         if (val) {
2399             g_value_unset_init(val, MEDIA_ACCESS_MODE_TYPE);
2400             g_value_set_enum(val, result);
2401         }
2402
2403         if (surety)
2404             *surety = PROPERTY_SURETY_GOOD;
2405
2406         if (source)
2407             *source = PROPERTY_SOURCE_DETECTED;
2408     }
2409
2410     return success;
2411 }
2412
2413 static gboolean
2414 property_get_free_space_fn(Device *dself,
2415     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2416     PropertySurety *surety, PropertySource *source)
2417 {
2418     RaitDevice *self = RAIT_DEVICE(dself);
2419     QualifiedSize result;
2420     guint i;
2421     GPtrArray * ops;
2422     guint data_children;
2423
2424     ops = make_property_op_array(self, PROPERTY_MEDIUM_ACCESS_TYPE, NULL, 0, 0);
2425     do_rait_child_ops(self, property_get_do_op, ops);
2426
2427     /* Find the minimal available space of any child, with some funny business
2428      * to deal with varying degrees of accuracy. */
2429     result.accuracy = SIZE_ACCURACY_UNKNOWN;
2430     result.bytes = 0;
2431     for (i = 0; i < ops->len; i ++) {
2432         QualifiedSize cur;
2433         PropertyOp * op = g_ptr_array_index(ops, i);
2434
2435         if (!op->base.result || G_VALUE_TYPE(&(op->value)) != QUALIFIED_SIZE_TYPE) {
2436             /* maybe this child can't tell us .. so this is just an estimate */
2437             if (result.accuracy == SIZE_ACCURACY_REAL)
2438                 result.accuracy = SIZE_ACCURACY_ESTIMATE;
2439
2440             continue;
2441         }
2442
2443         cur = *(QualifiedSize*)(g_value_get_boxed(&(op->value)));
2444
2445         if (result.accuracy != cur.accuracy) {
2446             result.accuracy = SIZE_ACCURACY_ESTIMATE;
2447         }
2448
2449         if (result.accuracy == SIZE_ACCURACY_UNKNOWN &&
2450             cur.accuracy != SIZE_ACCURACY_UNKNOWN) {
2451             result.bytes = cur.bytes;
2452         } else if (result.accuracy != SIZE_ACCURACY_UNKNOWN &&
2453                    cur.accuracy == SIZE_ACCURACY_UNKNOWN) {
2454             /* result.bytes unchanged. */
2455         } else {
2456             result.bytes = MIN(result.bytes, cur.bytes);
2457         }
2458     }
2459
2460     g_ptr_array_free_full(ops);
2461
2462     /* result contains the minimum size available on any child.  We
2463      * can use that space on each of our data children, so the total
2464      * is larger */
2465     find_simple_params(self, NULL, &data_children);
2466     result.bytes *= data_children;
2467
2468     if (val) {
2469         g_value_unset_init(val, QUALIFIED_SIZE_TYPE);
2470         g_value_set_boxed(val, &result);
2471     }
2472
2473     if (surety)
2474         *surety = (result.accuracy == SIZE_ACCURACY_UNKNOWN)?
2475                     PROPERTY_SURETY_BAD : PROPERTY_SURETY_GOOD;
2476
2477     if (source)
2478         *source = PROPERTY_SOURCE_DETECTED;
2479
2480     return TRUE;
2481 }
2482
2483 static gboolean
2484 property_get_max_volume_usage_fn(Device *dself,
2485     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2486     PropertySurety *surety, PropertySource *source)
2487 {
2488     RaitDevice *self = RAIT_DEVICE(dself);
2489     guint64 result;
2490     guint i;
2491     GPtrArray * ops;
2492     guint data_children;
2493
2494     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE, NULL, 0, 0);
2495     do_rait_child_ops(self, property_get_do_op, ops);
2496
2497     /* look for the smallest value that is set */
2498     result = 0;
2499     for (i = 0; i < ops->len; i ++) {
2500         guint64 cur;
2501         PropertyOp * op = g_ptr_array_index(ops, i);
2502
2503         if (!op->base.result || !G_VALUE_HOLDS_UINT64(&(op->value))) {
2504             continue; /* ignore children without this property */
2505         }
2506
2507         cur = g_value_get_uint64(&(op->value));
2508
2509         result = MIN(cur, result);
2510     }
2511
2512     g_ptr_array_free_full(ops);
2513
2514     if (result) {
2515         /* result contains the minimum usage on any child.  We can use that space
2516          * on each of our data children, so the total is larger */
2517         find_simple_params(self, NULL, &data_children);
2518         result *= data_children;
2519
2520         if (val) {
2521             g_value_unset_init(val, G_TYPE_UINT64);
2522             g_value_set_uint64(val, result);
2523         }
2524
2525         if (surety)
2526             *surety = PROPERTY_SURETY_GOOD;
2527
2528         if (source)
2529             *source = PROPERTY_SOURCE_DETECTED;
2530
2531         return TRUE;
2532     } else {
2533         /* no result from any children, so we effectively don't have this property */
2534         return FALSE;
2535     }
2536 }
2537
2538 static gboolean
2539 property_set_max_volume_usage_fn(Device *dself,
2540     DevicePropertyBase *base G_GNUC_UNUSED, GValue *val,
2541     PropertySurety surety, PropertySource source)
2542 {
2543     RaitDevice *self = RAIT_DEVICE(dself);
2544     guint64 parent_usage;
2545     guint64 child_usage;
2546     GValue child_val;
2547     guint i;
2548     gboolean success;
2549     GPtrArray * ops;
2550     guint data_children;
2551
2552     parent_usage = g_value_get_uint64(val);
2553     find_simple_params(self, NULL, &data_children);
2554
2555     child_usage = parent_usage / data_children;
2556
2557     bzero(&child_val, sizeof(child_val));
2558     g_value_init(&child_val, G_TYPE_UINT64);
2559     g_value_set_uint64(&child_val, child_usage);
2560
2561     ops = make_property_op_array(self, PROPERTY_MAX_VOLUME_USAGE,
2562                                 &child_val, surety, source);
2563     do_rait_child_ops(self, property_set_do_op, ops);
2564
2565     /* if any of the kids succeeded, then we did too */
2566     success = FALSE;
2567     for (i = 0; i < ops->len; i ++) {
2568         PropertyOp * op = g_ptr_array_index(ops, i);
2569
2570         if (op->base.result) {
2571             success = TRUE;
2572             break;
2573         }
2574     }
2575
2576     g_ptr_array_free_full(ops);
2577
2578     return success;
2579 }
2580
2581 typedef struct {
2582     GenericOp base;
2583     guint filenum;
2584 } RecycleFileOp;
2585
2586 /* A GFunc */
2587 static void recycle_file_do_op(gpointer data,
2588                                gpointer user_data G_GNUC_UNUSED) {
2589     RecycleFileOp * op = data;
2590     op->base.result =
2591         GINT_TO_POINTER(device_recycle_file(op->base.child, op->filenum));
2592 }
2593
2594 static gboolean
2595 rait_device_recycle_file (Device * dself, guint filenum) {
2596     GPtrArray * ops;
2597     guint i;
2598     gboolean success;
2599
2600     RaitDevice * self = RAIT_DEVICE(dself);
2601
2602     if (rait_device_in_error(self)) return FALSE;
2603
2604     ops = g_ptr_array_sized_new(self->private->children->len);
2605     for (i = 0; i < self->private->children->len; i ++) {
2606         RecycleFileOp * op;
2607         op = g_new(RecycleFileOp, 1);
2608         op->base.child = g_ptr_array_index(self->private->children, i);
2609         op->filenum = filenum;
2610         g_ptr_array_add(ops, op);
2611     }
2612
2613     do_rait_child_ops(self, recycle_file_do_op, ops);
2614
2615     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2616
2617     g_ptr_array_free_full(ops);
2618
2619     if (!success) {
2620         /* TODO: be more specific here */
2621         device_set_error(dself,
2622             stralloc(_("One or more devices failed to recycle_file")),
2623             DEVICE_STATUS_DEVICE_ERROR);
2624         return FALSE;
2625     }
2626     return TRUE;
2627 }
2628
2629 /* GFunc */
2630 static void finish_do_op(gpointer data, gpointer user_data G_GNUC_UNUSED) {
2631     GenericOp * op = data;
2632     op->result = GINT_TO_POINTER(device_finish(op->child));
2633 }
2634
2635 static gboolean
2636 rait_device_finish (Device * self) {
2637     GPtrArray * ops;
2638     gboolean success;
2639
2640     if (rait_device_in_error(self)) return FALSE;
2641
2642     ops = make_generic_boolean_op_array(RAIT_DEVICE(self));
2643
2644     do_rait_child_ops(RAIT_DEVICE(self), finish_do_op, ops);
2645
2646     success = g_ptr_array_and(ops, extract_boolean_generic_op);
2647
2648     g_ptr_array_free_full(ops);
2649
2650     self->access_mode = ACCESS_NULL;
2651
2652     if (!success)
2653         return FALSE;
2654
2655     return TRUE;
2656 }
2657
2658 static Device *
2659 rait_device_factory (char * device_name, char * device_type, char * device_node) {
2660     Device * rval;
2661     g_assert(0 == strcmp(device_type, "rait"));
2662     rval = DEVICE(g_object_new(TYPE_RAIT_DEVICE, NULL));
2663     device_open_device(rval, device_name, device_type, device_node);
2664     return rval;
2665 }
2666
2667 void
2668 rait_device_register (void) {
2669     static const char * device_prefix_list[] = {"rait", NULL};
2670     register_device(rait_device_factory, device_prefix_list);
2671 }