Imported Upstream version 3.3.0
[debian/amanda] / device-src / s3-device.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22  * data.  It stores data in keys named with a user-specified prefix, inside a
23  * user-specified bucket.  Data is stored in the form of numbered (large)
24  * blocks.
25  */
26
27 #include "amanda.h"
28 #include <string.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <unistd.h>
32 #include <dirent.h>
33 #include <regex.h>
34 #include <time.h>
35 #include "util.h"
36 #include "conffile.h"
37 #include "device.h"
38 #include "s3.h"
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
42 #else
43 # ifdef HAVE_CRYPTO_HMAC_H
44 #  include <crypto/hmac.h>
45 # else
46 #  ifdef HAVE_HMAC_H
47 #   include <hmac.h>
48 #  endif
49 # endif
50 #endif
51
52 /*
53  * Type checking and casting macros
54  */
55 #define TYPE_S3_DEVICE  (s3_device_get_type())
56 #define S3_DEVICE(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
57 #define S3_DEVICE_CONST(obj)    G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
58 #define S3_DEVICE_CLASS(klass)  G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
59 #define IS_S3_DEVICE(obj)       G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
60
61 #define S3_DEVICE_GET_CLASS(obj)        G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType    s3_device_get_type      (void);
63
64 /*
65  * Main object structure
66  */
67 typedef struct _S3MetadataFile S3MetadataFile;
68
69 typedef struct _S3_by_thread S3_by_thread;
70 struct _S3_by_thread {
71     S3Handle * volatile          s3;
72     CurlBuffer volatile          curl_buffer;
73     guint volatile               buffer_len;
74     int volatile                 idle;
75     int volatile                 eof;
76     int volatile                 done;
77     char volatile * volatile     filename;
78     DeviceStatusFlags volatile   errflags;      /* device_status */
79     char volatile * volatile     errmsg;        /* device error message */
80 };
81
82 typedef struct _S3Device S3Device;
83 struct _S3Device {
84     Device __parent__;
85
86     /* The "easy" curl handle we use to access Amazon S3 */
87     S3_by_thread *s3t;
88
89     /* S3 access information */
90     char *bucket;
91     char *prefix;
92
93     /* The S3 access information. */
94     char *secret_key;
95     char *access_key;
96     char *user_token;
97
98     char *bucket_location;
99     char *storage_class;
100     char *host;
101     char *service_path;
102
103     char *ca_info;
104
105     /* a cache for unsuccessful reads (where we get the file but the caller
106      * doesn't have space for it or doesn't want it), where we expect the
107      * next call will request the same file.
108      */
109     char *cached_buf;
110     char *cached_key;
111     int cached_size;
112
113     /* Produce verbose output? */
114     gboolean verbose;
115
116     /* Use SSL? */
117     gboolean use_ssl;
118
119     /* Throttling */
120     guint64 max_send_speed;
121     guint64 max_recv_speed;
122
123     gboolean leom;
124     guint64 volume_bytes;
125     guint64 volume_limit;
126     gboolean enforce_volume_limit;
127     gboolean use_subdomain;
128
129     int          nb_threads;
130     int          nb_threads_backup;
131     int          nb_threads_recovery;
132     GThreadPool *thread_pool_write;
133     GThreadPool *thread_pool_read;
134     GCond       *thread_idle_cond;
135     GMutex      *thread_idle_mutex;
136     int          next_block_to_read;
137 };
138
139 /*
140  * Class definition
141  */
142 typedef struct _S3DeviceClass S3DeviceClass;
143 struct _S3DeviceClass {
144     DeviceClass __parent__;
145 };
146
147
148 /*
149  * Constants and static data
150  */
151
152 #define S3_DEVICE_NAME "s3"
153
154 /* Maximum key length as specified in the S3 documentation
155  * (*excluding* null terminator) */
156 #define S3_MAX_KEY_LENGTH 1024
157
158 /* Note: for compatability, min can only be decreased and max increased */
159 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
160 #define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024)
161 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
162 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
163
164 /* This goes in lieu of file number for metadata. */
165 #define SPECIAL_INFIX "special-"
166
167 /* pointer to the class of our parent */
168 static DeviceClass *parent_class = NULL;
169
170 /*
171  * device-specific properties
172  */
173
174 /* Authentication information for Amazon S3. Both of these are strings. */
175 static DevicePropertyBase device_property_s3_access_key;
176 static DevicePropertyBase device_property_s3_secret_key;
177 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
178 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
179
180 /* Host and path */
181 static DevicePropertyBase device_property_s3_host;
182 static DevicePropertyBase device_property_s3_service_path;
183 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
184 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
185
186 /* Same, but for S3 with DevPay. */
187 static DevicePropertyBase device_property_s3_user_token;
188 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
189
190 /* Location constraint for new buckets created on Amazon S3. */
191 static DevicePropertyBase device_property_s3_bucket_location;
192 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
193
194 /* Storage class */
195 static DevicePropertyBase device_property_s3_storage_class;
196 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
197
198 /* Path to certificate authority certificate */
199 static DevicePropertyBase device_property_ssl_ca_info;
200 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
201
202 /* Whether to use SSL with Amazon S3. */
203 static DevicePropertyBase device_property_s3_ssl;
204 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
205
206 /* Speed limits for sending and receiving */
207 static DevicePropertyBase device_property_max_send_speed;
208 static DevicePropertyBase device_property_max_recv_speed;
209 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
210 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
211
212 /* Whether to use subdomain */
213 static DevicePropertyBase device_property_s3_subdomain;
214 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
215
216 /* Number of threads to use */
217 static DevicePropertyBase device_property_nb_threads_backup;
218 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
219 static DevicePropertyBase device_property_nb_threads_recovery;
220 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
221
222 /*
223  * prototypes
224  */
225
226 void s3_device_register(void);
227
228 /*
229  * utility functions */
230
231 /* Given file and block numbers, return an S3 key.
232  *
233  * @param self: the S3Device object
234  * @param file: the file number
235  * @param block: the block within that file
236  * @returns: a newly allocated string containing an S3 key.
237  */
238 static char *
239 file_and_block_to_key(S3Device *self,
240                       int file,
241                       guint64 block);
242
243 /* Given the name of a special file (such as 'tapestart'), generate
244  * the S3 key to use for that file.
245  *
246  * @param self: the S3Device object
247  * @param special_name: name of the special file
248  * @param file: a file number to include; omitted if -1
249  * @returns: a newly alocated string containing an S3 key.
250  */
251 static char *
252 special_file_to_key(S3Device *self,
253                     char *special_name,
254                     int file);
255 /* Write an amanda header file to S3.
256  *
257  * @param self: the S3Device object
258  * @param label: the volume label
259  * @param timestamp: the volume timestamp
260  */
261 static gboolean
262 write_amanda_header(S3Device *self,
263                     char *label,
264                     char * timestamp);
265
266 /* "Fast forward" this device to the end by looking up the largest file number
267  * present and setting the current file number one greater.
268  *
269  * @param self: the S3Device object
270  */
271 static gboolean
272 seek_to_end(S3Device *self);
273
274 /* Find the number of the last file that contains any data (even just a header).
275  *
276  * @param self: the S3Device object
277  * @returns: the last file, or -1 in event of an error
278  */
279 static int
280 find_last_file(S3Device *self);
281
282 /* Delete all blocks in the given file, including the filestart block
283  *
284  * @param self: the S3Device object
285  * @param file: the file to delete
286  */
287 static gboolean
288 delete_file(S3Device *self,
289             int file);
290
291
292 /* Delete all files in the given device
293  *
294  * @param self: the S3Device object
295  */
296 static gboolean
297 delete_all_files(S3Device *self);
298
299 /* Set up self->s3t as best as possible.
300  *
301  * The return value is TRUE iff self->s3t is useable.
302  *
303  * @param self: the S3Device object
304  * @returns: TRUE if the handle is set up
305  */
306 static gboolean
307 setup_handle(S3Device * self);
308
309 /*
310  * class mechanics */
311
312 static void
313 s3_device_init(S3Device * o);
314
315 static void
316 s3_device_class_init(S3DeviceClass * c);
317
318 static void
319 s3_device_finalize(GObject * o);
320
321 static Device*
322 s3_device_factory(char * device_name, char * device_type, char * device_node);
323
324 /*
325  * Property{Get,Set}Fns */
326
327 static gboolean s3_device_set_access_key_fn(Device *self,
328     DevicePropertyBase *base, GValue *val,
329     PropertySurety surety, PropertySource source);
330
331 static gboolean s3_device_set_secret_key_fn(Device *self,
332     DevicePropertyBase *base, GValue *val,
333     PropertySurety surety, PropertySource source);
334
335 static gboolean s3_device_set_user_token_fn(Device *self,
336     DevicePropertyBase *base, GValue *val,
337     PropertySurety surety, PropertySource source);
338
339 static gboolean s3_device_set_bucket_location_fn(Device *self,
340     DevicePropertyBase *base, GValue *val,
341     PropertySurety surety, PropertySource source);
342
343 static gboolean s3_device_set_storage_class_fn(Device *self,
344     DevicePropertyBase *base, GValue *val,
345     PropertySurety surety, PropertySource source);
346
347 static gboolean s3_device_set_ca_info_fn(Device *self,
348     DevicePropertyBase *base, GValue *val,
349     PropertySurety surety, PropertySource source);
350
351 static gboolean s3_device_set_verbose_fn(Device *self,
352     DevicePropertyBase *base, GValue *val,
353     PropertySurety surety, PropertySource source);
354
355 static gboolean s3_device_set_ssl_fn(Device *self,
356     DevicePropertyBase *base, GValue *val,
357     PropertySurety surety, PropertySource source);
358
359 static gboolean s3_device_set_max_send_speed_fn(Device *self,
360     DevicePropertyBase *base, GValue *val,
361     PropertySurety surety, PropertySource source);
362
363 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
364     DevicePropertyBase *base, GValue *val,
365     PropertySurety surety, PropertySource source);
366
367 static gboolean s3_device_set_nb_threads_backup(Device *self,
368     DevicePropertyBase *base, GValue *val,
369     PropertySurety surety, PropertySource source);
370
371 static gboolean s3_device_set_nb_threads_recovery(Device *self,
372     DevicePropertyBase *base, GValue *val,
373     PropertySurety surety, PropertySource source);
374
375 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
376     DevicePropertyBase *base, GValue *val,
377     PropertySurety surety, PropertySource source);
378
379 static gboolean property_set_leom_fn(Device *p_self,
380     DevicePropertyBase *base, GValue *val,
381     PropertySurety surety, PropertySource source);
382
383 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
384     DevicePropertyBase *base, GValue *val,
385     PropertySurety surety, PropertySource source);
386
387 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
388     DevicePropertyBase *base, GValue *val,
389     PropertySurety surety, PropertySource source);
390
391 static gboolean s3_device_set_host_fn(Device *p_self,
392     DevicePropertyBase *base, GValue *val,
393     PropertySurety surety, PropertySource source);
394
395 static gboolean s3_device_set_service_path_fn(Device *p_self,
396     DevicePropertyBase *base, GValue *val,
397     PropertySurety surety, PropertySource source);
398
399 static void s3_thread_read_block(gpointer thread_data,
400                                  gpointer data);
401 static void s3_thread_write_block(gpointer thread_data,
402                                   gpointer data);
403
404 /* Wait that all threads are done */
405 static void reset_thread(S3Device *self);
406
407 /*
408  * virtual functions */
409
410 static void
411 s3_device_open_device(Device *pself, char *device_name,
412                   char * device_type, char * device_node);
413
414 static DeviceStatusFlags s3_device_read_label(Device * self);
415
416 static gboolean
417 s3_device_start(Device * self,
418                 DeviceAccessMode mode,
419                 char * label,
420                 char * timestamp);
421
422 static gboolean
423 s3_device_finish(Device * self);
424
425 static gboolean
426 s3_device_start_file(Device * self,
427                      dumpfile_t * jobInfo);
428
429 static gboolean
430 s3_device_write_block(Device * self,
431                       guint size,
432                       gpointer data);
433
434 static gboolean
435 s3_device_finish_file(Device * self);
436
437 static dumpfile_t*
438 s3_device_seek_file(Device *pself,
439                     guint file);
440
441 static gboolean
442 s3_device_seek_block(Device *pself,
443                      guint64 block);
444
445 static int
446 s3_device_read_block(Device * pself,
447                      gpointer data,
448                      int *size_req);
449
450 static gboolean
451 s3_device_recycle_file(Device *pself,
452                        guint file);
453
454 static gboolean
455 s3_device_erase(Device *pself);
456
457 static gboolean
458 check_at_leom(S3Device *self,
459                 guint64 size);
460
461 static gboolean
462 check_at_peom(S3Device *self,
463                 guint64 size);
464
465 /*
466  * Private functions
467  */
468
469 static char *
470 file_and_block_to_key(S3Device *self,
471                       int file,
472                       guint64 block)
473 {
474     char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
475                                    self->prefix, file, (long long unsigned int)block);
476     g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
477     return s3_key;
478 }
479
480 static char *
481 special_file_to_key(S3Device *self,
482                     char *special_name,
483                     int file)
484 {
485     if (file == -1)
486         return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
487     else
488         return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
489 }
490
491 static gboolean
492 write_amanda_header(S3Device *self,
493                     char *label,
494                     char * timestamp)
495 {
496     CurlBuffer amanda_header = {NULL, 0, 0, 0};
497     char * key = NULL;
498     gboolean result;
499     dumpfile_t * dumpinfo = NULL;
500     Device *d_self = DEVICE(self);
501     size_t header_size;
502
503     /* build the header */
504     header_size = 0; /* no minimum size */
505     dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
506     amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
507         &header_size);
508     if (amanda_header.buffer == NULL) {
509         device_set_error(d_self,
510             stralloc(_("Amanda tapestart header won't fit in a single block!")),
511             DEVICE_STATUS_DEVICE_ERROR);
512         dumpfile_free(dumpinfo);
513         g_free(amanda_header.buffer);
514         return FALSE;
515     }
516
517     if(check_at_leom(self, header_size))
518         d_self->is_eom = TRUE;
519
520     if(check_at_peom(self, header_size)) {
521         d_self->is_eom = TRUE;
522         device_set_error(d_self,
523             stralloc(_("No space left on device")),
524             DEVICE_STATUS_DEVICE_ERROR);
525         g_free(amanda_header.buffer);
526         return FALSE;
527     }
528
529     /* write out the header and flush the uploads. */
530     key = special_file_to_key(self, "tapestart", -1);
531     g_assert(header_size < G_MAXUINT); /* for cast to guint */
532     amanda_header.buffer_len = (guint)header_size;
533     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
534                        &amanda_header, NULL, NULL);
535     g_free(amanda_header.buffer);
536     g_free(key);
537
538     if (!result) {
539         device_set_error(d_self,
540             vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
541             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
542         dumpfile_free(dumpinfo);
543     } else {
544         dumpfile_free(d_self->volume_header);
545         d_self->volume_header = dumpinfo;
546         self->volume_bytes += header_size;
547     }
548
549     return result;
550 }
551
552 static gboolean
553 seek_to_end(S3Device *self) {
554     int last_file;
555
556     Device *pself = DEVICE(self);
557
558     last_file = find_last_file(self);
559     if (last_file < 0)
560         return FALSE;
561
562     pself->file = last_file;
563
564     return TRUE;
565 }
566
567 /* Convert an object name into a file number, assuming the given prefix
568  * length. Returns -1 if the object name is invalid, or 0 if the object name
569  * is a "special" key. */
570 static int key_to_file(guint prefix_len, const char * key) {
571     int file;
572     int i;
573
574     /* skip the prefix */
575     if (strlen(key) <= prefix_len)
576         return -1;
577
578     key += prefix_len;
579
580     if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
581         return 0;
582     }
583
584     /* check that key starts with 'f' */
585     if (key[0] != 'f')
586         return -1;
587     key++;
588
589     /* check that key is of the form "%08x-" */
590     for (i = 0; i < 8; i++) {
591         if (!(key[i] >= '0' && key[i] <= '9') &&
592             !(key[i] >= 'a' && key[i] <= 'f') &&
593             !(key[i] >= 'A' && key[i] <= 'F')) break;
594     }
595     if (key[i] != '-') return -1;
596     if (i < 8) return -1;
597
598     /* convert the file number */
599     errno = 0;
600     file = strtoul(key, NULL, 16);
601     if (errno != 0) {
602         g_warning(_("unparseable file number '%s'"), key);
603         return -1;
604     }
605
606     return file;
607 }
608
609 /* Find the number of the last file that contains any data (even just a header).
610  * Returns -1 in event of an error
611  */
612 static int
613 find_last_file(S3Device *self) {
614     gboolean result;
615     GSList *keys;
616     unsigned int prefix_len = strlen(self->prefix);
617     int last_file = 0;
618     Device *d_self = DEVICE(self);
619
620     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
621     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
622     if (!result) {
623         device_set_error(d_self,
624             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
625             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
626         return -1;
627     }
628
629     for (; keys; keys = g_slist_remove(keys, keys->data)) {
630         int file = key_to_file(prefix_len, keys->data);
631
632         /* and if it's the last, keep it */
633         if (file > last_file)
634             last_file = file;
635     }
636
637     return last_file;
638 }
639
640 /* Find the number of the file following the requested one, if any.
641  * Returns 0 if there is no such file or -1 in event of an error
642  */
643 static int
644 find_next_file(S3Device *self, int last_file) {
645     gboolean result;
646     GSList *keys;
647     unsigned int prefix_len = strlen(self->prefix);
648     int next_file = 0;
649     Device *d_self = DEVICE(self);
650
651     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
652     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
653                           &keys, NULL);
654     if (!result) {
655         device_set_error(d_self,
656             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
657             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
658         return -1;
659     }
660
661     for (; keys; keys = g_slist_remove(keys, keys->data)) {
662         int file;
663
664         file = key_to_file(prefix_len, (char*)keys->data);
665
666         if (file < 0) {
667             /* Set this in case we don't find a next file; this is not a
668              * hard error, so if we can find a next file we'll return that
669              * instead. */
670             next_file = -1;
671         }
672
673         if (file < next_file && file > last_file) {
674             next_file = file;
675         }
676     }
677
678     return next_file;
679 }
680
681 static gboolean
682 delete_file(S3Device *self,
683             int file)
684 {
685     gboolean result;
686     GSList *keys;
687     guint64 total_size = 0;
688     char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
689     Device *d_self = DEVICE(self);
690
691     result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
692                           &total_size);
693     if (!result) {
694         device_set_error(d_self,
695             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
696             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
697         return FALSE;
698     }
699
700     /* this will likely be a *lot* of keys */
701     for (; keys; keys = g_slist_remove(keys, keys->data)) {
702         if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data);
703         if (!s3_delete(self->s3t[0].s3, self->bucket, keys->data)) {
704             device_set_error(d_self,
705                 vstrallocf(_("While deleting key '%s': %s"),
706                             (char*)keys->data, s3_strerror(self->s3t[0].s3)),
707                 DEVICE_STATUS_DEVICE_ERROR);
708             g_slist_free(keys);
709             return FALSE;
710         }
711     }
712     self->volume_bytes = total_size;
713
714     return TRUE;
715 }
716
717 static gboolean
718 delete_all_files(S3Device *self)
719 {
720     int file, last_file;
721
722     /*
723      * Note: this has to be allowed to retry for a while because the bucket
724      * may have been created and not yet appeared
725      */
726     last_file = find_last_file(self);
727     if (last_file < 0) {
728         guint response_code;
729         s3_error_code_t s3_error_code;
730         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
731
732         /*
733          * if the bucket doesn't exist, it doesn't conatin any files,
734          * so the operation is a success
735          */
736         if ((response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket)) {
737             /* find_last_file set an error; clear it */
738             device_set_error(DEVICE(self), NULL, DEVICE_STATUS_SUCCESS);
739             return TRUE;
740         } else {
741             /* find_last_file already set the error */
742             return FALSE;
743         }
744     }
745
746     for (file = 1; file <= last_file; file++) {
747         if (!delete_file(self, file))
748             /* delete_file already set our error message */
749             return FALSE;
750     }
751
752     return TRUE;
753 }
754
755 /*
756  * Class mechanics
757  */
758
759 void
760 s3_device_register(void)
761 {
762     static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
763     g_assert(s3_init());
764
765     /* set up our properties */
766     device_property_fill_and_register(&device_property_s3_secret_key,
767                                       G_TYPE_STRING, "s3_secret_key",
768        "Secret access key to authenticate with Amazon S3");
769     device_property_fill_and_register(&device_property_s3_access_key,
770                                       G_TYPE_STRING, "s3_access_key",
771        "Access key ID to authenticate with Amazon S3");
772     device_property_fill_and_register(&device_property_s3_host,
773                                       G_TYPE_STRING, "s3_host",
774        "hostname:port of the server");
775     device_property_fill_and_register(&device_property_s3_service_path,
776                                       G_TYPE_STRING, "s3_service_path",
777        "path to add in the url");
778     device_property_fill_and_register(&device_property_s3_user_token,
779                                       G_TYPE_STRING, "s3_user_token",
780        "User token for authentication Amazon devpay requests");
781     device_property_fill_and_register(&device_property_s3_bucket_location,
782                                       G_TYPE_STRING, "s3_bucket_location",
783        "Location constraint for buckets on Amazon S3");
784     device_property_fill_and_register(&device_property_s3_storage_class,
785                                       G_TYPE_STRING, "s3_storage_class",
786        "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
787     device_property_fill_and_register(&device_property_ssl_ca_info,
788                                       G_TYPE_STRING, "ssl_ca_info",
789        "Path to certificate authority certificate");
790     device_property_fill_and_register(&device_property_s3_ssl,
791                                       G_TYPE_BOOLEAN, "s3_ssl",
792        "Whether to use SSL with Amazon S3");
793     device_property_fill_and_register(&device_property_s3_subdomain,
794                                       G_TYPE_BOOLEAN, "s3_subdomain",
795        "Whether to use subdomain");
796     device_property_fill_and_register(&device_property_max_send_speed,
797                                       G_TYPE_UINT64, "max_send_speed",
798        "Maximum average upload speed (bytes/sec)");
799     device_property_fill_and_register(&device_property_max_recv_speed,
800                                       G_TYPE_UINT64, "max_recv_speed",
801        "Maximum average download speed (bytes/sec)");
802     device_property_fill_and_register(&device_property_nb_threads_backup,
803                                       G_TYPE_UINT64, "nb_threads_backup",
804        "Number of writer thread");
805     device_property_fill_and_register(&device_property_nb_threads_recovery,
806                                       G_TYPE_UINT64, "nb_threads_recovery",
807        "Number of reader thread");
808
809     /* register the device itself */
810     register_device(s3_device_factory, device_prefix_list);
811 }
812
813 static GType
814 s3_device_get_type(void)
815 {
816     static GType type = 0;
817
818     if G_UNLIKELY(type == 0) {
819         static const GTypeInfo info = {
820             sizeof (S3DeviceClass),
821             (GBaseInitFunc) NULL,
822             (GBaseFinalizeFunc) NULL,
823             (GClassInitFunc) s3_device_class_init,
824             (GClassFinalizeFunc) NULL,
825             NULL /* class_data */,
826             sizeof (S3Device),
827             0 /* n_preallocs */,
828             (GInstanceInitFunc) s3_device_init,
829             NULL
830         };
831
832         type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
833                                        (GTypeFlags)0);
834     }
835
836     return type;
837 }
838
839 static void
840 s3_device_init(S3Device * self)
841 {
842     Device * dself = DEVICE(self);
843     GValue response;
844
845     self->volume_bytes = 0;
846     self->volume_limit = 0;
847     self->leom = TRUE;
848     self->enforce_volume_limit = FALSE;
849     self->use_subdomain = FALSE;
850     self->nb_threads = 1;
851     self->nb_threads_backup = 1;
852     self->nb_threads_recovery = 1;
853     self->thread_pool_write = NULL;
854     self->thread_pool_read = NULL;
855     self->thread_idle_cond = NULL;
856     self->thread_idle_mutex = NULL;
857
858     /* Register property values
859      * Note: Some aren't added until s3_device_open_device()
860      */
861     bzero(&response, sizeof(response));
862
863     g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
864     g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
865     device_set_simple_property(dself, PROPERTY_CONCURRENCY,
866             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
867     g_value_unset(&response);
868
869     g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
870     g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
871     device_set_simple_property(dself, PROPERTY_STREAMING,
872             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
873     g_value_unset(&response);
874
875     g_value_init(&response, G_TYPE_BOOLEAN);
876     g_value_set_boolean(&response, TRUE);
877     device_set_simple_property(dself, PROPERTY_APPENDABLE,
878             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
879     g_value_unset(&response);
880
881     g_value_init(&response, G_TYPE_BOOLEAN);
882     g_value_set_boolean(&response, TRUE);
883     device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
884             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
885     g_value_unset(&response);
886
887     g_value_init(&response, G_TYPE_BOOLEAN);
888     g_value_set_boolean(&response, TRUE);
889     device_set_simple_property(dself, PROPERTY_FULL_DELETION,
890             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
891     g_value_unset(&response);
892
893     g_value_init(&response, G_TYPE_BOOLEAN);
894     g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
895     device_set_simple_property(dself, PROPERTY_LEOM,
896             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
897     g_value_unset(&response);
898
899     g_value_init(&response, G_TYPE_BOOLEAN);
900     g_value_set_boolean(&response, FALSE);
901     device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
902             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
903     g_value_unset(&response);
904
905     g_value_init(&response, G_TYPE_BOOLEAN);
906     g_value_set_boolean(&response, FALSE);
907     device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
908             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
909     g_value_unset(&response);
910
911     g_value_init(&response, G_TYPE_BOOLEAN);
912     g_value_set_boolean(&response, FALSE);
913     device_set_simple_property(dself, PROPERTY_COMPRESSION,
914             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
915     g_value_unset(&response);
916
917     g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
918     g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
919     device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
920             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
921     g_value_unset(&response);
922
923 }
924
925 static void
926 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
927 {
928     GObjectClass *g_object_class = (GObjectClass*) c;
929     DeviceClass *device_class = (DeviceClass *)c;
930
931     parent_class = g_type_class_ref (TYPE_DEVICE);
932
933     device_class->open_device = s3_device_open_device;
934     device_class->read_label = s3_device_read_label;
935     device_class->start = s3_device_start;
936     device_class->finish = s3_device_finish;
937
938     device_class->start_file = s3_device_start_file;
939     device_class->write_block = s3_device_write_block;
940     device_class->finish_file = s3_device_finish_file;
941
942     device_class->seek_file = s3_device_seek_file;
943     device_class->seek_block = s3_device_seek_block;
944     device_class->read_block = s3_device_read_block;
945     device_class->recycle_file = s3_device_recycle_file;
946
947     device_class->erase = s3_device_erase;
948
949     g_object_class->finalize = s3_device_finalize;
950
951     device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
952             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
953             device_simple_property_get_fn,
954             s3_device_set_access_key_fn);
955
956     device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
957             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
958             device_simple_property_get_fn,
959             s3_device_set_secret_key_fn);
960
961     device_class_register_property(device_class, PROPERTY_S3_HOST,
962             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
963             device_simple_property_get_fn,
964             s3_device_set_host_fn);
965
966     device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
967             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
968             device_simple_property_get_fn,
969             s3_device_set_service_path_fn);
970
971     device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
972             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
973             device_simple_property_get_fn,
974             s3_device_set_user_token_fn);
975
976     device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
977             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
978             device_simple_property_get_fn,
979             s3_device_set_bucket_location_fn);
980
981     device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
982             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
983             device_simple_property_get_fn,
984             s3_device_set_storage_class_fn);
985
986     device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
987             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
988             device_simple_property_get_fn,
989             s3_device_set_ca_info_fn);
990
991     device_class_register_property(device_class, PROPERTY_VERBOSE,
992             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
993             device_simple_property_get_fn,
994             s3_device_set_verbose_fn);
995
996     device_class_register_property(device_class, PROPERTY_S3_SSL,
997             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
998             device_simple_property_get_fn,
999             s3_device_set_ssl_fn);
1000
1001     device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1002             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1003             device_simple_property_get_fn,
1004             s3_device_set_max_send_speed_fn);
1005
1006     device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1007             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1008             device_simple_property_get_fn,
1009             s3_device_set_max_recv_speed_fn);
1010
1011     device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1012             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1013             device_simple_property_get_fn,
1014             s3_device_set_nb_threads_backup);
1015
1016     device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1017             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1018             device_simple_property_get_fn,
1019             s3_device_set_nb_threads_recovery);
1020
1021     device_class_register_property(device_class, PROPERTY_COMPRESSION,
1022             PROPERTY_ACCESS_GET_MASK,
1023             device_simple_property_get_fn,
1024             NULL);
1025
1026     device_class_register_property(device_class, PROPERTY_LEOM,
1027             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1028             device_simple_property_get_fn,
1029             property_set_leom_fn);
1030
1031     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1032             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1033                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1034             device_simple_property_get_fn,
1035             s3_device_set_max_volume_usage_fn);
1036
1037     device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1038             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1039                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1040             device_simple_property_get_fn,
1041             s3_device_set_enforce_max_volume_usage_fn);
1042
1043     device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1044             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1045                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1046             device_simple_property_get_fn,
1047             s3_device_set_use_subdomain_fn);
1048 }
1049
1050 static gboolean
1051 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1052     GValue *val, PropertySurety surety, PropertySource source)
1053 {
1054     S3Device *self = S3_DEVICE(p_self);
1055
1056     amfree(self->access_key);
1057     self->access_key = g_value_dup_string(val);
1058     device_clear_volume_details(p_self);
1059
1060     return device_simple_property_set_fn(p_self, base, val, surety, source);
1061 }
1062
1063 static gboolean
1064 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1065     GValue *val, PropertySurety surety, PropertySource source)
1066 {
1067     S3Device *self = S3_DEVICE(p_self);
1068
1069     amfree(self->secret_key);
1070     self->secret_key = g_value_dup_string(val);
1071     device_clear_volume_details(p_self);
1072
1073     return device_simple_property_set_fn(p_self, base, val, surety, source);
1074 }
1075
1076 static gboolean
1077 s3_device_set_host_fn(Device *p_self,
1078     DevicePropertyBase *base, GValue *val,
1079     PropertySurety surety, PropertySource source)
1080 {
1081     S3Device *self = S3_DEVICE(p_self);
1082
1083     amfree(self->host);
1084     self->host = g_value_dup_string(val);
1085     device_clear_volume_details(p_self);
1086
1087     return device_simple_property_set_fn(p_self, base, val, surety, source);
1088 }
1089
1090 static gboolean
1091 s3_device_set_service_path_fn(Device *p_self,
1092     DevicePropertyBase *base, GValue *val,
1093     PropertySurety surety, PropertySource source)
1094 {
1095     S3Device *self = S3_DEVICE(p_self);
1096
1097     amfree(self->service_path);
1098     self->service_path = g_value_dup_string(val);
1099     device_clear_volume_details(p_self);
1100
1101     return device_simple_property_set_fn(p_self, base, val, surety, source);
1102 }
1103
1104 static gboolean
1105 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1106     GValue *val, PropertySurety surety, PropertySource source)
1107 {
1108     S3Device *self = S3_DEVICE(p_self);
1109
1110     amfree(self->user_token);
1111     self->user_token = g_value_dup_string(val);
1112     device_clear_volume_details(p_self);
1113
1114     return device_simple_property_set_fn(p_self, base, val, surety, source);
1115 }
1116
1117 static gboolean
1118 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1119     GValue *val, PropertySurety surety, PropertySource source)
1120 {
1121     S3Device *self = S3_DEVICE(p_self);
1122     char *str_val = g_value_dup_string(val);
1123
1124     if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1125         device_set_error(p_self, stralloc(_(
1126                 "Location constraint given for Amazon S3 bucket, "
1127                 "but libcurl is too old support wildcard certificates.")),
1128             DEVICE_STATUS_DEVICE_ERROR);
1129         goto fail;
1130     }
1131
1132     if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1133         device_set_error(p_self, g_strdup_printf(_(
1134                 "Location constraint given for Amazon S3 bucket, "
1135                 "but the bucket name (%s) is not usable as a subdomain."),
1136                 self->bucket),
1137             DEVICE_STATUS_DEVICE_ERROR);
1138         goto fail;
1139     }
1140
1141     amfree(self->bucket_location);
1142     self->bucket_location = str_val;
1143     device_clear_volume_details(p_self);
1144
1145     return device_simple_property_set_fn(p_self, base, val, surety, source);
1146 fail:
1147     g_free(str_val);
1148     return FALSE;
1149 }
1150
1151 static gboolean
1152 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1153     GValue *val, PropertySurety surety, PropertySource source)
1154 {
1155     S3Device *self = S3_DEVICE(p_self);
1156     char *str_val = g_value_dup_string(val);
1157
1158     amfree(self->storage_class);
1159     self->storage_class = str_val;
1160     device_clear_volume_details(p_self);
1161
1162     return device_simple_property_set_fn(p_self, base, val, surety, source);
1163 }
1164
1165 static gboolean
1166 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1167     GValue *val, PropertySurety surety, PropertySource source)
1168 {
1169     S3Device *self = S3_DEVICE(p_self);
1170
1171     amfree(self->ca_info);
1172     self->ca_info = g_value_dup_string(val);
1173     device_clear_volume_details(p_self);
1174
1175     return device_simple_property_set_fn(p_self, base, val, surety, source);
1176 }
1177
1178 static gboolean
1179 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1180     GValue *val, PropertySurety surety, PropertySource source)
1181 {
1182     S3Device *self = S3_DEVICE(p_self);
1183     int       thread;
1184
1185     self->verbose = g_value_get_boolean(val);
1186     /* Our S3 handle may not yet have been instantiated; if so, it will
1187      * get the proper verbose setting when it is created */
1188     if (self->s3t) {
1189         for (thread = 0; thread < self->nb_threads; thread++) {
1190             if (self->s3t[thread].s3)
1191                 s3_verbose(self->s3t[thread].s3, self->verbose);
1192         }
1193     }
1194
1195     return device_simple_property_set_fn(p_self, base, val, surety, source);
1196 }
1197
1198 static gboolean
1199 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1200     GValue *val, PropertySurety surety, PropertySource source)
1201 {
1202     S3Device *self = S3_DEVICE(p_self);
1203     gboolean new_val;
1204     int      thread;
1205
1206     new_val = g_value_get_boolean(val);
1207     /* Our S3 handle may not yet have been instantiated; if so, it will
1208      * get the proper use_ssl setting when it is created */
1209     if (self->s3t) {
1210         for (thread = 0; thread < self->nb_threads; thread++) {
1211             if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1212                 device_set_error(p_self, g_strdup_printf(_(
1213                         "Error setting S3 SSL/TLS use "
1214                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1215                     DEVICE_STATUS_DEVICE_ERROR);
1216                 return FALSE;
1217             }
1218         }
1219     }
1220     self->use_ssl = new_val;
1221
1222     return device_simple_property_set_fn(p_self, base, val, surety, source);
1223 }
1224
1225 static gboolean
1226 s3_device_set_max_send_speed_fn(Device *p_self,
1227     DevicePropertyBase *base, GValue *val,
1228     PropertySurety surety, PropertySource source)
1229 {
1230     S3Device *self = S3_DEVICE(p_self);
1231     guint64 new_val;
1232     int     thread;
1233
1234     new_val = g_value_get_uint64(val);
1235     if (self->s3t) {
1236         for (thread = 0; thread < self->nb_threads; thread++) {
1237             if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1238                 device_set_error(p_self,
1239                         g_strdup("Could not set S3 maximum send speed"),
1240                         DEVICE_STATUS_DEVICE_ERROR);
1241                 return FALSE;
1242             }
1243         }
1244     }
1245     self->max_send_speed = new_val;
1246
1247     return device_simple_property_set_fn(p_self, base, val, surety, source);
1248 }
1249
1250 static gboolean
1251 s3_device_set_max_recv_speed_fn(Device *p_self,
1252     DevicePropertyBase *base, GValue *val,
1253     PropertySurety surety, PropertySource source)
1254 {
1255     S3Device *self = S3_DEVICE(p_self);
1256     guint64 new_val;
1257     int     thread;
1258
1259     new_val = g_value_get_uint64(val);
1260     if (self->s3t) {
1261         for (thread = 0; thread < self->nb_threads; thread++) {
1262             if (self->s3t[thread].s3 &&
1263                 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1264                 device_set_error(p_self,
1265                         g_strdup("Could not set S3 maximum recv speed"),
1266                         DEVICE_STATUS_DEVICE_ERROR);
1267                 return FALSE;
1268             }
1269         }
1270     }
1271     self->max_recv_speed = new_val;
1272
1273     return device_simple_property_set_fn(p_self, base, val, surety, source);
1274 }
1275
1276 static gboolean
1277 s3_device_set_nb_threads_backup(Device *p_self,
1278     DevicePropertyBase *base, GValue *val,
1279     PropertySurety surety, PropertySource source)
1280 {
1281     S3Device *self = S3_DEVICE(p_self);
1282     guint64 new_val;
1283
1284     new_val = g_value_get_uint64(val);
1285     self->nb_threads_backup = new_val;
1286     if (self->nb_threads_backup > self->nb_threads) {
1287         self->nb_threads = self->nb_threads_backup;
1288     }
1289
1290     return device_simple_property_set_fn(p_self, base, val, surety, source);
1291 }
1292
1293 static gboolean
1294 s3_device_set_nb_threads_recovery(Device *p_self,
1295     DevicePropertyBase *base, GValue *val,
1296     PropertySurety surety, PropertySource source)
1297 {
1298     S3Device *self = S3_DEVICE(p_self);
1299     guint64 new_val;
1300
1301     new_val = g_value_get_uint64(val);
1302     self->nb_threads_recovery = new_val;
1303     if (self->nb_threads_recovery > self->nb_threads) {
1304         self->nb_threads = self->nb_threads_recovery;
1305     }
1306
1307     return device_simple_property_set_fn(p_self, base, val, surety, source);
1308 }
1309
1310 static gboolean
1311 s3_device_set_max_volume_usage_fn(Device *p_self,
1312     DevicePropertyBase *base, GValue *val,
1313     PropertySurety surety, PropertySource source)
1314 {
1315     S3Device *self = S3_DEVICE(p_self);
1316
1317     self->volume_limit = g_value_get_uint64(val);
1318
1319     return device_simple_property_set_fn(p_self, base, val, surety, source);
1320
1321 }
1322
1323 static gboolean
1324 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1325     DevicePropertyBase *base, GValue *val,
1326     PropertySurety surety, PropertySource source)
1327 {
1328     S3Device *self = S3_DEVICE(p_self);
1329
1330     self->enforce_volume_limit = g_value_get_boolean(val);
1331
1332     return device_simple_property_set_fn(p_self, base, val, surety, source);
1333
1334 }
1335
1336 static gboolean
1337 s3_device_set_use_subdomain_fn(Device *p_self,
1338     DevicePropertyBase *base, GValue *val,
1339     PropertySurety surety, PropertySource source)
1340 {
1341     S3Device *self = S3_DEVICE(p_self);
1342
1343     self->use_subdomain = g_value_get_boolean(val);
1344
1345     return device_simple_property_set_fn(p_self, base, val, surety, source);
1346 }
1347
1348 static gboolean
1349 property_set_leom_fn(Device *p_self,
1350     DevicePropertyBase *base, GValue *val,
1351     PropertySurety surety, PropertySource source)
1352 {
1353     S3Device *self = S3_DEVICE(p_self);
1354
1355     self->leom = g_value_get_boolean(val);
1356
1357     return device_simple_property_set_fn(p_self, base, val, surety, source);
1358 }
1359 static Device*
1360 s3_device_factory(char * device_name, char * device_type, char * device_node)
1361 {
1362     Device *rval;
1363     S3Device * s3_rval;
1364     g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
1365     rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
1366     s3_rval = (S3Device*)rval;
1367
1368     device_open_device(rval, device_name, device_type, device_node);
1369     return rval;
1370 }
1371
1372 /*
1373  * Virtual function overrides
1374  */
1375
1376 static void
1377 s3_device_open_device(Device *pself, char *device_name,
1378                         char * device_type, char * device_node)
1379 {
1380     S3Device *self = S3_DEVICE(pself);
1381     char * name_colon;
1382     GValue tmp_value;
1383
1384     pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
1385     pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
1386     pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
1387
1388     /* Device name may be bucket/prefix, to support multiple volumes in a
1389      * single bucket. */
1390     name_colon = strchr(device_node, '/');
1391     if (name_colon == NULL) {
1392         self->bucket = g_strdup(device_node);
1393         self->prefix = g_strdup("");
1394     } else {
1395         self->bucket = g_strndup(device_node, name_colon - device_node);
1396         self->prefix = g_strdup(name_colon + 1);
1397     }
1398
1399     if (self->bucket == NULL || self->bucket[0] == '\0') {
1400         device_set_error(pself,
1401             vstrallocf(_("Empty bucket name in device %s"), device_name),
1402             DEVICE_STATUS_DEVICE_ERROR);
1403         amfree(self->bucket);
1404         amfree(self->prefix);
1405         return;
1406     }
1407
1408     g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
1409
1410     /* default values */
1411     self->verbose = FALSE;
1412
1413     /* use SSL if available */
1414     self->use_ssl = s3_curl_supports_ssl();
1415     bzero(&tmp_value, sizeof(GValue));
1416     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
1417     g_value_set_boolean(&tmp_value, self->use_ssl);
1418     device_set_simple_property(pself, device_property_s3_ssl.ID,
1419         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
1420
1421     if (parent_class->open_device) {
1422         parent_class->open_device(pself, device_name, device_type, device_node);
1423     }
1424 }
1425
1426 static void s3_device_finalize(GObject * obj_self) {
1427     S3Device *self = S3_DEVICE (obj_self);
1428     int thread;
1429
1430     if(G_OBJECT_CLASS(parent_class)->finalize)
1431         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
1432
1433     if (self->thread_pool_write) {
1434         g_thread_pool_free(self->thread_pool_write, 1, 1);
1435         self->thread_pool_write = NULL;
1436     }
1437     if (self->thread_pool_read) {
1438         g_thread_pool_free(self->thread_pool_read, 1, 1);
1439         self->thread_pool_read = NULL;
1440     }
1441     if (self->thread_idle_mutex) {
1442         g_mutex_free(self->thread_idle_mutex);
1443         self->thread_idle_mutex = NULL;
1444     }
1445     if (self->thread_idle_cond) {
1446         g_cond_free(self->thread_idle_cond);
1447         self->thread_idle_cond = NULL;
1448     }
1449     if (self->s3t) {
1450         for (thread = 0; thread < self->nb_threads; thread++) {
1451             if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
1452         }
1453         g_free(self->s3t);
1454     }
1455     if(self->bucket) g_free(self->bucket);
1456     if(self->prefix) g_free(self->prefix);
1457     if(self->access_key) g_free(self->access_key);
1458     if(self->secret_key) g_free(self->secret_key);
1459     if(self->host) g_free(self->host);
1460     if(self->service_path) g_free(self->service_path);
1461     if(self->user_token) g_free(self->user_token);
1462     if(self->bucket_location) g_free(self->bucket_location);
1463     if(self->storage_class) g_free(self->storage_class);
1464     if(self->ca_info) g_free(self->ca_info);
1465 }
1466
1467 static gboolean setup_handle(S3Device * self) {
1468     Device *d_self = DEVICE(self);
1469     int thread;
1470
1471     if (self->s3t == NULL) {
1472         self->s3t = g_new(S3_by_thread, self->nb_threads);
1473         if (self->s3t == NULL) {
1474             device_set_error(d_self,
1475                 stralloc(_("Can't allocate S3Handle array")),
1476                 DEVICE_STATUS_DEVICE_ERROR);
1477             return FALSE;
1478         }
1479         if (self->access_key == NULL || self->access_key[0] == '\0') {
1480             device_set_error(d_self,
1481                 stralloc(_("No Amazon access key specified")),
1482                 DEVICE_STATUS_DEVICE_ERROR);
1483             return FALSE;
1484         }
1485
1486         if (self->secret_key == NULL || self->secret_key[0] == '\0') {
1487             device_set_error(d_self,
1488                 stralloc(_("No Amazon secret key specified")),
1489                 DEVICE_STATUS_DEVICE_ERROR);
1490             return FALSE;
1491         }
1492
1493         if (!self->use_ssl && self->ca_info) {
1494             amfree(self->ca_info);
1495         }
1496
1497         for (thread = 0; thread < self->nb_threads; thread++) {
1498             self->s3t[thread].idle = 1;
1499             self->s3t[thread].done = 1;
1500             self->s3t[thread].eof = FALSE;
1501             self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1502             self->s3t[thread].errmsg = NULL;
1503             self->s3t[thread].filename = NULL;
1504             self->s3t[thread].curl_buffer.buffer = NULL;
1505             self->s3t[thread].curl_buffer.buffer_len = 0;
1506             self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
1507                                            self->host, self->service_path,
1508                                            self->use_subdomain,
1509                                            self->user_token, self->bucket_location,
1510                                            self->storage_class, self->ca_info);
1511             if (self->s3t[thread].s3 == NULL) {
1512                 device_set_error(d_self,
1513                     stralloc(_("Internal error creating S3 handle")),
1514                     DEVICE_STATUS_DEVICE_ERROR);
1515                 return FALSE;
1516             }
1517         }
1518
1519         g_debug("Create %d threads", self->nb_threads);
1520         self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
1521                                               self->nb_threads, 0, NULL);
1522         self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
1523                                               self->nb_threads, 0, NULL);
1524         self->thread_idle_cond = g_cond_new();
1525         self->thread_idle_mutex = g_mutex_new();
1526     }
1527
1528     for (thread = 0; thread < self->nb_threads; thread++) {
1529         s3_verbose(self->s3t[thread].s3, self->verbose);
1530
1531         if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
1532             device_set_error(d_self, g_strdup_printf(_(
1533                 "Error setting S3 SSL/TLS use "
1534                 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1535                 DEVICE_STATUS_DEVICE_ERROR);
1536             return FALSE;
1537         }
1538
1539         if (self->max_send_speed &&
1540             !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
1541             device_set_error(d_self,
1542                 g_strdup("Could not set S3 maximum send speed"),
1543                 DEVICE_STATUS_DEVICE_ERROR);
1544             return FALSE;
1545         }
1546
1547         if (self->max_recv_speed &&
1548             !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
1549             device_set_error(d_self,
1550                 g_strdup("Could not set S3 maximum recv speed"),
1551                 DEVICE_STATUS_DEVICE_ERROR);
1552             return FALSE;
1553         }
1554     }
1555
1556     return TRUE;
1557 }
1558
1559 static DeviceStatusFlags
1560 s3_device_read_label(Device *pself) {
1561     S3Device *self = S3_DEVICE(pself);
1562     char *key;
1563     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
1564     dumpfile_t *amanda_header;
1565     /* note that this may be called from s3_device_start, when
1566      * self->access_mode is not ACCESS_NULL */
1567
1568     amfree(pself->volume_label);
1569     amfree(pself->volume_time);
1570     dumpfile_free(pself->volume_header);
1571     pself->volume_header = NULL;
1572
1573     if (device_in_error(self)) return pself->status;
1574
1575     if (!setup_handle(self)) {
1576         /* setup_handle already set our error message */
1577         return pself->status;
1578     }
1579     reset_thread(self);
1580
1581     key = special_file_to_key(self, "tapestart", -1);
1582     if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
1583         guint response_code;
1584         s3_error_code_t s3_error_code;
1585         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1586
1587         /* if it's an expected error (not found), just return FALSE */
1588         if (response_code == 404 &&
1589              (s3_error_code == S3_ERROR_NoSuchKey ||
1590               s3_error_code == S3_ERROR_NoSuchEntity ||
1591               s3_error_code == S3_ERROR_NoSuchBucket)) {
1592             g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
1593             device_set_error(pself,
1594                 stralloc(_("Amanda header not found -- unlabeled volume?")),
1595                   DEVICE_STATUS_DEVICE_ERROR
1596                 | DEVICE_STATUS_VOLUME_ERROR
1597                 | DEVICE_STATUS_VOLUME_UNLABELED);
1598             return pself->status;
1599         }
1600
1601         /* otherwise, log it and return */
1602         device_set_error(pself,
1603             vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
1604             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1605         return pself->status;
1606     }
1607
1608     /* handle an empty file gracefully */
1609     if (buf.buffer_len == 0) {
1610         device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
1611         return pself->status;
1612     }
1613
1614     g_assert(buf.buffer != NULL);
1615     amanda_header = g_new(dumpfile_t, 1);
1616     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
1617     pself->volume_header = amanda_header;
1618     g_free(buf.buffer);
1619
1620     if (amanda_header->type != F_TAPESTART) {
1621         device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
1622         return pself->status;
1623     }
1624
1625     pself->volume_label = g_strdup(amanda_header->name);
1626     pself->volume_time = g_strdup(amanda_header->datestamp);
1627     /* pself->volume_header is already set */
1628
1629     device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1630
1631     return pself->status;
1632 }
1633
1634 static gboolean
1635 s3_device_start (Device * pself, DeviceAccessMode mode,
1636                  char * label, char * timestamp) {
1637     S3Device * self;
1638     GSList *keys;
1639     guint64 total_size = 0;
1640     gboolean result;
1641
1642     self = S3_DEVICE(pself);
1643
1644     if (device_in_error(self)) return FALSE;
1645
1646     if (!setup_handle(self)) {
1647         /* setup_handle already set our error message */
1648         return FALSE;
1649     }
1650
1651     reset_thread(self);
1652     pself->access_mode = mode;
1653     pself->in_file = FALSE;
1654
1655     /* try creating the bucket, in case it doesn't exist */
1656     if (mode != ACCESS_READ && !s3_make_bucket(self->s3t[0].s3, self->bucket)) {
1657         guint response_code;
1658         s3_error_code_t s3_error_code;
1659         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1660
1661         /* if it isn't an expected error (bucket already exists),
1662          * return FALSE */
1663         if (response_code != 409 ||
1664             (s3_error_code != S3_ERROR_BucketAlreadyExists &&
1665              s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
1666             device_set_error(pself,
1667                 vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
1668                 DEVICE_STATUS_DEVICE_ERROR);
1669             return FALSE;
1670         }
1671     }
1672
1673     /* take care of any dirty work for this mode */
1674     switch (mode) {
1675         case ACCESS_READ:
1676             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1677                 /* s3_device_read_label already set our error message */
1678                 return FALSE;
1679             }
1680             break;
1681
1682         case ACCESS_WRITE:
1683             delete_all_files(self);
1684
1685             /* write a new amanda header */
1686             if (!write_amanda_header(self, label, timestamp)) {
1687                 return FALSE;
1688             }
1689
1690             pself->volume_label = newstralloc(pself->volume_label, label);
1691             pself->volume_time = newstralloc(pself->volume_time, timestamp);
1692
1693             /* unset the VOLUME_UNLABELED flag, if it was set */
1694             device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1695             break;
1696
1697         case ACCESS_APPEND:
1698             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1699                 /* s3_device_read_label already set our error message */
1700                 return FALSE;
1701             } else {
1702                 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
1703                 if(!result) {
1704                     device_set_error(pself,
1705                                  vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
1706                                  DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
1707                     return FALSE;
1708                 } else {
1709                     self->volume_bytes = total_size;
1710                 }
1711             }
1712             return seek_to_end(self);
1713             break;
1714
1715         case ACCESS_NULL:
1716             g_assert_not_reached();
1717     }
1718
1719     return TRUE;
1720 }
1721
1722 static gboolean
1723 s3_device_finish (
1724     Device * pself)
1725 {
1726     S3Device *self = S3_DEVICE(pself);
1727
1728     reset_thread(self);
1729
1730     /* we're not in a file anymore */
1731     pself->access_mode = ACCESS_NULL;
1732
1733     if (device_in_error(pself)) return FALSE;
1734
1735     return TRUE;
1736 }
1737
1738 /* functions for writing */
1739
1740
1741 static gboolean
1742 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
1743     S3Device *self = S3_DEVICE(pself);
1744     CurlBuffer amanda_header = {NULL, 0, 0, 0};
1745     gboolean result;
1746     size_t header_size;
1747     char  *key;
1748     int    thread;
1749
1750     if (device_in_error(self)) return FALSE;
1751
1752     reset_thread(self);
1753     pself->is_eom = FALSE;
1754
1755     /* Set the blocksize to zero, since there's no header to skip (it's stored
1756      * in a distinct file, rather than block zero) */
1757     jobInfo->blocksize = 0;
1758
1759     /* Build the amanda header. */
1760     header_size = 0; /* no minimum size */
1761     amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
1762         &header_size);
1763     if (amanda_header.buffer == NULL) {
1764         device_set_error(pself,
1765             stralloc(_("Amanda file header won't fit in a single block!")),
1766             DEVICE_STATUS_DEVICE_ERROR);
1767         return FALSE;
1768     }
1769     amanda_header.buffer_len = header_size;
1770
1771     if(check_at_leom(self, header_size))
1772         pself->is_eom = TRUE;
1773
1774     if(check_at_peom(self, header_size)) {
1775         pself->is_eom = TRUE;
1776         device_set_error(pself,
1777             stralloc(_("No space left on device")),
1778             DEVICE_STATUS_DEVICE_ERROR);
1779         g_free(amanda_header.buffer);
1780         return FALSE;
1781     }
1782     /* set the file and block numbers correctly */
1783     pself->file = (pself->file > 0)? pself->file+1 : 1;
1784     pself->block = 0;
1785     pself->in_file = TRUE;
1786     /* write it out as a special block (not the 0th) */
1787     key = special_file_to_key(self, "filestart", pself->file);
1788     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
1789                        &amanda_header, NULL, NULL);
1790     g_free(amanda_header.buffer);
1791     g_free(key);
1792     if (!result) {
1793         device_set_error(pself,
1794             vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
1795             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1796         return FALSE;
1797     }
1798
1799     self->volume_bytes += header_size;
1800     for (thread = 0; thread < self->nb_threads; thread++)  {
1801         self->s3t[thread].idle = 1;
1802     }
1803
1804     return TRUE;
1805 }
1806
1807 static gboolean
1808 s3_device_write_block (Device * pself, guint size, gpointer data) {
1809     char *filename;
1810     S3Device * self = S3_DEVICE(pself);
1811     int idle_thread = 0;
1812     int thread = -1;
1813     int first_idle = -1;
1814
1815     g_assert (self != NULL);
1816     g_assert (data != NULL);
1817     if (device_in_error(self)) return FALSE;
1818
1819     if(check_at_leom(self, size))
1820         pself->is_eom = TRUE;
1821
1822     if(check_at_peom(self, size)) {
1823         pself->is_eom = TRUE;
1824         device_set_error(pself,
1825             stralloc(_("No space left on device")),
1826             DEVICE_STATUS_DEVICE_ERROR);
1827         return FALSE;
1828     }
1829
1830     filename = file_and_block_to_key(self, pself->file, pself->block);
1831
1832     g_mutex_lock(self->thread_idle_mutex);
1833     while (!idle_thread) {
1834         idle_thread = 0;
1835         for (thread = 0; thread < self->nb_threads_backup; thread++)  {
1836             if (self->s3t[thread].idle == 1) {
1837                 idle_thread++;
1838                 if (first_idle == -1)
1839                     first_idle = thread;
1840                 /* Check if the thread is in error */
1841                 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1842                     device_set_error(pself, (char *)self->s3t[thread].errmsg,
1843                                      self->s3t[thread].errflags);
1844                     self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1845                     self->s3t[thread].errmsg = NULL;
1846                     g_mutex_unlock(self->thread_idle_mutex);
1847                     return FALSE;
1848                 }
1849             }
1850         }
1851         if (!idle_thread) {
1852             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1853         }
1854     }
1855     thread = first_idle;
1856
1857     self->s3t[thread].idle = 0;
1858     self->s3t[thread].done = 0;
1859     if (self->s3t[thread].curl_buffer.buffer &&
1860         self->s3t[thread].curl_buffer.buffer_len < size) {
1861         g_free((char *)self->s3t[thread].curl_buffer.buffer);
1862         self->s3t[thread].curl_buffer.buffer = NULL;
1863         self->s3t[thread].curl_buffer.buffer_len = 0;
1864         self->s3t[thread].buffer_len = 0;
1865     }
1866     if (self->s3t[thread].curl_buffer.buffer == NULL) {
1867         self->s3t[thread].curl_buffer.buffer = g_malloc(size);
1868         self->s3t[thread].curl_buffer.buffer_len = size;
1869         self->s3t[thread].buffer_len = size;
1870     }
1871     memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
1872     self->s3t[thread].curl_buffer.buffer_pos = 0;
1873     self->s3t[thread].curl_buffer.buffer_len = size;
1874     self->s3t[thread].curl_buffer.max_buffer_size = 0;
1875     self->s3t[thread].filename = filename;
1876     g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
1877     g_mutex_unlock(self->thread_idle_mutex);
1878
1879     pself->block++;
1880     self->volume_bytes += size;
1881     return TRUE;
1882 }
1883
1884 static void
1885 s3_thread_write_block(
1886     gpointer thread_data,
1887     gpointer data)
1888 {
1889     S3_by_thread *s3t = (S3_by_thread *)thread_data;
1890     Device *pself = (Device *)data;
1891     S3Device *self = S3_DEVICE(pself);
1892     gboolean result;
1893
1894     result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
1895                        S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
1896     g_free((void *)s3t->filename);
1897     s3t->filename = NULL;
1898     if (!result) {
1899         s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
1900         s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
1901     }
1902     g_mutex_lock(self->thread_idle_mutex);
1903     s3t->idle = 1;
1904     s3t->done = 1;
1905     s3t->curl_buffer.buffer_len = s3t->buffer_len;
1906     g_cond_broadcast(self->thread_idle_cond);
1907     g_mutex_unlock(self->thread_idle_mutex);
1908 }
1909
1910 static gboolean
1911 s3_device_finish_file (Device * pself) {
1912     S3Device *self = S3_DEVICE(pself);
1913
1914     /* Check all threads are done */
1915     int idle_thread = 0;
1916     int thread;
1917
1918     g_mutex_lock(self->thread_idle_mutex);
1919     while (idle_thread != self->nb_threads) {
1920         idle_thread = 0;
1921         for (thread = 0; thread < self->nb_threads; thread++)  {
1922             if (self->s3t[thread].idle == 1) {
1923                 idle_thread++;
1924             }
1925             /* check thread status */
1926             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1927                 device_set_error(pself, (char *)self->s3t[thread].errmsg,
1928                                  self->s3t[thread].errflags);
1929                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1930                 self->s3t[thread].errmsg = NULL;
1931             }
1932         }
1933         if (idle_thread != self->nb_threads) {
1934             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1935         }
1936     }
1937     g_mutex_unlock(self->thread_idle_mutex);
1938
1939     if (device_in_error(pself)) return FALSE;
1940
1941     /* we're not in a file anymore */
1942     pself->in_file = FALSE;
1943
1944     return TRUE;
1945 }
1946
1947 static gboolean
1948 s3_device_recycle_file(Device *pself, guint file) {
1949     S3Device *self = S3_DEVICE(pself);
1950     if (device_in_error(self)) return FALSE;
1951
1952     reset_thread(self);
1953     return delete_file(self, file);
1954     /* delete_file already set our error message if necessary */
1955 }
1956
1957 static gboolean
1958 s3_device_erase(Device *pself) {
1959     S3Device *self = S3_DEVICE(pself);
1960     char *key = NULL;
1961     const char *errmsg = NULL;
1962     guint response_code;
1963     s3_error_code_t s3_error_code;
1964
1965     if (!setup_handle(self)) {
1966         /* error set by setup_handle */
1967         return FALSE;
1968     }
1969
1970     reset_thread(self);
1971     key = special_file_to_key(self, "tapestart", -1);
1972     if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
1973         s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
1974         device_set_error(pself,
1975             stralloc(errmsg),
1976             DEVICE_STATUS_DEVICE_ERROR);
1977         return FALSE;
1978     }
1979     g_free(key);
1980
1981     if (!delete_all_files(self))
1982         return FALSE;
1983
1984     if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
1985         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1986
1987         /*
1988          * ignore the error if the bucket isn't empty (there may be data from elsewhere)
1989          * or the bucket not existing (already deleted perhaps?)
1990          */
1991         if (!(
1992                 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
1993                 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
1994
1995             device_set_error(pself,
1996                 stralloc(errmsg),
1997                 DEVICE_STATUS_DEVICE_ERROR);
1998             return FALSE;
1999         }
2000     }
2001     self->volume_bytes = 0;
2002     return TRUE;
2003 }
2004
2005 /* functions for reading */
2006
2007 static dumpfile_t*
2008 s3_device_seek_file(Device *pself, guint file) {
2009     S3Device *self = S3_DEVICE(pself);
2010     gboolean result;
2011     char *key;
2012     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2013     dumpfile_t *amanda_header;
2014     const char *errmsg = NULL;
2015     int thread;
2016
2017     if (device_in_error(self)) return NULL;
2018
2019     reset_thread(self);
2020
2021     pself->file = file;
2022     pself->is_eof = FALSE;
2023     pself->in_file = FALSE;
2024     pself->block = 0;
2025     self->next_block_to_read = 0;
2026
2027     /* read it in */
2028     key = special_file_to_key(self, "filestart", pself->file);
2029     result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2030         &buf, NULL, NULL);
2031     g_free(key);
2032
2033     if (!result) {
2034         guint response_code;
2035         s3_error_code_t s3_error_code;
2036         s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2037
2038         /* if it's an expected error (not found), check what to do. */
2039         if (response_code == 404 &&
2040             (s3_error_code == S3_ERROR_NoSuchKey ||
2041              s3_error_code == S3_ERROR_NoSuchEntity)) {
2042             int next_file;
2043             next_file = find_next_file(self, pself->file);
2044             if (next_file > 0) {
2045                 /* Note short-circut of dispatcher. */
2046                 return s3_device_seek_file(pself, next_file);
2047             } else if (next_file == 0) {
2048                 /* No next file. Check if we are one past the end. */
2049                 key = special_file_to_key(self, "filestart", pself->file - 1);
2050                 result = s3_read(self->s3t[0].s3, self->bucket, key,
2051                     S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2052                 g_free(key);
2053                 if (result) {
2054                     /* pself->file, etc. are already correct */
2055                     return make_tapeend_header();
2056                 } else {
2057                     device_set_error(pself,
2058                         stralloc(_("Attempt to read past tape-end file")),
2059                         DEVICE_STATUS_SUCCESS);
2060                     return NULL;
2061                 }
2062             }
2063         } else {
2064             /* An unexpected error occured finding out if we are the last file. */
2065             device_set_error(pself,
2066                 stralloc(errmsg),
2067                 DEVICE_STATUS_DEVICE_ERROR);
2068             return NULL;
2069         }
2070     }
2071
2072     /* and make a dumpfile_t out of it */
2073     g_assert(buf.buffer != NULL);
2074     amanda_header = g_new(dumpfile_t, 1);
2075     fh_init(amanda_header);
2076     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2077     g_free(buf.buffer);
2078
2079     switch (amanda_header->type) {
2080         case F_DUMPFILE:
2081         case F_CONT_DUMPFILE:
2082         case F_SPLIT_DUMPFILE:
2083             break;
2084
2085         default:
2086             device_set_error(pself,
2087                 stralloc(_("Invalid amanda header while reading file header")),
2088                 DEVICE_STATUS_VOLUME_ERROR);
2089             g_free(amanda_header);
2090             return NULL;
2091     }
2092
2093     pself->in_file = TRUE;
2094     for (thread = 0; thread < self->nb_threads; thread++)  {
2095         self->s3t[thread].idle = 1;
2096         self->s3t[thread].eof = FALSE;
2097     }
2098     return amanda_header;
2099 }
2100
2101 static gboolean
2102 s3_device_seek_block(Device *pself, guint64 block) {
2103     S3Device * self = S3_DEVICE(pself);
2104     if (device_in_error(pself)) return FALSE;
2105
2106     reset_thread(self);
2107     pself->block = block;
2108     self->next_block_to_read = block;
2109     return TRUE;
2110 }
2111
2112 static int
2113 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
2114     S3Device * self = S3_DEVICE(pself);
2115     char *key;
2116     int thread;
2117     int done = 0;
2118
2119     g_assert (self != NULL);
2120     if (device_in_error(self)) return -1;
2121
2122     g_mutex_lock(self->thread_idle_mutex);
2123     /* start a read ahead for each thread */
2124     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2125         S3_by_thread *s3t = &self->s3t[thread];
2126         if (s3t->idle) {
2127             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2128             s3t->filename = key;
2129             s3t->done = 0;
2130             s3t->idle = 0;
2131             s3t->eof = FALSE;
2132             s3t->errflags = DEVICE_STATUS_SUCCESS;
2133             if (self->s3t[thread].curl_buffer.buffer &&
2134                 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
2135                 g_free(self->s3t[thread].curl_buffer.buffer);
2136                 self->s3t[thread].curl_buffer.buffer = NULL;
2137                 self->s3t[thread].curl_buffer.buffer_len = 0;
2138                 self->s3t[thread].buffer_len = 0;
2139             }
2140             if (!self->s3t[thread].curl_buffer.buffer) {
2141                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2142                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2143                 self->s3t[thread].buffer_len = *size_req;
2144             }
2145             s3t->curl_buffer.buffer_pos = 0;
2146             s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
2147             self->next_block_to_read++;
2148             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2149         }
2150     }
2151
2152     /* get the file*/
2153     key = file_and_block_to_key(self, pself->file, pself->block);
2154     g_assert(key != NULL);
2155     while (!done) {
2156         /* find which thread read the key */
2157         for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2158             S3_by_thread *s3t;
2159             s3t = &self->s3t[thread];
2160             if (!s3t->idle &&
2161                 s3t->done &&
2162                 strcmp(key, (char *)s3t->filename) == 0) {
2163                 if (s3t->eof) {
2164                     /* return eof */
2165                     g_free(key);
2166                     pself->is_eof = TRUE;
2167                     pself->in_file = FALSE;
2168                     device_set_error(pself, stralloc(_("EOF")),
2169                                      DEVICE_STATUS_SUCCESS);
2170                     g_mutex_unlock(self->thread_idle_mutex);
2171                     return -1;
2172                 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
2173                     /* return the error */
2174                     device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
2175                     g_free(key);
2176                     g_mutex_unlock(self->thread_idle_mutex);
2177                     return -1;
2178
2179                 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
2180                     /* return the buffer */
2181                     g_mutex_unlock(self->thread_idle_mutex);
2182                     memcpy(data, s3t->curl_buffer.buffer,
2183                                  s3t->curl_buffer.buffer_pos);
2184                     *size_req = s3t->curl_buffer.buffer_pos;
2185                     g_free(key);
2186                     s3t->idle = 1;
2187                     g_free((char *)s3t->filename);
2188                     pself->block++;
2189                     done = 1;
2190                     g_mutex_lock(self->thread_idle_mutex);
2191                     break;
2192                 } else { /* buffer not enough large */
2193                     *size_req = s3t->curl_buffer.buffer_len;
2194                     g_free(key);
2195                     g_mutex_unlock(self->thread_idle_mutex);
2196                     return 0;
2197                 }
2198             }
2199         }
2200         if (!done) {
2201             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2202         }
2203     }
2204
2205     /* start a read ahead for the thread */
2206     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2207         S3_by_thread *s3t = &self->s3t[thread];
2208         if (s3t->idle) {
2209             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2210             s3t->filename = key;
2211             s3t->done = 0;
2212             s3t->idle = 0;
2213             s3t->eof = FALSE;
2214             s3t->errflags = DEVICE_STATUS_SUCCESS;
2215             if (!self->s3t[thread].curl_buffer.buffer) {
2216                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2217                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2218             }
2219             s3t->curl_buffer.buffer_pos = 0;
2220             self->next_block_to_read++;
2221             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2222         }
2223     }
2224     g_mutex_unlock(self->thread_idle_mutex);
2225
2226     return *size_req;
2227
2228 }
2229
2230 static void
2231 s3_thread_read_block(
2232     gpointer thread_data,
2233     gpointer data)
2234 {
2235     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2236     Device *pself = (Device *)data;
2237     S3Device *self = S3_DEVICE(pself);
2238     gboolean result;
2239
2240     result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
2241         s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2242
2243     g_mutex_lock(self->thread_idle_mutex);
2244     if (!result) {
2245         guint response_code;
2246         s3_error_code_t s3_error_code;
2247         s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2248
2249         /* if it's an expected error (not found), just return -1 */
2250         if (response_code == 404 &&
2251             (s3_error_code == S3_ERROR_NoSuchKey ||
2252              s3_error_code == S3_ERROR_NoSuchEntity)) {
2253             s3t->eof = TRUE;
2254         } else {
2255
2256             /* otherwise, log it and return FALSE */
2257             s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
2258             s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
2259                                           s3_strerror(s3t->s3));
2260         }
2261     }
2262     s3t->done = 1;
2263     g_cond_broadcast(self->thread_idle_cond);
2264     g_mutex_unlock(self->thread_idle_mutex);
2265
2266     return;
2267 }
2268
2269 static gboolean
2270 check_at_peom(S3Device *self, guint64 size)
2271 {
2272     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2273         guint64 newtotal = self->volume_bytes + size;
2274         if(newtotal > self->volume_limit) {
2275             return TRUE;
2276         }
2277     }
2278     return FALSE;
2279 }
2280
2281 static gboolean
2282 check_at_leom(S3Device *self, guint64 size)
2283 {
2284     guint64 block_size = DEVICE(self)->block_size;
2285     guint64 eom_warning_buffer = block_size *
2286                 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
2287
2288     if(!self->leom)
2289         return FALSE;
2290
2291     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2292         guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
2293         if(newtotal > self->volume_limit) {
2294            return TRUE;
2295         }
2296     }
2297     return FALSE;
2298 }
2299
2300 static void
2301 reset_thread(
2302     S3Device *self)
2303 {
2304     int thread;
2305     int nb_done = 0;
2306
2307     g_mutex_lock(self->thread_idle_mutex);
2308     while(nb_done != self->nb_threads) {
2309         nb_done = 0;
2310         for (thread = 0; thread < self->nb_threads; thread++)  {
2311             if (self->s3t[thread].done == 1)
2312                 nb_done++;
2313         }
2314         if (nb_done != self->nb_threads) {
2315             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2316         }
2317     }
2318     g_mutex_unlock(self->thread_idle_mutex);
2319 }