7a987505c4e81ba70d40c8fb4f70d12aa13b97c3
[debian/amanda] / device-src / s3-device.c
1 /*
2  * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22  * data.  It stores data in keys named with a user-specified prefix, inside a
23  * user-specified bucket.  Data is stored in the form of numbered (large)
24  * blocks.
25  */
26
27 #include "amanda.h"
28 #include <string.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <unistd.h>
32 #include <dirent.h>
33 #include <regex.h>
34 #include <time.h>
35 #include "util.h"
36 #include "conffile.h"
37 #include "device.h"
38 #include "s3.h"
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
42 #else
43 # ifdef HAVE_CRYPTO_HMAC_H
44 #  include <crypto/hmac.h>
45 # else
46 #  ifdef HAVE_HMAC_H
47 #   include <hmac.h>
48 #  endif
49 # endif
50 #endif
51
52 /*
53  * Type checking and casting macros
54  */
55 #define TYPE_S3_DEVICE  (s3_device_get_type())
56 #define S3_DEVICE(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
57 #define S3_DEVICE_CONST(obj)    G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
58 #define S3_DEVICE_CLASS(klass)  G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
59 #define IS_S3_DEVICE(obj)       G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
60
61 #define S3_DEVICE_GET_CLASS(obj)        G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType    s3_device_get_type      (void);
63
64 /*
65  * Main object structure
66  */
67 typedef struct _S3MetadataFile S3MetadataFile;
68
69 typedef struct _S3_by_thread S3_by_thread;
70 struct _S3_by_thread {
71     S3Handle * volatile          s3;
72     CurlBuffer volatile          curl_buffer;
73     guint volatile               buffer_len;
74     int volatile                 idle;
75     int volatile                 eof;
76     int volatile                 done;
77     char volatile * volatile     filename;
78     DeviceStatusFlags volatile   errflags;      /* device_status */
79     char volatile * volatile     errmsg;        /* device error message */
80 };
81
82 typedef struct _S3Device S3Device;
83 struct _S3Device {
84     Device __parent__;
85
86     /* The "easy" curl handle we use to access Amazon S3 */
87     S3_by_thread *s3t;
88
89     /* S3 access information */
90     char *bucket;
91     char *prefix;
92
93     /* The S3 access information. */
94     char *secret_key;
95     char *access_key;
96     char *user_token;
97
98     /* The Openstack swift information. */
99     char *swift_account_id;
100     char *swift_access_key;
101
102     char *bucket_location;
103     char *storage_class;
104     char *host;
105     char *service_path;
106     char *server_side_encryption;
107
108     char *ca_info;
109
110     /* a cache for unsuccessful reads (where we get the file but the caller
111      * doesn't have space for it or doesn't want it), where we expect the
112      * next call will request the same file.
113      */
114     char *cached_buf;
115     char *cached_key;
116     int cached_size;
117
118     /* Produce verbose output? */
119     gboolean verbose;
120
121     /* Use SSL? */
122     gboolean use_ssl;
123     gboolean openstack_swift_api;
124
125     /* Throttling */
126     guint64 max_send_speed;
127     guint64 max_recv_speed;
128
129     gboolean leom;
130     guint64 volume_bytes;
131     guint64 volume_limit;
132     gboolean enforce_volume_limit;
133     gboolean use_subdomain;
134
135     int          nb_threads;
136     int          nb_threads_backup;
137     int          nb_threads_recovery;
138     GThreadPool *thread_pool_delete;
139     GThreadPool *thread_pool_write;
140     GThreadPool *thread_pool_read;
141     GCond       *thread_idle_cond;
142     GMutex      *thread_idle_mutex;
143     int          next_block_to_read;
144     GSList      *keys;
145 };
146
147 /*
148  * Class definition
149  */
150 typedef struct _S3DeviceClass S3DeviceClass;
151 struct _S3DeviceClass {
152     DeviceClass __parent__;
153 };
154
155
156 /*
157  * Constants and static data
158  */
159
160 #define S3_DEVICE_NAME "s3"
161
162 /* Maximum key length as specified in the S3 documentation
163  * (*excluding* null terminator) */
164 #define S3_MAX_KEY_LENGTH 1024
165
166 /* Note: for compatability, min can only be decreased and max increased */
167 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
168 #define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
169 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
170 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
171
172 /* This goes in lieu of file number for metadata. */
173 #define SPECIAL_INFIX "special-"
174
175 /* pointer to the class of our parent */
176 static DeviceClass *parent_class = NULL;
177
178 /*
179  * device-specific properties
180  */
181
182 /* Authentication information for Amazon S3. Both of these are strings. */
183 static DevicePropertyBase device_property_s3_access_key;
184 static DevicePropertyBase device_property_s3_secret_key;
185 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
186 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
187
188 /* Authentication information for Openstack Swift. Both of these are strings. */
189 static DevicePropertyBase device_property_swift_account_id;
190 static DevicePropertyBase device_property_swift_access_key;
191 #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
192 #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
193
194 /* Host and path */
195 static DevicePropertyBase device_property_s3_host;
196 static DevicePropertyBase device_property_s3_service_path;
197 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
198 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
199
200 /* Same, but for S3 with DevPay. */
201 static DevicePropertyBase device_property_s3_user_token;
202 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
203
204 /* Location constraint for new buckets created on Amazon S3. */
205 static DevicePropertyBase device_property_s3_bucket_location;
206 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
207
208 /* Storage class */
209 static DevicePropertyBase device_property_s3_storage_class;
210 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
211
212 /* Storage class */
213 static DevicePropertyBase device_property_s3_server_side_encryption;
214 #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
215
216 /* Path to certificate authority certificate */
217 static DevicePropertyBase device_property_ssl_ca_info;
218 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
219
220 /* Whether to use openstack protocol. */
221 static DevicePropertyBase device_property_openstack_swift_api;
222 #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
223
224 /* Whether to use SSL with Amazon S3. */
225 static DevicePropertyBase device_property_s3_ssl;
226 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
227
228 /* Speed limits for sending and receiving */
229 static DevicePropertyBase device_property_max_send_speed;
230 static DevicePropertyBase device_property_max_recv_speed;
231 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
232 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
233
234 /* Whether to use subdomain */
235 static DevicePropertyBase device_property_s3_subdomain;
236 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
237
238 /* Number of threads to use */
239 static DevicePropertyBase device_property_nb_threads_backup;
240 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
241 static DevicePropertyBase device_property_nb_threads_recovery;
242 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
243
244 /*
245  * prototypes
246  */
247
248 void s3_device_register(void);
249
250 /*
251  * utility functions */
252
253 /* Given file and block numbers, return an S3 key.
254  *
255  * @param self: the S3Device object
256  * @param file: the file number
257  * @param block: the block within that file
258  * @returns: a newly allocated string containing an S3 key.
259  */
260 static char *
261 file_and_block_to_key(S3Device *self,
262                       int file,
263                       guint64 block);
264
265 /* Given the name of a special file (such as 'tapestart'), generate
266  * the S3 key to use for that file.
267  *
268  * @param self: the S3Device object
269  * @param special_name: name of the special file
270  * @param file: a file number to include; omitted if -1
271  * @returns: a newly alocated string containing an S3 key.
272  */
273 static char *
274 special_file_to_key(S3Device *self,
275                     char *special_name,
276                     int file);
277 /* Write an amanda header file to S3.
278  *
279  * @param self: the S3Device object
280  * @param label: the volume label
281  * @param timestamp: the volume timestamp
282  */
283 static gboolean
284 write_amanda_header(S3Device *self,
285                     char *label,
286                     char * timestamp);
287
288 /* "Fast forward" this device to the end by looking up the largest file number
289  * present and setting the current file number one greater.
290  *
291  * @param self: the S3Device object
292  */
293 static gboolean
294 seek_to_end(S3Device *self);
295
296 /* Find the number of the last file that contains any data (even just a header).
297  *
298  * @param self: the S3Device object
299  * @returns: the last file, or -1 in event of an error
300  */
301 static int
302 find_last_file(S3Device *self);
303
304 /* Delete all blocks in the given file, including the filestart block
305  *
306  * @param self: the S3Device object
307  * @param file: the file to delete
308  */
309 static gboolean
310 delete_file(S3Device *self,
311             int file);
312
313
314 /* Delete all files in the given device
315  *
316  * @param self: the S3Device object
317  */
318 static gboolean
319 delete_all_files(S3Device *self);
320
321 /* Set up self->s3t as best as possible.
322  *
323  * The return value is TRUE iff self->s3t is useable.
324  *
325  * @param self: the S3Device object
326  * @returns: TRUE if the handle is set up
327  */
328 static gboolean
329 setup_handle(S3Device * self);
330
331 static void
332 s3_wait_thread_delete(S3Device *self);
333
334 /*
335  * class mechanics */
336
337 static void
338 s3_device_init(S3Device * o);
339
340 static void
341 s3_device_class_init(S3DeviceClass * c);
342
343 static void
344 s3_device_finalize(GObject * o);
345
346 static Device*
347 s3_device_factory(char * device_name, char * device_type, char * device_node);
348
349 /*
350  * Property{Get,Set}Fns */
351
352 static gboolean s3_device_set_access_key_fn(Device *self,
353     DevicePropertyBase *base, GValue *val,
354     PropertySurety surety, PropertySource source);
355
356 static gboolean s3_device_set_secret_key_fn(Device *self,
357     DevicePropertyBase *base, GValue *val,
358     PropertySurety surety, PropertySource source);
359
360 static gboolean s3_device_set_swift_account_id_fn(Device *self,
361     DevicePropertyBase *base, GValue *val,
362     PropertySurety surety, PropertySource source);
363
364 static gboolean s3_device_set_swift_access_key_fn(Device *self,
365     DevicePropertyBase *base, GValue *val,
366     PropertySurety surety, PropertySource source);
367
368 static gboolean s3_device_set_user_token_fn(Device *self,
369     DevicePropertyBase *base, GValue *val,
370     PropertySurety surety, PropertySource source);
371
372 static gboolean s3_device_set_bucket_location_fn(Device *self,
373     DevicePropertyBase *base, GValue *val,
374     PropertySurety surety, PropertySource source);
375
376 static gboolean s3_device_set_storage_class_fn(Device *self,
377     DevicePropertyBase *base, GValue *val,
378     PropertySurety surety, PropertySource source);
379
380 static gboolean s3_device_set_server_side_encryption_fn(Device *self,
381     DevicePropertyBase *base, GValue *val,
382     PropertySurety surety, PropertySource source);
383
384 static gboolean s3_device_set_ca_info_fn(Device *self,
385     DevicePropertyBase *base, GValue *val,
386     PropertySurety surety, PropertySource source);
387
388 static gboolean s3_device_set_verbose_fn(Device *self,
389     DevicePropertyBase *base, GValue *val,
390     PropertySurety surety, PropertySource source);
391
392 static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
393     DevicePropertyBase *base, GValue *val,
394     PropertySurety surety, PropertySource source);
395
396 static gboolean s3_device_set_ssl_fn(Device *self,
397     DevicePropertyBase *base, GValue *val,
398     PropertySurety surety, PropertySource source);
399
400 static gboolean s3_device_set_max_send_speed_fn(Device *self,
401     DevicePropertyBase *base, GValue *val,
402     PropertySurety surety, PropertySource source);
403
404 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
405     DevicePropertyBase *base, GValue *val,
406     PropertySurety surety, PropertySource source);
407
408 static gboolean s3_device_set_nb_threads_backup(Device *self,
409     DevicePropertyBase *base, GValue *val,
410     PropertySurety surety, PropertySource source);
411
412 static gboolean s3_device_set_nb_threads_recovery(Device *self,
413     DevicePropertyBase *base, GValue *val,
414     PropertySurety surety, PropertySource source);
415
416 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
417     DevicePropertyBase *base, GValue *val,
418     PropertySurety surety, PropertySource source);
419
420 static gboolean property_set_leom_fn(Device *p_self,
421     DevicePropertyBase *base, GValue *val,
422     PropertySurety surety, PropertySource source);
423
424 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
425     DevicePropertyBase *base, GValue *val,
426     PropertySurety surety, PropertySource source);
427
428 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
429     DevicePropertyBase *base, GValue *val,
430     PropertySurety surety, PropertySource source);
431
432 static gboolean s3_device_set_host_fn(Device *p_self,
433     DevicePropertyBase *base, GValue *val,
434     PropertySurety surety, PropertySource source);
435
436 static gboolean s3_device_set_service_path_fn(Device *p_self,
437     DevicePropertyBase *base, GValue *val,
438     PropertySurety surety, PropertySource source);
439
440 static void s3_thread_read_block(gpointer thread_data,
441                                  gpointer data);
442 static void s3_thread_write_block(gpointer thread_data,
443                                   gpointer data);
444 static gboolean make_bucket(Device * pself);
445
446
447 /* Wait that all threads are done */
448 static void reset_thread(S3Device *self);
449
450 /*
451  * virtual functions */
452
453 static void
454 s3_device_open_device(Device *pself, char *device_name,
455                   char * device_type, char * device_node);
456
457 static DeviceStatusFlags s3_device_read_label(Device * self);
458
459 static gboolean
460 s3_device_start(Device * self,
461                 DeviceAccessMode mode,
462                 char * label,
463                 char * timestamp);
464
465 static gboolean
466 s3_device_finish(Device * self);
467
468 static gboolean
469 s3_device_start_file(Device * self,
470                      dumpfile_t * jobInfo);
471
472 static gboolean
473 s3_device_write_block(Device * self,
474                       guint size,
475                       gpointer data);
476
477 static gboolean
478 s3_device_finish_file(Device * self);
479
480 static dumpfile_t*
481 s3_device_seek_file(Device *pself,
482                     guint file);
483
484 static gboolean
485 s3_device_seek_block(Device *pself,
486                      guint64 block);
487
488 static int
489 s3_device_read_block(Device * pself,
490                      gpointer data,
491                      int *size_req);
492
493 static gboolean
494 s3_device_recycle_file(Device *pself,
495                        guint file);
496
497 static gboolean
498 s3_device_erase(Device *pself);
499
500 static gboolean
501 check_at_leom(S3Device *self,
502                 guint64 size);
503
504 static gboolean
505 check_at_peom(S3Device *self,
506                 guint64 size);
507
508 /*
509  * Private functions
510  */
511
512 static char *
513 file_and_block_to_key(S3Device *self,
514                       int file,
515                       guint64 block)
516 {
517     char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
518                                    self->prefix, file, (long long unsigned int)block);
519     g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
520     return s3_key;
521 }
522
523 static char *
524 special_file_to_key(S3Device *self,
525                     char *special_name,
526                     int file)
527 {
528     if (file == -1)
529         return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
530     else
531         return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
532 }
533
534 static gboolean
535 write_amanda_header(S3Device *self,
536                     char *label,
537                     char * timestamp)
538 {
539     CurlBuffer amanda_header = {NULL, 0, 0, 0};
540     char * key = NULL;
541     gboolean result;
542     dumpfile_t * dumpinfo = NULL;
543     Device *d_self = DEVICE(self);
544     size_t header_size;
545
546     /* build the header */
547     header_size = 0; /* no minimum size */
548     dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
549     amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
550         &header_size);
551     if (amanda_header.buffer == NULL) {
552         device_set_error(d_self,
553             stralloc(_("Amanda tapestart header won't fit in a single block!")),
554             DEVICE_STATUS_DEVICE_ERROR);
555         dumpfile_free(dumpinfo);
556         g_free(amanda_header.buffer);
557         return FALSE;
558     }
559
560     if(check_at_leom(self, header_size))
561         d_self->is_eom = TRUE;
562
563     if(check_at_peom(self, header_size)) {
564         d_self->is_eom = TRUE;
565         device_set_error(d_self,
566             stralloc(_("No space left on device")),
567             DEVICE_STATUS_DEVICE_ERROR);
568         g_free(amanda_header.buffer);
569         return FALSE;
570     }
571
572     /* write out the header and flush the uploads. */
573     key = special_file_to_key(self, "tapestart", -1);
574     g_assert(header_size < G_MAXUINT); /* for cast to guint */
575     amanda_header.buffer_len = (guint)header_size;
576     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
577                        &amanda_header, NULL, NULL);
578     g_free(amanda_header.buffer);
579     g_free(key);
580
581     if (!result) {
582         device_set_error(d_self,
583             vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
584             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
585         dumpfile_free(dumpinfo);
586     } else {
587         dumpfile_free(d_self->volume_header);
588         d_self->volume_header = dumpinfo;
589         self->volume_bytes += header_size;
590     }
591     d_self->header_block_size = header_size;
592     return result;
593 }
594
595 static gboolean
596 seek_to_end(S3Device *self) {
597     int last_file;
598
599     Device *pself = DEVICE(self);
600
601     last_file = find_last_file(self);
602     if (last_file < 0)
603         return FALSE;
604
605     pself->file = last_file;
606
607     return TRUE;
608 }
609
610 /* Convert an object name into a file number, assuming the given prefix
611  * length. Returns -1 if the object name is invalid, or 0 if the object name
612  * is a "special" key. */
613 static int key_to_file(guint prefix_len, const char * key) {
614     int file;
615     int i;
616
617     /* skip the prefix */
618     if (strlen(key) <= prefix_len)
619         return -1;
620
621     key += prefix_len;
622
623     if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
624         return 0;
625     }
626
627     /* check that key starts with 'f' */
628     if (key[0] != 'f')
629         return -1;
630     key++;
631
632     /* check that key is of the form "%08x-" */
633     for (i = 0; i < 8; i++) {
634         if (!(key[i] >= '0' && key[i] <= '9') &&
635             !(key[i] >= 'a' && key[i] <= 'f') &&
636             !(key[i] >= 'A' && key[i] <= 'F')) break;
637     }
638     if (key[i] != '-') return -1;
639     if (i < 8) return -1;
640
641     /* convert the file number */
642     errno = 0;
643     file = strtoul(key, NULL, 16);
644     if (errno != 0) {
645         g_warning(_("unparseable file number '%s'"), key);
646         return -1;
647     }
648
649     return file;
650 }
651
652 /* Find the number of the last file that contains any data (even just a header).
653  * Returns -1 in event of an error
654  */
655 static int
656 find_last_file(S3Device *self) {
657     gboolean result;
658     GSList *keys;
659     unsigned int prefix_len = strlen(self->prefix);
660     int last_file = 0;
661     Device *d_self = DEVICE(self);
662
663     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
664     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
665     if (!result) {
666         device_set_error(d_self,
667             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
668             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
669         return -1;
670     }
671
672     for (; keys; keys = g_slist_remove(keys, keys->data)) {
673         int file = key_to_file(prefix_len, keys->data);
674
675         /* and if it's the last, keep it */
676         if (file > last_file)
677             last_file = file;
678     }
679
680     return last_file;
681 }
682
683 /* Find the number of the file following the requested one, if any.
684  * Returns 0 if there is no such file or -1 in event of an error
685  */
686 static int
687 find_next_file(S3Device *self, int last_file) {
688     gboolean result;
689     GSList *keys;
690     unsigned int prefix_len = strlen(self->prefix);
691     int next_file = 0;
692     Device *d_self = DEVICE(self);
693
694     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
695     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
696                           &keys, NULL);
697     if (!result) {
698         device_set_error(d_self,
699             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
700             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
701         return -1;
702     }
703
704     for (; keys; keys = g_slist_remove(keys, keys->data)) {
705         int file;
706
707         file = key_to_file(prefix_len, (char*)keys->data);
708
709         if (file < 0) {
710             /* Set this in case we don't find a next file; this is not a
711              * hard error, so if we can find a next file we'll return that
712              * instead. */
713             next_file = -1;
714         }
715
716         if (file < next_file && file > last_file) {
717             next_file = file;
718         }
719     }
720
721     return next_file;
722 }
723
724 static gboolean
725 delete_file(S3Device *self,
726             int file)
727 {
728     int thread = -1;
729
730     gboolean result;
731     GSList *keys;
732     guint64 total_size = 0;
733     Device *d_self = DEVICE(self);
734     char *my_prefix;
735     if (file == -1) {
736         my_prefix = g_strdup_printf("%sf", self->prefix);
737     } else {
738         my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
739     }
740
741     result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
742                           &total_size);
743     if (!result) {
744         device_set_error(d_self,
745             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
746             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
747         return FALSE;
748     }
749
750     g_mutex_lock(self->thread_idle_mutex);
751     if (!self->keys) {
752         self->keys = keys;
753     } else {
754         self->keys = g_slist_concat(self->keys, keys);
755     }
756
757     // start the threads
758     for (thread = 0; thread < self->nb_threads; thread++)  {
759         if (self->s3t[thread].idle == 1) {
760             /* Check if the thread is in error */
761             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
762                 device_set_error(d_self,
763                                  (char *)self->s3t[thread].errmsg,
764                                  self->s3t[thread].errflags);
765                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
766                 self->s3t[thread].errmsg = NULL;
767                 g_mutex_unlock(self->thread_idle_mutex);
768                 s3_wait_thread_delete(self);
769                 return FALSE;
770             }
771             self->s3t[thread].idle = 0;
772             self->s3t[thread].done = 0;
773             g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
774                                NULL);
775         }
776     }
777     g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
778     g_mutex_unlock(self->thread_idle_mutex);
779
780     self->volume_bytes = total_size;
781
782     s3_wait_thread_delete(self);
783
784     return TRUE;
785 }
786
787 static void
788 s3_thread_delete_block(
789     gpointer thread_data,
790     gpointer data)
791 {
792     static int count = 0;
793     S3_by_thread *s3t = (S3_by_thread *)thread_data;
794     Device *pself = (Device *)data;
795     S3Device *self = S3_DEVICE(pself);
796     gboolean result = 1;
797     char *filename;
798
799     g_mutex_lock(self->thread_idle_mutex);
800     while (result && self->keys) {
801         filename = self->keys->data;
802         self->keys = g_slist_remove(self->keys, self->keys->data);
803         count++;
804         if (count >= 1000) {
805             g_debug("Deleting %s ...", filename);
806             count = 0;
807         }
808         g_mutex_unlock(self->thread_idle_mutex);
809         result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename);
810         if (!result) {
811             s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
812             s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
813                                           filename, s3_strerror(s3t->s3));
814         }
815         g_free(filename);
816         g_mutex_lock(self->thread_idle_mutex);
817     }
818     s3t->idle = 1;
819     s3t->done = 1;
820     g_cond_broadcast(self->thread_idle_cond);
821     g_mutex_unlock(self->thread_idle_mutex);
822 }
823
824 static void
825 s3_wait_thread_delete(S3Device *self)
826 {
827     Device *d_self = (Device *)self;
828     int idle_thread = 0;
829     int thread;
830
831     g_mutex_lock(self->thread_idle_mutex);
832     while (idle_thread != self->nb_threads) {
833         idle_thread = 0;
834         for (thread = 0; thread < self->nb_threads; thread++)  {
835             if (self->s3t[thread].idle == 1) {
836                 idle_thread++;
837             }
838             /* Check if the thread is in error */
839             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
840                 device_set_error(d_self, (char *)self->s3t[thread].errmsg,
841                                              self->s3t[thread].errflags);
842                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
843                 self->s3t[thread].errmsg = NULL;
844             }
845         }
846         if (idle_thread != self->nb_threads) {
847             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
848         }
849     }
850     g_mutex_unlock(self->thread_idle_mutex);
851 }
852
853
854 static gboolean
855 delete_all_files(S3Device *self)
856 {
857     return delete_file(self, -1);
858 }
859
860 /*
861  * Class mechanics
862  */
863
864 void
865 s3_device_register(void)
866 {
867     static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
868     g_assert(s3_init());
869
870     /* set up our properties */
871     device_property_fill_and_register(&device_property_s3_secret_key,
872                                       G_TYPE_STRING, "s3_secret_key",
873        "Secret access key to authenticate with Amazon S3");
874     device_property_fill_and_register(&device_property_s3_access_key,
875                                       G_TYPE_STRING, "s3_access_key",
876        "Access key ID to authenticate with Amazon S3");
877     device_property_fill_and_register(&device_property_swift_account_id,
878                                       G_TYPE_STRING, "swift_account_id",
879        "Account ID to authenticate with openstack swift");
880     device_property_fill_and_register(&device_property_swift_access_key,
881                                       G_TYPE_STRING, "swift_access_key",
882        "Access key to authenticate with openstack swift");
883     device_property_fill_and_register(&device_property_s3_host,
884                                       G_TYPE_STRING, "s3_host",
885        "hostname:port of the server");
886     device_property_fill_and_register(&device_property_s3_service_path,
887                                       G_TYPE_STRING, "s3_service_path",
888        "path to add in the url");
889     device_property_fill_and_register(&device_property_s3_user_token,
890                                       G_TYPE_STRING, "s3_user_token",
891        "User token for authentication Amazon devpay requests");
892     device_property_fill_and_register(&device_property_s3_bucket_location,
893                                       G_TYPE_STRING, "s3_bucket_location",
894        "Location constraint for buckets on Amazon S3");
895     device_property_fill_and_register(&device_property_s3_storage_class,
896                                       G_TYPE_STRING, "s3_storage_class",
897        "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
898     device_property_fill_and_register(&device_property_s3_server_side_encryption,
899                                       G_TYPE_STRING, "s3_server_side_encryption",
900        "Serve side encryption as specified by Amazon (AES256)");
901     device_property_fill_and_register(&device_property_ssl_ca_info,
902                                       G_TYPE_STRING, "ssl_ca_info",
903        "Path to certificate authority certificate");
904     device_property_fill_and_register(&device_property_openstack_swift_api,
905                                       G_TYPE_BOOLEAN, "openstack_swift_api",
906        "Whether to use openstack protocol");
907     device_property_fill_and_register(&device_property_s3_ssl,
908                                       G_TYPE_BOOLEAN, "s3_ssl",
909        "Whether to use SSL with Amazon S3");
910     device_property_fill_and_register(&device_property_s3_subdomain,
911                                       G_TYPE_BOOLEAN, "s3_subdomain",
912        "Whether to use subdomain");
913     device_property_fill_and_register(&device_property_max_send_speed,
914                                       G_TYPE_UINT64, "max_send_speed",
915        "Maximum average upload speed (bytes/sec)");
916     device_property_fill_and_register(&device_property_max_recv_speed,
917                                       G_TYPE_UINT64, "max_recv_speed",
918        "Maximum average download speed (bytes/sec)");
919     device_property_fill_and_register(&device_property_nb_threads_backup,
920                                       G_TYPE_UINT64, "nb_threads_backup",
921        "Number of writer thread");
922     device_property_fill_and_register(&device_property_nb_threads_recovery,
923                                       G_TYPE_UINT64, "nb_threads_recovery",
924        "Number of reader thread");
925
926     /* register the device itself */
927     register_device(s3_device_factory, device_prefix_list);
928 }
929
930 static GType
931 s3_device_get_type(void)
932 {
933     static GType type = 0;
934
935     if G_UNLIKELY(type == 0) {
936         static const GTypeInfo info = {
937             sizeof (S3DeviceClass),
938             (GBaseInitFunc) NULL,
939             (GBaseFinalizeFunc) NULL,
940             (GClassInitFunc) s3_device_class_init,
941             (GClassFinalizeFunc) NULL,
942             NULL /* class_data */,
943             sizeof (S3Device),
944             0 /* n_preallocs */,
945             (GInstanceInitFunc) s3_device_init,
946             NULL
947         };
948
949         type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
950                                        (GTypeFlags)0);
951     }
952
953     return type;
954 }
955
956 static void
957 s3_device_init(S3Device * self)
958 {
959     Device * dself = DEVICE(self);
960     GValue response;
961
962     self->volume_bytes = 0;
963     self->volume_limit = 0;
964     self->leom = TRUE;
965     self->enforce_volume_limit = FALSE;
966     self->use_subdomain = FALSE;
967     self->nb_threads = 1;
968     self->nb_threads_backup = 1;
969     self->nb_threads_recovery = 1;
970     self->thread_pool_delete = NULL;
971     self->thread_pool_write = NULL;
972     self->thread_pool_read = NULL;
973     self->thread_idle_cond = NULL;
974     self->thread_idle_mutex = NULL;
975
976     /* Register property values
977      * Note: Some aren't added until s3_device_open_device()
978      */
979     bzero(&response, sizeof(response));
980
981     g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
982     g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
983     device_set_simple_property(dself, PROPERTY_CONCURRENCY,
984             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
985     g_value_unset(&response);
986
987     g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
988     g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
989     device_set_simple_property(dself, PROPERTY_STREAMING,
990             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
991     g_value_unset(&response);
992
993     g_value_init(&response, G_TYPE_BOOLEAN);
994     g_value_set_boolean(&response, TRUE);
995     device_set_simple_property(dself, PROPERTY_APPENDABLE,
996             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
997     g_value_unset(&response);
998
999     g_value_init(&response, G_TYPE_BOOLEAN);
1000     g_value_set_boolean(&response, TRUE);
1001     device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
1002             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1003     g_value_unset(&response);
1004
1005     g_value_init(&response, G_TYPE_BOOLEAN);
1006     g_value_set_boolean(&response, TRUE);
1007     device_set_simple_property(dself, PROPERTY_FULL_DELETION,
1008             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1009     g_value_unset(&response);
1010
1011     g_value_init(&response, G_TYPE_BOOLEAN);
1012     g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
1013     device_set_simple_property(dself, PROPERTY_LEOM,
1014             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1015     g_value_unset(&response);
1016
1017     g_value_init(&response, G_TYPE_BOOLEAN);
1018     g_value_set_boolean(&response, FALSE);
1019     device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1020             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1021     g_value_unset(&response);
1022
1023     g_value_init(&response, G_TYPE_BOOLEAN);
1024     g_value_set_boolean(&response, FALSE);
1025     device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
1026             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1027     g_value_unset(&response);
1028
1029     g_value_init(&response, G_TYPE_BOOLEAN);
1030     g_value_set_boolean(&response, FALSE);
1031     device_set_simple_property(dself, PROPERTY_COMPRESSION,
1032             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1033     g_value_unset(&response);
1034
1035     g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
1036     g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
1037     device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
1038             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1039     g_value_unset(&response);
1040
1041 }
1042
1043 static void
1044 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
1045 {
1046     GObjectClass *g_object_class = (GObjectClass*) c;
1047     DeviceClass *device_class = (DeviceClass *)c;
1048
1049     parent_class = g_type_class_ref (TYPE_DEVICE);
1050
1051     device_class->open_device = s3_device_open_device;
1052     device_class->read_label = s3_device_read_label;
1053     device_class->start = s3_device_start;
1054     device_class->finish = s3_device_finish;
1055
1056     device_class->start_file = s3_device_start_file;
1057     device_class->write_block = s3_device_write_block;
1058     device_class->finish_file = s3_device_finish_file;
1059
1060     device_class->seek_file = s3_device_seek_file;
1061     device_class->seek_block = s3_device_seek_block;
1062     device_class->read_block = s3_device_read_block;
1063     device_class->recycle_file = s3_device_recycle_file;
1064
1065     device_class->erase = s3_device_erase;
1066
1067     g_object_class->finalize = s3_device_finalize;
1068
1069     device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
1070             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1071             device_simple_property_get_fn,
1072             s3_device_set_access_key_fn);
1073
1074     device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
1075             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1076             device_simple_property_get_fn,
1077             s3_device_set_secret_key_fn);
1078
1079     device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
1080             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1081             device_simple_property_get_fn,
1082             s3_device_set_swift_account_id_fn);
1083
1084     device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
1085             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1086             device_simple_property_get_fn,
1087             s3_device_set_swift_access_key_fn);
1088
1089     device_class_register_property(device_class, PROPERTY_S3_HOST,
1090             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1091             device_simple_property_get_fn,
1092             s3_device_set_host_fn);
1093
1094     device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
1095             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1096             device_simple_property_get_fn,
1097             s3_device_set_service_path_fn);
1098
1099     device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
1100             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1101             device_simple_property_get_fn,
1102             s3_device_set_user_token_fn);
1103
1104     device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
1105             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1106             device_simple_property_get_fn,
1107             s3_device_set_bucket_location_fn);
1108
1109     device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
1110             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1111             device_simple_property_get_fn,
1112             s3_device_set_storage_class_fn);
1113
1114     device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
1115             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1116             device_simple_property_get_fn,
1117             s3_device_set_server_side_encryption_fn);
1118
1119     device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
1120             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1121             device_simple_property_get_fn,
1122             s3_device_set_ca_info_fn);
1123
1124     device_class_register_property(device_class, PROPERTY_VERBOSE,
1125             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1126             device_simple_property_get_fn,
1127             s3_device_set_verbose_fn);
1128
1129     device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
1130             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1131             device_simple_property_get_fn,
1132             s3_device_set_openstack_swift_api_fn);
1133
1134     device_class_register_property(device_class, PROPERTY_S3_SSL,
1135             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1136             device_simple_property_get_fn,
1137             s3_device_set_ssl_fn);
1138
1139     device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1140             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1141             device_simple_property_get_fn,
1142             s3_device_set_max_send_speed_fn);
1143
1144     device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1145             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1146             device_simple_property_get_fn,
1147             s3_device_set_max_recv_speed_fn);
1148
1149     device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1150             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1151             device_simple_property_get_fn,
1152             s3_device_set_nb_threads_backup);
1153
1154     device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1155             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1156             device_simple_property_get_fn,
1157             s3_device_set_nb_threads_recovery);
1158
1159     device_class_register_property(device_class, PROPERTY_COMPRESSION,
1160             PROPERTY_ACCESS_GET_MASK,
1161             device_simple_property_get_fn,
1162             NULL);
1163
1164     device_class_register_property(device_class, PROPERTY_LEOM,
1165             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1166             device_simple_property_get_fn,
1167             property_set_leom_fn);
1168
1169     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1170             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1171                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1172             device_simple_property_get_fn,
1173             s3_device_set_max_volume_usage_fn);
1174
1175     device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1176             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1177                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1178             device_simple_property_get_fn,
1179             s3_device_set_enforce_max_volume_usage_fn);
1180
1181     device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1182             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1183                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1184             device_simple_property_get_fn,
1185             s3_device_set_use_subdomain_fn);
1186 }
1187
1188 static gboolean
1189 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1190     GValue *val, PropertySurety surety, PropertySource source)
1191 {
1192     S3Device *self = S3_DEVICE(p_self);
1193
1194     amfree(self->access_key);
1195     self->access_key = g_value_dup_string(val);
1196     device_clear_volume_details(p_self);
1197
1198     return device_simple_property_set_fn(p_self, base, val, surety, source);
1199 }
1200
1201 static gboolean
1202 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1203     GValue *val, PropertySurety surety, PropertySource source)
1204 {
1205     S3Device *self = S3_DEVICE(p_self);
1206
1207     amfree(self->secret_key);
1208     self->secret_key = g_value_dup_string(val);
1209     device_clear_volume_details(p_self);
1210
1211     return device_simple_property_set_fn(p_self, base, val, surety, source);
1212 }
1213
1214 static gboolean
1215 s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
1216     GValue *val, PropertySurety surety, PropertySource source)
1217 {
1218     S3Device *self = S3_DEVICE(p_self);
1219
1220     amfree(self->swift_account_id);
1221     self->swift_account_id = g_value_dup_string(val);
1222     device_clear_volume_details(p_self);
1223
1224     return device_simple_property_set_fn(p_self, base, val, surety, source);
1225 }
1226
1227 static gboolean
1228 s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
1229     GValue *val, PropertySurety surety, PropertySource source)
1230 {
1231     S3Device *self = S3_DEVICE(p_self);
1232
1233     amfree(self->swift_access_key);
1234     self->swift_access_key = g_value_dup_string(val);
1235     device_clear_volume_details(p_self);
1236
1237     return device_simple_property_set_fn(p_self, base, val, surety, source);
1238 }
1239
1240 static gboolean
1241 s3_device_set_host_fn(Device *p_self,
1242     DevicePropertyBase *base, GValue *val,
1243     PropertySurety surety, PropertySource source)
1244 {
1245     S3Device *self = S3_DEVICE(p_self);
1246
1247     amfree(self->host);
1248     self->host = g_value_dup_string(val);
1249     device_clear_volume_details(p_self);
1250
1251     return device_simple_property_set_fn(p_self, base, val, surety, source);
1252 }
1253
1254 static gboolean
1255 s3_device_set_service_path_fn(Device *p_self,
1256     DevicePropertyBase *base, GValue *val,
1257     PropertySurety surety, PropertySource source)
1258 {
1259     S3Device *self = S3_DEVICE(p_self);
1260
1261     amfree(self->service_path);
1262     self->service_path = g_value_dup_string(val);
1263     device_clear_volume_details(p_self);
1264
1265     return device_simple_property_set_fn(p_self, base, val, surety, source);
1266 }
1267
1268 static gboolean
1269 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1270     GValue *val, PropertySurety surety, PropertySource source)
1271 {
1272     S3Device *self = S3_DEVICE(p_self);
1273
1274     amfree(self->user_token);
1275     self->user_token = g_value_dup_string(val);
1276     device_clear_volume_details(p_self);
1277
1278     return device_simple_property_set_fn(p_self, base, val, surety, source);
1279 }
1280
1281 static gboolean
1282 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1283     GValue *val, PropertySurety surety, PropertySource source)
1284 {
1285     S3Device *self = S3_DEVICE(p_self);
1286     char *str_val = g_value_dup_string(val);
1287
1288     if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1289         device_set_error(p_self, stralloc(_(
1290                 "Location constraint given for Amazon S3 bucket, "
1291                 "but libcurl is too old support wildcard certificates.")),
1292             DEVICE_STATUS_DEVICE_ERROR);
1293         goto fail;
1294     }
1295
1296     if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1297         device_set_error(p_self, g_strdup_printf(_(
1298                 "Location constraint given for Amazon S3 bucket, "
1299                 "but the bucket name (%s) is not usable as a subdomain."),
1300                 self->bucket),
1301             DEVICE_STATUS_DEVICE_ERROR);
1302         goto fail;
1303     }
1304
1305     amfree(self->bucket_location);
1306     self->bucket_location = str_val;
1307     device_clear_volume_details(p_self);
1308
1309     return device_simple_property_set_fn(p_self, base, val, surety, source);
1310 fail:
1311     g_free(str_val);
1312     return FALSE;
1313 }
1314
1315 static gboolean
1316 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1317     GValue *val, PropertySurety surety, PropertySource source)
1318 {
1319     S3Device *self = S3_DEVICE(p_self);
1320     char *str_val = g_value_dup_string(val);
1321
1322     amfree(self->storage_class);
1323     self->storage_class = str_val;
1324     device_clear_volume_details(p_self);
1325
1326     return device_simple_property_set_fn(p_self, base, val, surety, source);
1327 }
1328
1329 static gboolean
1330 s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
1331     GValue *val, PropertySurety surety, PropertySource source)
1332 {
1333     S3Device *self = S3_DEVICE(p_self);
1334     char *str_val = g_value_dup_string(val);
1335
1336     amfree(self->server_side_encryption);
1337     self->server_side_encryption = str_val;
1338     device_clear_volume_details(p_self);
1339
1340     return device_simple_property_set_fn(p_self, base, val, surety, source);
1341 }
1342
1343 static gboolean
1344 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1345     GValue *val, PropertySurety surety, PropertySource source)
1346 {
1347     S3Device *self = S3_DEVICE(p_self);
1348
1349     amfree(self->ca_info);
1350     self->ca_info = g_value_dup_string(val);
1351     device_clear_volume_details(p_self);
1352
1353     return device_simple_property_set_fn(p_self, base, val, surety, source);
1354 }
1355
1356 static gboolean
1357 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1358     GValue *val, PropertySurety surety, PropertySource source)
1359 {
1360     S3Device *self = S3_DEVICE(p_self);
1361     int       thread;
1362
1363     self->verbose = g_value_get_boolean(val);
1364     /* Our S3 handle may not yet have been instantiated; if so, it will
1365      * get the proper verbose setting when it is created */
1366     if (self->s3t) {
1367         for (thread = 0; thread < self->nb_threads; thread++) {
1368             if (self->s3t[thread].s3)
1369                 s3_verbose(self->s3t[thread].s3, self->verbose);
1370         }
1371     }
1372
1373     return device_simple_property_set_fn(p_self, base, val, surety, source);
1374 }
1375
1376 static gboolean
1377 s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
1378     GValue *val, PropertySurety surety, PropertySource source)
1379 {
1380     S3Device *self = S3_DEVICE(p_self);
1381
1382     self->openstack_swift_api = g_value_get_boolean(val);
1383
1384     return device_simple_property_set_fn(p_self, base, val, surety, source);
1385 }
1386
1387 static gboolean
1388 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1389     GValue *val, PropertySurety surety, PropertySource source)
1390 {
1391     S3Device *self = S3_DEVICE(p_self);
1392     gboolean new_val;
1393     int      thread;
1394
1395     new_val = g_value_get_boolean(val);
1396     /* Our S3 handle may not yet have been instantiated; if so, it will
1397      * get the proper use_ssl setting when it is created */
1398     if (self->s3t) {
1399         for (thread = 0; thread < self->nb_threads; thread++) {
1400             if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1401                 device_set_error(p_self, g_strdup_printf(_(
1402                         "Error setting S3 SSL/TLS use "
1403                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1404                     DEVICE_STATUS_DEVICE_ERROR);
1405                 return FALSE;
1406             }
1407         }
1408     }
1409     self->use_ssl = new_val;
1410
1411     return device_simple_property_set_fn(p_self, base, val, surety, source);
1412 }
1413
1414 static gboolean
1415 s3_device_set_max_send_speed_fn(Device *p_self,
1416     DevicePropertyBase *base, GValue *val,
1417     PropertySurety surety, PropertySource source)
1418 {
1419     S3Device *self = S3_DEVICE(p_self);
1420     guint64 new_val;
1421     int     thread;
1422
1423     new_val = g_value_get_uint64(val);
1424     if (self->s3t) {
1425         for (thread = 0; thread < self->nb_threads; thread++) {
1426             if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1427                 device_set_error(p_self,
1428                         g_strdup("Could not set S3 maximum send speed"),
1429                         DEVICE_STATUS_DEVICE_ERROR);
1430                 return FALSE;
1431             }
1432         }
1433     }
1434     self->max_send_speed = new_val;
1435
1436     return device_simple_property_set_fn(p_self, base, val, surety, source);
1437 }
1438
1439 static gboolean
1440 s3_device_set_max_recv_speed_fn(Device *p_self,
1441     DevicePropertyBase *base, GValue *val,
1442     PropertySurety surety, PropertySource source)
1443 {
1444     S3Device *self = S3_DEVICE(p_self);
1445     guint64 new_val;
1446     int     thread;
1447
1448     new_val = g_value_get_uint64(val);
1449     if (self->s3t) {
1450         for (thread = 0; thread < self->nb_threads; thread++) {
1451             if (self->s3t[thread].s3 &&
1452                 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1453                 device_set_error(p_self,
1454                         g_strdup("Could not set S3 maximum recv speed"),
1455                         DEVICE_STATUS_DEVICE_ERROR);
1456                 return FALSE;
1457             }
1458         }
1459     }
1460     self->max_recv_speed = new_val;
1461
1462     return device_simple_property_set_fn(p_self, base, val, surety, source);
1463 }
1464
1465 static gboolean
1466 s3_device_set_nb_threads_backup(Device *p_self,
1467     DevicePropertyBase *base, GValue *val,
1468     PropertySurety surety, PropertySource source)
1469 {
1470     S3Device *self = S3_DEVICE(p_self);
1471     guint64 new_val;
1472
1473     new_val = g_value_get_uint64(val);
1474     self->nb_threads_backup = new_val;
1475     if (self->nb_threads_backup > self->nb_threads) {
1476         self->nb_threads = self->nb_threads_backup;
1477     }
1478
1479     return device_simple_property_set_fn(p_self, base, val, surety, source);
1480 }
1481
1482 static gboolean
1483 s3_device_set_nb_threads_recovery(Device *p_self,
1484     DevicePropertyBase *base, GValue *val,
1485     PropertySurety surety, PropertySource source)
1486 {
1487     S3Device *self = S3_DEVICE(p_self);
1488     guint64 new_val;
1489
1490     new_val = g_value_get_uint64(val);
1491     self->nb_threads_recovery = new_val;
1492     if (self->nb_threads_recovery > self->nb_threads) {
1493         self->nb_threads = self->nb_threads_recovery;
1494     }
1495
1496     return device_simple_property_set_fn(p_self, base, val, surety, source);
1497 }
1498
1499 static gboolean
1500 s3_device_set_max_volume_usage_fn(Device *p_self,
1501     DevicePropertyBase *base, GValue *val,
1502     PropertySurety surety, PropertySource source)
1503 {
1504     S3Device *self = S3_DEVICE(p_self);
1505
1506     self->volume_limit = g_value_get_uint64(val);
1507
1508     return device_simple_property_set_fn(p_self, base, val, surety, source);
1509
1510 }
1511
1512 static gboolean
1513 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1514     DevicePropertyBase *base, GValue *val,
1515     PropertySurety surety, PropertySource source)
1516 {
1517     S3Device *self = S3_DEVICE(p_self);
1518
1519     self->enforce_volume_limit = g_value_get_boolean(val);
1520
1521     return device_simple_property_set_fn(p_self, base, val, surety, source);
1522
1523 }
1524
1525 static gboolean
1526 s3_device_set_use_subdomain_fn(Device *p_self,
1527     DevicePropertyBase *base, GValue *val,
1528     PropertySurety surety, PropertySource source)
1529 {
1530     S3Device *self = S3_DEVICE(p_self);
1531
1532     self->use_subdomain = g_value_get_boolean(val);
1533
1534     return device_simple_property_set_fn(p_self, base, val, surety, source);
1535 }
1536
1537 static gboolean
1538 property_set_leom_fn(Device *p_self,
1539     DevicePropertyBase *base, GValue *val,
1540     PropertySurety surety, PropertySource source)
1541 {
1542     S3Device *self = S3_DEVICE(p_self);
1543
1544     self->leom = g_value_get_boolean(val);
1545
1546     return device_simple_property_set_fn(p_self, base, val, surety, source);
1547 }
1548 static Device*
1549 s3_device_factory(char * device_name, char * device_type, char * device_node)
1550 {
1551     Device *rval;
1552     g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
1553     rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
1554
1555     device_open_device(rval, device_name, device_type, device_node);
1556     return rval;
1557 }
1558
1559 /*
1560  * Virtual function overrides
1561  */
1562
1563 static void
1564 s3_device_open_device(Device *pself, char *device_name,
1565                         char * device_type, char * device_node)
1566 {
1567     S3Device *self = S3_DEVICE(pself);
1568     char * name_colon;
1569     GValue tmp_value;
1570
1571     pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
1572     pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
1573     pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
1574
1575     /* Device name may be bucket/prefix, to support multiple volumes in a
1576      * single bucket. */
1577     name_colon = strchr(device_node, '/');
1578     if (name_colon == NULL) {
1579         self->bucket = g_strdup(device_node);
1580         self->prefix = g_strdup("");
1581     } else {
1582         self->bucket = g_strndup(device_node, name_colon - device_node);
1583         self->prefix = g_strdup(name_colon + 1);
1584     }
1585
1586     if (self->bucket == NULL || self->bucket[0] == '\0') {
1587         device_set_error(pself,
1588             vstrallocf(_("Empty bucket name in device %s"), device_name),
1589             DEVICE_STATUS_DEVICE_ERROR);
1590         amfree(self->bucket);
1591         amfree(self->prefix);
1592         return;
1593     }
1594
1595     g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
1596
1597     /* default values */
1598     self->verbose = FALSE;
1599     self->openstack_swift_api = FALSE;
1600
1601     /* use SSL if available */
1602     self->use_ssl = s3_curl_supports_ssl();
1603     bzero(&tmp_value, sizeof(GValue));
1604     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
1605     g_value_set_boolean(&tmp_value, self->use_ssl);
1606     device_set_simple_property(pself, device_property_s3_ssl.ID,
1607         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
1608
1609     if (parent_class->open_device) {
1610         parent_class->open_device(pself, device_name, device_type, device_node);
1611     }
1612 }
1613
1614 static void s3_device_finalize(GObject * obj_self) {
1615     S3Device *self = S3_DEVICE (obj_self);
1616     int thread;
1617
1618     if(G_OBJECT_CLASS(parent_class)->finalize)
1619         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
1620
1621     if (self->thread_pool_delete) {
1622         g_thread_pool_free(self->thread_pool_delete, 1, 1);
1623         self->thread_pool_delete = NULL;
1624     }
1625     if (self->thread_pool_write) {
1626         g_thread_pool_free(self->thread_pool_write, 1, 1);
1627         self->thread_pool_write = NULL;
1628     }
1629     if (self->thread_pool_read) {
1630         g_thread_pool_free(self->thread_pool_read, 1, 1);
1631         self->thread_pool_read = NULL;
1632     }
1633     if (self->thread_idle_mutex) {
1634         g_mutex_free(self->thread_idle_mutex);
1635         self->thread_idle_mutex = NULL;
1636     }
1637     if (self->thread_idle_cond) {
1638         g_cond_free(self->thread_idle_cond);
1639         self->thread_idle_cond = NULL;
1640     }
1641     if (self->s3t) {
1642         for (thread = 0; thread < self->nb_threads; thread++) {
1643             if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
1644         }
1645         g_free(self->s3t);
1646     }
1647     if(self->bucket) g_free(self->bucket);
1648     if(self->prefix) g_free(self->prefix);
1649     if(self->access_key) g_free(self->access_key);
1650     if(self->secret_key) g_free(self->secret_key);
1651     if(self->swift_account_id) g_free(self->swift_account_id);
1652     if(self->swift_access_key) g_free(self->swift_access_key);
1653     if(self->host) g_free(self->host);
1654     if(self->service_path) g_free(self->service_path);
1655     if(self->user_token) g_free(self->user_token);
1656     if(self->bucket_location) g_free(self->bucket_location);
1657     if(self->storage_class) g_free(self->storage_class);
1658     if(self->server_side_encryption) g_free(self->server_side_encryption);
1659     if(self->ca_info) g_free(self->ca_info);
1660 }
1661
1662 static gboolean setup_handle(S3Device * self) {
1663     Device *d_self = DEVICE(self);
1664     int thread;
1665     guint response_code;
1666     s3_error_code_t s3_error_code;
1667     CURLcode curl_code;
1668
1669     if (self->s3t == NULL) {
1670         self->s3t = g_new(S3_by_thread, self->nb_threads);
1671         if (self->s3t == NULL) {
1672             device_set_error(d_self,
1673                 stralloc(_("Can't allocate S3Handle array")),
1674                 DEVICE_STATUS_DEVICE_ERROR);
1675             return FALSE;
1676         }
1677
1678         if (!self->openstack_swift_api) {
1679             if (self->access_key == NULL || self->access_key[0] == '\0') {
1680                 device_set_error(d_self,
1681                     g_strdup(_("No Amazon access key specified")),
1682                     DEVICE_STATUS_DEVICE_ERROR);
1683                 return FALSE;
1684             }
1685
1686             if (self->secret_key == NULL || self->secret_key[0] == '\0') {
1687                 device_set_error(d_self,
1688                     g_strdup(_("No Amazon secret key specified")),
1689                     DEVICE_STATUS_DEVICE_ERROR);
1690                 return FALSE;
1691             }
1692         } else {
1693             if (self->swift_account_id == NULL ||
1694                 self->swift_account_id[0] == '\0') {
1695                 device_set_error(d_self,
1696                     g_strdup(_("No Swift account id specified")),
1697                     DEVICE_STATUS_DEVICE_ERROR);
1698                 return FALSE;
1699             }
1700             if (self->swift_access_key == NULL ||
1701                 self->swift_access_key[0] == '\0') {
1702                 device_set_error(d_self,
1703                     g_strdup(_("No Swift access key specified")),
1704                     DEVICE_STATUS_DEVICE_ERROR);
1705                 return FALSE;
1706             }
1707         }
1708
1709         if (!self->use_ssl && self->ca_info) {
1710             amfree(self->ca_info);
1711         }
1712
1713         self->thread_idle_cond = g_cond_new();
1714         self->thread_idle_mutex = g_mutex_new();
1715
1716         for (thread = 0; thread < self->nb_threads; thread++) {
1717             self->s3t[thread].idle = 1;
1718             self->s3t[thread].done = 1;
1719             self->s3t[thread].eof = FALSE;
1720             self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1721             self->s3t[thread].errmsg = NULL;
1722             self->s3t[thread].filename = NULL;
1723             self->s3t[thread].curl_buffer.buffer = NULL;
1724             self->s3t[thread].curl_buffer.buffer_len = 0;
1725             self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
1726                                            self->swift_account_id,
1727                                            self->swift_access_key,
1728                                            self->host, self->service_path,
1729                                            self->use_subdomain,
1730                                            self->user_token, self->bucket_location,
1731                                            self->storage_class, self->ca_info,
1732                                            self->server_side_encryption,
1733                                            self->openstack_swift_api);
1734             if (self->s3t[thread].s3 == NULL) {
1735                 device_set_error(d_self,
1736                     stralloc(_("Internal error creating S3 handle")),
1737                     DEVICE_STATUS_DEVICE_ERROR);
1738                 self->nb_threads = thread+1;
1739                 return FALSE;
1740             } else if (self->openstack_swift_api) {
1741                 s3_error(self->s3t[0].s3, NULL, &response_code,
1742                         &s3_error_code, NULL, &curl_code, NULL);
1743                 if (response_code != 200) {
1744                     device_set_error(d_self,
1745                         g_strdup_printf(_("Internal error creating S3 handle: %s"),
1746                                         s3_strerror(self->s3t[0].s3)),
1747                         DEVICE_STATUS_DEVICE_ERROR);
1748                     self->nb_threads = thread+1;
1749                     return FALSE;
1750                 }
1751             }
1752         }
1753
1754         g_debug("Create %d threads", self->nb_threads);
1755         self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
1756                                                      self, self->nb_threads, 0,
1757                                                      NULL);
1758         self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
1759                                               self->nb_threads, 0, NULL);
1760         self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
1761                                               self->nb_threads, 0, NULL);
1762     }
1763
1764     for (thread = 0; thread < self->nb_threads; thread++) {
1765         s3_verbose(self->s3t[thread].s3, self->verbose);
1766
1767         if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
1768             device_set_error(d_self, g_strdup_printf(_(
1769                 "Error setting S3 SSL/TLS use "
1770                 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1771                 DEVICE_STATUS_DEVICE_ERROR);
1772             return FALSE;
1773         }
1774
1775         if (self->max_send_speed &&
1776             !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
1777             device_set_error(d_self,
1778                 g_strdup("Could not set S3 maximum send speed"),
1779                 DEVICE_STATUS_DEVICE_ERROR);
1780             return FALSE;
1781         }
1782
1783         if (self->max_recv_speed &&
1784             !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
1785             device_set_error(d_self,
1786                 g_strdup("Could not set S3 maximum recv speed"),
1787                 DEVICE_STATUS_DEVICE_ERROR);
1788             return FALSE;
1789         }
1790     }
1791
1792     return TRUE;
1793 }
1794
1795 static gboolean
1796 make_bucket(
1797     Device * pself)
1798 {
1799     S3Device *self = S3_DEVICE(pself);
1800     guint response_code;
1801     s3_error_code_t s3_error_code;
1802     CURLcode curl_code;
1803
1804     if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) {
1805         return TRUE;
1806     }
1807
1808     s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
1809
1810     if (response_code == 0 && s3_error_code == 0 &&
1811         (curl_code == CURLE_COULDNT_CONNECT ||
1812          curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
1813         device_set_error(pself,
1814             g_strdup_printf(_("While connecting to S3 bucket: %s"),
1815                             s3_strerror(self->s3t[0].s3)),
1816                 DEVICE_STATUS_DEVICE_ERROR);
1817         return FALSE;
1818     }
1819
1820     if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) {
1821         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1822
1823         /* if it isn't an expected error (bucket already exists),
1824          * return FALSE */
1825         if (response_code != 409 ||
1826             (s3_error_code != S3_ERROR_BucketAlreadyExists &&
1827              s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
1828             device_set_error(pself,
1829                 g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
1830                 DEVICE_STATUS_DEVICE_ERROR);
1831             return FALSE;
1832         }
1833     }
1834     return TRUE;
1835 }
1836
1837 static DeviceStatusFlags
1838 s3_device_read_label(Device *pself) {
1839     S3Device *self = S3_DEVICE(pself);
1840     char *key;
1841     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
1842     dumpfile_t *amanda_header;
1843     /* note that this may be called from s3_device_start, when
1844      * self->access_mode is not ACCESS_NULL */
1845
1846     amfree(pself->volume_label);
1847     amfree(pself->volume_time);
1848     dumpfile_free(pself->volume_header);
1849     pself->volume_header = NULL;
1850
1851     if (device_in_error(self)) return pself->status;
1852
1853     if (!setup_handle(self)) {
1854         /* setup_handle already set our error message */
1855         return pself->status;
1856     }
1857     reset_thread(self);
1858
1859     key = special_file_to_key(self, "tapestart", -1);
1860
1861     if (!make_bucket(pself)) {
1862         return pself->status;
1863     }
1864
1865     if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
1866         guint response_code;
1867         s3_error_code_t s3_error_code;
1868         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1869
1870         /* if it's an expected error (not found), just return FALSE */
1871         if (response_code == 404 &&
1872              (s3_error_code == S3_ERROR_None ||
1873               s3_error_code == S3_ERROR_Unknown ||
1874               s3_error_code == S3_ERROR_NoSuchKey ||
1875               s3_error_code == S3_ERROR_NoSuchEntity ||
1876               s3_error_code == S3_ERROR_NoSuchBucket)) {
1877             g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
1878             device_set_error(pself,
1879                 stralloc(_("Amanda header not found -- unlabeled volume?")),
1880                   DEVICE_STATUS_DEVICE_ERROR
1881                 | DEVICE_STATUS_VOLUME_ERROR
1882                 | DEVICE_STATUS_VOLUME_UNLABELED);
1883             return pself->status;
1884         }
1885
1886         /* otherwise, log it and return */
1887         device_set_error(pself,
1888             vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
1889             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1890         return pself->status;
1891     }
1892
1893     /* handle an empty file gracefully */
1894     if (buf.buffer_len == 0) {
1895         device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
1896         return pself->status;
1897     }
1898
1899     pself->header_block_size = buf.buffer_len;
1900     g_assert(buf.buffer != NULL);
1901     amanda_header = g_new(dumpfile_t, 1);
1902     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
1903     pself->volume_header = amanda_header;
1904     g_free(buf.buffer);
1905
1906     if (amanda_header->type != F_TAPESTART) {
1907         device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
1908         return pself->status;
1909     }
1910
1911     pself->volume_label = g_strdup(amanda_header->name);
1912     pself->volume_time = g_strdup(amanda_header->datestamp);
1913     /* pself->volume_header is already set */
1914
1915     device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1916
1917     return pself->status;
1918 }
1919
1920 static gboolean
1921 s3_device_start (Device * pself, DeviceAccessMode mode,
1922                  char * label, char * timestamp) {
1923     S3Device * self;
1924     GSList *keys;
1925     guint64 total_size = 0;
1926     gboolean result;
1927
1928     self = S3_DEVICE(pself);
1929
1930     if (device_in_error(self)) return FALSE;
1931
1932     if (!setup_handle(self)) {
1933         /* setup_handle already set our error message */
1934         return FALSE;
1935     }
1936
1937     reset_thread(self);
1938     pself->access_mode = mode;
1939     pself->in_file = FALSE;
1940
1941     /* try creating the bucket, in case it doesn't exist */
1942     if (!make_bucket(pself)) {
1943         return FALSE;
1944     }
1945
1946     /* take care of any dirty work for this mode */
1947     switch (mode) {
1948         case ACCESS_READ:
1949             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1950                 /* s3_device_read_label already set our error message */
1951                 return FALSE;
1952             }
1953             break;
1954
1955         case ACCESS_WRITE:
1956             delete_all_files(self);
1957
1958             /* write a new amanda header */
1959             if (!write_amanda_header(self, label, timestamp)) {
1960                 return FALSE;
1961             }
1962
1963             pself->volume_label = newstralloc(pself->volume_label, label);
1964             pself->volume_time = newstralloc(pself->volume_time, timestamp);
1965
1966             /* unset the VOLUME_UNLABELED flag, if it was set */
1967             device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1968             break;
1969
1970         case ACCESS_APPEND:
1971             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1972                 /* s3_device_read_label already set our error message */
1973                 return FALSE;
1974             } else {
1975                 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
1976                 if(!result) {
1977                     device_set_error(pself,
1978                                  vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
1979                                  DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
1980                     return FALSE;
1981                 } else {
1982                     self->volume_bytes = total_size;
1983                 }
1984             }
1985             return seek_to_end(self);
1986             break;
1987
1988         case ACCESS_NULL:
1989             g_assert_not_reached();
1990     }
1991
1992     return TRUE;
1993 }
1994
1995 static gboolean
1996 s3_device_finish (
1997     Device * pself)
1998 {
1999     S3Device *self = S3_DEVICE(pself);
2000
2001     reset_thread(self);
2002
2003     /* we're not in a file anymore */
2004     pself->access_mode = ACCESS_NULL;
2005
2006     if (device_in_error(pself)) return FALSE;
2007
2008     return TRUE;
2009 }
2010
2011 /* functions for writing */
2012
2013
2014 static gboolean
2015 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
2016     S3Device *self = S3_DEVICE(pself);
2017     CurlBuffer amanda_header = {NULL, 0, 0, 0};
2018     gboolean result;
2019     size_t header_size;
2020     char  *key;
2021     int    thread;
2022
2023     if (device_in_error(self)) return FALSE;
2024
2025     reset_thread(self);
2026     pself->is_eom = FALSE;
2027
2028     /* Set the blocksize to zero, since there's no header to skip (it's stored
2029      * in a distinct file, rather than block zero) */
2030     jobInfo->blocksize = 0;
2031
2032     /* Build the amanda header. */
2033     header_size = 0; /* no minimum size */
2034     amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
2035         &header_size);
2036     if (amanda_header.buffer == NULL) {
2037         device_set_error(pself,
2038             stralloc(_("Amanda file header won't fit in a single block!")),
2039             DEVICE_STATUS_DEVICE_ERROR);
2040         return FALSE;
2041     }
2042     amanda_header.buffer_len = header_size;
2043
2044     if(check_at_leom(self, header_size))
2045         pself->is_eom = TRUE;
2046
2047     if(check_at_peom(self, header_size)) {
2048         pself->is_eom = TRUE;
2049         device_set_error(pself,
2050             stralloc(_("No space left on device")),
2051             DEVICE_STATUS_DEVICE_ERROR);
2052         g_free(amanda_header.buffer);
2053         return FALSE;
2054     }
2055     /* set the file and block numbers correctly */
2056     pself->file = (pself->file > 0)? pself->file+1 : 1;
2057     pself->block = 0;
2058     pself->in_file = TRUE;
2059     /* write it out as a special block (not the 0th) */
2060     key = special_file_to_key(self, "filestart", pself->file);
2061     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
2062                        &amanda_header, NULL, NULL);
2063     g_free(amanda_header.buffer);
2064     g_free(key);
2065     if (!result) {
2066         device_set_error(pself,
2067             vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
2068             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2069         return FALSE;
2070     }
2071
2072     self->volume_bytes += header_size;
2073     for (thread = 0; thread < self->nb_threads; thread++)  {
2074         self->s3t[thread].idle = 1;
2075     }
2076
2077     return TRUE;
2078 }
2079
2080 static gboolean
2081 s3_device_write_block (Device * pself, guint size, gpointer data) {
2082     char *filename;
2083     S3Device * self = S3_DEVICE(pself);
2084     int idle_thread = 0;
2085     int thread = -1;
2086     int first_idle = -1;
2087
2088     g_assert (self != NULL);
2089     g_assert (data != NULL);
2090     if (device_in_error(self)) return FALSE;
2091
2092     if(check_at_leom(self, size))
2093         pself->is_eom = TRUE;
2094
2095     if(check_at_peom(self, size)) {
2096         pself->is_eom = TRUE;
2097         device_set_error(pself,
2098             stralloc(_("No space left on device")),
2099             DEVICE_STATUS_DEVICE_ERROR);
2100         return FALSE;
2101     }
2102
2103     filename = file_and_block_to_key(self, pself->file, pself->block);
2104
2105     g_mutex_lock(self->thread_idle_mutex);
2106     while (!idle_thread) {
2107         idle_thread = 0;
2108         for (thread = 0; thread < self->nb_threads_backup; thread++)  {
2109             if (self->s3t[thread].idle == 1) {
2110                 idle_thread++;
2111                 /* Check if the thread is in error */
2112                 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2113                     device_set_error(pself, (char *)self->s3t[thread].errmsg,
2114                                      self->s3t[thread].errflags);
2115                     self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2116                     self->s3t[thread].errmsg = NULL;
2117                     g_mutex_unlock(self->thread_idle_mutex);
2118                     return FALSE;
2119                 }
2120                 if (first_idle == -1) {
2121                     first_idle = thread;
2122                     break;
2123                 }
2124             }
2125         }
2126         if (!idle_thread) {
2127             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2128         }
2129     }
2130     thread = first_idle;
2131
2132     self->s3t[thread].idle = 0;
2133     self->s3t[thread].done = 0;
2134     if (self->s3t[thread].curl_buffer.buffer &&
2135         self->s3t[thread].curl_buffer.buffer_len < size) {
2136         g_free((char *)self->s3t[thread].curl_buffer.buffer);
2137         self->s3t[thread].curl_buffer.buffer = NULL;
2138         self->s3t[thread].curl_buffer.buffer_len = 0;
2139         self->s3t[thread].buffer_len = 0;
2140     }
2141     if (self->s3t[thread].curl_buffer.buffer == NULL) {
2142         self->s3t[thread].curl_buffer.buffer = g_malloc(size);
2143         self->s3t[thread].curl_buffer.buffer_len = size;
2144         self->s3t[thread].buffer_len = size;
2145     }
2146     memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
2147     self->s3t[thread].curl_buffer.buffer_pos = 0;
2148     self->s3t[thread].curl_buffer.buffer_len = size;
2149     self->s3t[thread].curl_buffer.max_buffer_size = 0;
2150     self->s3t[thread].filename = filename;
2151     g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
2152     g_mutex_unlock(self->thread_idle_mutex);
2153
2154     pself->block++;
2155     self->volume_bytes += size;
2156     return TRUE;
2157 }
2158
2159 static void
2160 s3_thread_write_block(
2161     gpointer thread_data,
2162     gpointer data)
2163 {
2164     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2165     Device *pself = (Device *)data;
2166     S3Device *self = S3_DEVICE(pself);
2167     gboolean result;
2168
2169     result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
2170                        S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2171     g_free((void *)s3t->filename);
2172     s3t->filename = NULL;
2173     if (!result) {
2174         s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
2175         s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
2176     }
2177     g_mutex_lock(self->thread_idle_mutex);
2178     s3t->idle = 1;
2179     s3t->done = 1;
2180     s3t->curl_buffer.buffer_len = s3t->buffer_len;
2181     g_cond_broadcast(self->thread_idle_cond);
2182     g_mutex_unlock(self->thread_idle_mutex);
2183 }
2184
2185 static gboolean
2186 s3_device_finish_file (Device * pself) {
2187     S3Device *self = S3_DEVICE(pself);
2188
2189     /* Check all threads are done */
2190     int idle_thread = 0;
2191     int thread;
2192
2193     g_mutex_lock(self->thread_idle_mutex);
2194     while (idle_thread != self->nb_threads) {
2195         idle_thread = 0;
2196         for (thread = 0; thread < self->nb_threads; thread++)  {
2197             if (self->s3t[thread].idle == 1) {
2198                 idle_thread++;
2199             }
2200             /* check thread status */
2201             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2202                 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2203                                  self->s3t[thread].errflags);
2204                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2205                 self->s3t[thread].errmsg = NULL;
2206             }
2207         }
2208         if (idle_thread != self->nb_threads) {
2209             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2210         }
2211     }
2212     g_mutex_unlock(self->thread_idle_mutex);
2213
2214     if (device_in_error(pself)) return FALSE;
2215
2216     /* we're not in a file anymore */
2217     pself->in_file = FALSE;
2218
2219     return TRUE;
2220 }
2221
2222 static gboolean
2223 s3_device_recycle_file(Device *pself, guint file) {
2224     S3Device *self = S3_DEVICE(pself);
2225     if (device_in_error(self)) return FALSE;
2226
2227     reset_thread(self);
2228     delete_file(self, file);
2229     s3_wait_thread_delete(self);
2230     return !device_in_error(self);
2231     /* delete_file already set our error message if necessary */
2232 }
2233
2234 static gboolean
2235 s3_device_erase(Device *pself) {
2236     S3Device *self = S3_DEVICE(pself);
2237     char *key = NULL;
2238     const char *errmsg = NULL;
2239     guint response_code;
2240     s3_error_code_t s3_error_code;
2241
2242     if (!setup_handle(self)) {
2243         /* error set by setup_handle */
2244         return FALSE;
2245     }
2246
2247     reset_thread(self);
2248     key = special_file_to_key(self, "tapestart", -1);
2249     if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
2250         s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
2251         device_set_error(pself,
2252             stralloc(errmsg),
2253             DEVICE_STATUS_DEVICE_ERROR);
2254         return FALSE;
2255     }
2256     g_free(key);
2257
2258     dumpfile_free(pself->volume_header);
2259     pself->volume_header = NULL;
2260
2261     if (!delete_all_files(self))
2262         return FALSE;
2263
2264     device_set_error(pself, g_strdup("Unlabeled volume"),
2265                      DEVICE_STATUS_VOLUME_UNLABELED);
2266
2267     if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
2268         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2269
2270         /*
2271          * ignore the error if the bucket isn't empty (there may be data from elsewhere)
2272          * or the bucket not existing (already deleted perhaps?)
2273          */
2274         if (!(
2275                 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
2276                 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
2277
2278             device_set_error(pself,
2279                 stralloc(errmsg),
2280                 DEVICE_STATUS_DEVICE_ERROR);
2281             return FALSE;
2282         }
2283     }
2284     self->volume_bytes = 0;
2285     return TRUE;
2286 }
2287
2288 /* functions for reading */
2289
2290 static dumpfile_t*
2291 s3_device_seek_file(Device *pself, guint file) {
2292     S3Device *self = S3_DEVICE(pself);
2293     gboolean result;
2294     char *key;
2295     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2296     dumpfile_t *amanda_header;
2297     const char *errmsg = NULL;
2298     int thread;
2299
2300     if (device_in_error(self)) return NULL;
2301
2302     reset_thread(self);
2303
2304     pself->file = file;
2305     pself->is_eof = FALSE;
2306     pself->in_file = FALSE;
2307     pself->block = 0;
2308     self->next_block_to_read = 0;
2309
2310     /* read it in */
2311     key = special_file_to_key(self, "filestart", pself->file);
2312     result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2313         &buf, NULL, NULL);
2314     g_free(key);
2315
2316     if (!result) {
2317         guint response_code;
2318         s3_error_code_t s3_error_code;
2319         s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2320
2321         /* if it's an expected error (not found), check what to do. */
2322         if (response_code == 404 &&
2323             (s3_error_code == S3_ERROR_None ||
2324              s3_error_code == S3_ERROR_NoSuchKey ||
2325              s3_error_code == S3_ERROR_NoSuchEntity)) {
2326             int next_file;
2327             next_file = find_next_file(self, pself->file);
2328             if (next_file > 0) {
2329                 /* Note short-circut of dispatcher. */
2330                 return s3_device_seek_file(pself, next_file);
2331             } else if (next_file == 0) {
2332                 /* No next file. Check if we are one past the end. */
2333                 key = special_file_to_key(self, "filestart", pself->file - 1);
2334                 result = s3_read(self->s3t[0].s3, self->bucket, key,
2335                     S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2336                 g_free(key);
2337                 if (result) {
2338                     /* pself->file, etc. are already correct */
2339                     return make_tapeend_header();
2340                 } else {
2341                     device_set_error(pself,
2342                         stralloc(_("Attempt to read past tape-end file")),
2343                         DEVICE_STATUS_SUCCESS);
2344                     return NULL;
2345                 }
2346             }
2347         } else {
2348             /* An unexpected error occured finding out if we are the last file. */
2349             device_set_error(pself,
2350                 stralloc(errmsg),
2351                 DEVICE_STATUS_DEVICE_ERROR);
2352             return NULL;
2353         }
2354     }
2355
2356     /* and make a dumpfile_t out of it */
2357     g_assert(buf.buffer != NULL);
2358     amanda_header = g_new(dumpfile_t, 1);
2359     fh_init(amanda_header);
2360     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2361     g_free(buf.buffer);
2362
2363     switch (amanda_header->type) {
2364         case F_DUMPFILE:
2365         case F_CONT_DUMPFILE:
2366         case F_SPLIT_DUMPFILE:
2367             break;
2368
2369         default:
2370             device_set_error(pself,
2371                 stralloc(_("Invalid amanda header while reading file header")),
2372                 DEVICE_STATUS_VOLUME_ERROR);
2373             g_free(amanda_header);
2374             return NULL;
2375     }
2376
2377     pself->in_file = TRUE;
2378     for (thread = 0; thread < self->nb_threads; thread++)  {
2379         self->s3t[thread].idle = 1;
2380         self->s3t[thread].eof = FALSE;
2381     }
2382     return amanda_header;
2383 }
2384
2385 static gboolean
2386 s3_device_seek_block(Device *pself, guint64 block) {
2387     S3Device * self = S3_DEVICE(pself);
2388     if (device_in_error(pself)) return FALSE;
2389
2390     reset_thread(self);
2391     pself->block = block;
2392     self->next_block_to_read = block;
2393     return TRUE;
2394 }
2395
2396 static int
2397 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
2398     S3Device * self = S3_DEVICE(pself);
2399     char *key;
2400     int thread;
2401     int done = 0;
2402
2403     g_assert (self != NULL);
2404     if (device_in_error(self)) return -1;
2405
2406     g_mutex_lock(self->thread_idle_mutex);
2407     /* start a read ahead for each thread */
2408     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2409         S3_by_thread *s3t = &self->s3t[thread];
2410         if (s3t->idle) {
2411             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2412             s3t->filename = key;
2413             s3t->done = 0;
2414             s3t->idle = 0;
2415             s3t->eof = FALSE;
2416             s3t->errflags = DEVICE_STATUS_SUCCESS;
2417             if (self->s3t[thread].curl_buffer.buffer &&
2418                 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
2419                 g_free(self->s3t[thread].curl_buffer.buffer);
2420                 self->s3t[thread].curl_buffer.buffer = NULL;
2421                 self->s3t[thread].curl_buffer.buffer_len = 0;
2422                 self->s3t[thread].buffer_len = 0;
2423             }
2424             if (!self->s3t[thread].curl_buffer.buffer) {
2425                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2426                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2427                 self->s3t[thread].buffer_len = *size_req;
2428             }
2429             s3t->curl_buffer.buffer_pos = 0;
2430             s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
2431             self->next_block_to_read++;
2432             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2433         }
2434     }
2435
2436     /* get the file*/
2437     key = file_and_block_to_key(self, pself->file, pself->block);
2438     g_assert(key != NULL);
2439     while (!done) {
2440         /* find which thread read the key */
2441         for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2442             S3_by_thread *s3t;
2443             s3t = &self->s3t[thread];
2444             if (!s3t->idle &&
2445                 s3t->done &&
2446                 strcmp(key, (char *)s3t->filename) == 0) {
2447                 if (s3t->eof) {
2448                     /* return eof */
2449                     g_free(key);
2450                     pself->is_eof = TRUE;
2451                     pself->in_file = FALSE;
2452                     device_set_error(pself, stralloc(_("EOF")),
2453                                      DEVICE_STATUS_SUCCESS);
2454                     g_mutex_unlock(self->thread_idle_mutex);
2455                     return -1;
2456                 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
2457                     /* return the error */
2458                     device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
2459                     g_free(key);
2460                     g_mutex_unlock(self->thread_idle_mutex);
2461                     return -1;
2462
2463                 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
2464                     /* return the buffer */
2465                     g_mutex_unlock(self->thread_idle_mutex);
2466                     memcpy(data, s3t->curl_buffer.buffer,
2467                                  s3t->curl_buffer.buffer_pos);
2468                     *size_req = s3t->curl_buffer.buffer_pos;
2469                     g_free(key);
2470                     s3t->idle = 1;
2471                     g_free((char *)s3t->filename);
2472                     pself->block++;
2473                     done = 1;
2474                     g_mutex_lock(self->thread_idle_mutex);
2475                     break;
2476                 } else { /* buffer not enough large */
2477                     *size_req = s3t->curl_buffer.buffer_len;
2478                     g_free(key);
2479                     g_mutex_unlock(self->thread_idle_mutex);
2480                     return 0;
2481                 }
2482             }
2483         }
2484         if (!done) {
2485             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2486         }
2487     }
2488
2489     /* start a read ahead for the thread */
2490     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2491         S3_by_thread *s3t = &self->s3t[thread];
2492         if (s3t->idle) {
2493             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2494             s3t->filename = key;
2495             s3t->done = 0;
2496             s3t->idle = 0;
2497             s3t->eof = FALSE;
2498             s3t->errflags = DEVICE_STATUS_SUCCESS;
2499             if (!self->s3t[thread].curl_buffer.buffer) {
2500                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2501                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2502             }
2503             s3t->curl_buffer.buffer_pos = 0;
2504             self->next_block_to_read++;
2505             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2506         }
2507     }
2508     g_mutex_unlock(self->thread_idle_mutex);
2509
2510     return *size_req;
2511
2512 }
2513
2514 static void
2515 s3_thread_read_block(
2516     gpointer thread_data,
2517     gpointer data)
2518 {
2519     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2520     Device *pself = (Device *)data;
2521     S3Device *self = S3_DEVICE(pself);
2522     gboolean result;
2523
2524     result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
2525         s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2526
2527     g_mutex_lock(self->thread_idle_mutex);
2528     if (!result) {
2529         guint response_code;
2530         s3_error_code_t s3_error_code;
2531         s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2532         /* if it's an expected error (not found), just return -1 */
2533         if (response_code == 404 &&
2534             (s3_error_code == S3_ERROR_None ||
2535              s3_error_code == S3_ERROR_Unknown ||
2536              s3_error_code == S3_ERROR_NoSuchKey ||
2537              s3_error_code == S3_ERROR_NoSuchEntity)) {
2538             s3t->eof = TRUE;
2539         } else {
2540
2541             /* otherwise, log it and return FALSE */
2542             s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
2543             s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
2544                                           s3_strerror(s3t->s3));
2545         }
2546     }
2547     s3t->done = 1;
2548     g_cond_broadcast(self->thread_idle_cond);
2549     g_mutex_unlock(self->thread_idle_mutex);
2550
2551     return;
2552 }
2553
2554 static gboolean
2555 check_at_peom(S3Device *self, guint64 size)
2556 {
2557     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2558         guint64 newtotal = self->volume_bytes + size;
2559         if(newtotal > self->volume_limit) {
2560             return TRUE;
2561         }
2562     }
2563     return FALSE;
2564 }
2565
2566 static gboolean
2567 check_at_leom(S3Device *self, guint64 size)
2568 {
2569     guint64 block_size = DEVICE(self)->block_size;
2570     guint64 eom_warning_buffer = block_size *
2571                 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
2572
2573     if(!self->leom)
2574         return FALSE;
2575
2576     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2577         guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
2578         if(newtotal > self->volume_limit) {
2579            return TRUE;
2580         }
2581     }
2582     return FALSE;
2583 }
2584
2585 static void
2586 reset_thread(
2587     S3Device *self)
2588 {
2589     int thread;
2590     int nb_done = 0;
2591
2592     g_mutex_lock(self->thread_idle_mutex);
2593     while(nb_done != self->nb_threads) {
2594         nb_done = 0;
2595         for (thread = 0; thread < self->nb_threads; thread++)  {
2596             if (self->s3t[thread].done == 1)
2597                 nb_done++;
2598         }
2599         if (nb_done != self->nb_threads) {
2600             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2601         }
2602     }
2603     g_mutex_unlock(self->thread_idle_mutex);
2604 }