Imported Upstream version 3.3.2
[debian/amanda] / device-src / s3-device.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 as published
6  * by the Free Software Foundation.
7  *
8  * This program is distributed in the hope that it will be useful, but
9  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11  * for more details.
12  *
13  * You should have received a copy of the GNU General Public License along
14  * with this program; if not, write to the Free Software Foundation, Inc.,
15  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16  *
17  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19  */
20
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22  * data.  It stores data in keys named with a user-specified prefix, inside a
23  * user-specified bucket.  Data is stored in the form of numbered (large)
24  * blocks.
25  */
26
27 #include "amanda.h"
28 #include <string.h>
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <unistd.h>
32 #include <dirent.h>
33 #include <regex.h>
34 #include <time.h>
35 #include "util.h"
36 #include "conffile.h"
37 #include "device.h"
38 #include "s3.h"
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
42 #else
43 # ifdef HAVE_CRYPTO_HMAC_H
44 #  include <crypto/hmac.h>
45 # else
46 #  ifdef HAVE_HMAC_H
47 #   include <hmac.h>
48 #  endif
49 # endif
50 #endif
51
52 /*
53  * Type checking and casting macros
54  */
55 #define TYPE_S3_DEVICE  (s3_device_get_type())
56 #define S3_DEVICE(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
57 #define S3_DEVICE_CONST(obj)    G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
58 #define S3_DEVICE_CLASS(klass)  G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
59 #define IS_S3_DEVICE(obj)       G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
60
61 #define S3_DEVICE_GET_CLASS(obj)        G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType    s3_device_get_type      (void);
63
64 /*
65  * Main object structure
66  */
67 typedef struct _S3MetadataFile S3MetadataFile;
68 typedef struct _S3Device S3Device;
69
70 typedef struct _S3_by_thread S3_by_thread;
71 struct _S3_by_thread {
72     S3Handle * volatile          s3;
73     CurlBuffer volatile          curl_buffer;
74     guint volatile               buffer_len;
75     int volatile                 idle;
76     int volatile                 eof;
77     int volatile                 done;
78     char volatile * volatile     filename;
79     DeviceStatusFlags volatile   errflags;      /* device_status */
80     char volatile * volatile     errmsg;        /* device error message */
81     GMutex                      *now_mutex;
82     guint64                      dlnow, ulnow;
83 };
84
85 struct _S3Device {
86     Device __parent__;
87
88     /* The "easy" curl handle we use to access Amazon S3 */
89     S3_by_thread *s3t;
90
91     /* S3 access information */
92     char *bucket;
93     char *prefix;
94
95     /* The S3 access information. */
96     char *secret_key;
97     char *access_key;
98     char *user_token;
99
100     /* The Openstack swift information. */
101     char *swift_account_id;
102     char *swift_access_key;
103
104     char *username;
105     char *password;
106     char *tenant_id;
107     char *tenant_name;
108
109     char *bucket_location;
110     char *storage_class;
111     char *host;
112     char *service_path;
113     char *server_side_encryption;
114     char *proxy;
115
116     char *ca_info;
117
118     /* a cache for unsuccessful reads (where we get the file but the caller
119      * doesn't have space for it or doesn't want it), where we expect the
120      * next call will request the same file.
121      */
122     char *cached_buf;
123     char *cached_key;
124     int cached_size;
125
126     /* Produce verbose output? */
127     gboolean verbose;
128
129     /* create the bucket? */
130     gboolean create_bucket;
131
132     /* Use SSL? */
133     gboolean use_ssl;
134     S3_api s3_api;
135
136     /* Throttling */
137     guint64 max_send_speed;
138     guint64 max_recv_speed;
139
140     gboolean leom;
141     guint64 volume_bytes;
142     guint64 volume_limit;
143     gboolean enforce_volume_limit;
144     gboolean use_subdomain;
145     gboolean use_s3_multi_delete;
146
147     int          nb_threads;
148     int          nb_threads_backup;
149     int          nb_threads_recovery;
150     GThreadPool *thread_pool_delete;
151     GThreadPool *thread_pool_write;
152     GThreadPool *thread_pool_read;
153     GCond       *thread_idle_cond;
154     GMutex      *thread_idle_mutex;
155     int          next_block_to_read;
156     GSList      *keys;
157
158     guint64      dltotal;
159     guint64      ultotal;
160
161     /* google OAUTH2 */
162     char        *client_id;
163     char        *client_secret;
164     char        *refresh_token;
165     char        *project_id;
166
167     gboolean     reuse_connection;
168 };
169
170 /*
171  * Class definition
172  */
173 typedef struct _S3DeviceClass S3DeviceClass;
174 struct _S3DeviceClass {
175     DeviceClass __parent__;
176 };
177
178
179 /*
180  * Constants and static data
181  */
182
183 #define S3_DEVICE_NAME "s3"
184
185 /* Maximum key length as specified in the S3 documentation
186  * (*excluding* null terminator) */
187 #define S3_MAX_KEY_LENGTH 1024
188
189 /* Note: for compatibility, min can only be decreased and max increased */
190 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
191 #define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
192 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
193 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
194
195 /* This goes in lieu of file number for metadata. */
196 #define SPECIAL_INFIX "special-"
197
198 /* pointer to the class of our parent */
199 static DeviceClass *parent_class = NULL;
200
201 /*
202  * device-specific properties
203  */
204
205 /* Authentication information for Amazon S3. Both of these are strings. */
206 static DevicePropertyBase device_property_s3_access_key;
207 static DevicePropertyBase device_property_s3_secret_key;
208 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
209 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
210
211 /* Authentication information for Openstack Swift. Both of these are strings. */
212 static DevicePropertyBase device_property_swift_account_id;
213 static DevicePropertyBase device_property_swift_access_key;
214 #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
215 #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
216
217 /* Further authentication information for Openstack Swift. All of these are strings. */
218 static DevicePropertyBase device_property_username;
219 static DevicePropertyBase device_property_password;
220 static DevicePropertyBase device_property_tenant_id;
221 static DevicePropertyBase device_property_tenant_name;
222 #define PROPERTY_USERNAME (device_property_username.ID)
223 #define PROPERTY_PASSWORD (device_property_password.ID)
224 #define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
225 #define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
226
227 /* Host and path */
228 static DevicePropertyBase device_property_s3_host;
229 static DevicePropertyBase device_property_s3_service_path;
230 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
231 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
232
233 /* Authentication information (user token) for S3 with DevPay. */
234 static DevicePropertyBase device_property_s3_user_token;
235 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
236
237 /* Location constraint for new buckets created on Amazon S3. */
238 static DevicePropertyBase device_property_s3_bucket_location;
239 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
240
241 /* Storage class */
242 static DevicePropertyBase device_property_s3_storage_class;
243 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
244
245 /* Server side encryption */
246 static DevicePropertyBase device_property_s3_server_side_encryption;
247 #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
248
249 /* proxy */
250 static DevicePropertyBase device_property_proxy;
251 #define PROPERTY_PROXY (device_property_proxy.ID)
252
253 /* Path to certificate authority certificate */
254 static DevicePropertyBase device_property_ssl_ca_info;
255 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
256
257 /* Which storage api to use. */
258 static DevicePropertyBase device_property_storage_api;
259 #define PROPERTY_STORAGE_API (device_property_storage_api.ID)
260
261 /* Whether to use openstack protocol. */
262 /* DEPRECATED */
263 static DevicePropertyBase device_property_openstack_swift_api;
264 #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
265
266 /* Whether to use SSL with Amazon S3. */
267 static DevicePropertyBase device_property_s3_ssl;
268 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
269
270 /* Whether to re-use connection. */
271 static DevicePropertyBase device_property_reuse_connection;
272 #define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID)
273
274 /* Speed limits for sending and receiving */
275 static DevicePropertyBase device_property_max_send_speed;
276 static DevicePropertyBase device_property_max_recv_speed;
277 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
278 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
279
280 /* Whether to use subdomain */
281 static DevicePropertyBase device_property_s3_subdomain;
282 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
283
284 /* Number of threads to use */
285 static DevicePropertyBase device_property_nb_threads_backup;
286 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
287 static DevicePropertyBase device_property_nb_threads_recovery;
288 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
289
290 /* Whether the s3 server has the multi-delete functionality */
291 static DevicePropertyBase device_property_s3_multi_delete;
292 #define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
293
294 /* The client_id for OAUTH2 */
295 static DevicePropertyBase device_property_client_id;
296 #define PROPERTY_CLIENT_ID (device_property_client_id.ID)
297
298 /* The client_secret for OAUTH2 */
299 static DevicePropertyBase device_property_client_secret;
300 #define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
301
302 /* The refresh token for OAUTH2 */
303 static DevicePropertyBase device_property_refresh_token;
304 #define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
305
306 /* The PROJECT ID */
307 static DevicePropertyBase device_property_project_id;
308 #define PROPERTY_PROJECT_ID (device_property_project_id.ID)
309
310 /* Whether to create the bucket */
311 static DevicePropertyBase device_property_create_bucket;
312 #define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
313
314 /*
315  * prototypes
316  */
317
318 void s3_device_register(void);
319
320 /*
321  * utility functions */
322
323 /* Given file and block numbers, return an S3 key.
324  *
325  * @param self: the S3Device object
326  * @param file: the file number
327  * @param block: the block within that file
328  * @returns: a newly allocated string containing an S3 key.
329  */
330 static char *
331 file_and_block_to_key(S3Device *self,
332                       int file,
333                       guint64 block);
334
335 /* Given the name of a special file (such as 'tapestart'), generate
336  * the S3 key to use for that file.
337  *
338  * @param self: the S3Device object
339  * @param special_name: name of the special file
340  * @param file: a file number to include; omitted if -1
341  * @returns: a newly allocated string containing an S3 key.
342  */
343 static char *
344 special_file_to_key(S3Device *self,
345                     char *special_name,
346                     int file);
347 /* Write an amanda header file to S3.
348  *
349  * @param self: the S3Device object
350  * @param label: the volume label
351  * @param timestamp: the volume timestamp
352  */
353 static gboolean
354 write_amanda_header(S3Device *self,
355                     char *label,
356                     char * timestamp);
357
358 /* "Fast forward" this device to the end by looking up the largest file number
359  * present and setting the current file number to it.
360  *
361  * @param self: the S3Device object
362  */
363 static gboolean
364 seek_to_end(S3Device *self);
365
366 /* Find the number of the last file that contains any data (even just a header).
367  *
368  * @param self: the S3Device object
369  * @returns: the last file, or -1 in event of an error
370  */
371 static int
372 find_last_file(S3Device *self);
373
374 /* Delete all blocks in the given file, including the filestart block
375  *
376  * @param self: the S3Device object
377  * @param file: the file to delete
378  */
379 static gboolean
380 delete_file(S3Device *self,
381             int file);
382
383
384 /* Delete all files in the given device
385  *
386  * @param self: the S3Device object
387  */
388 static gboolean
389 delete_all_files(S3Device *self);
390
391 /* Set up self->s3t as best as possible.
392  *
393  * The return value is TRUE iff self->s3t is useable.
394  *
395  * @param self: the S3Device object
396  * @returns: TRUE if the handle is set up
397  */
398 static gboolean
399 setup_handle(S3Device * self);
400
401 static void
402 s3_wait_thread_delete(S3Device *self);
403
404 /*
405  * class mechanics */
406
407 static void
408 s3_device_init(S3Device * o);
409
410 static void
411 s3_device_class_init(S3DeviceClass * c);
412
413 static void
414 s3_device_finalize(GObject * o);
415
416 static Device*
417 s3_device_factory(char * device_name, char * device_type, char * device_node);
418
419 /*
420  * Property{Get,Set}Fns */
421
422 static gboolean s3_device_set_access_key_fn(Device *self,
423     DevicePropertyBase *base, GValue *val,
424     PropertySurety surety, PropertySource source);
425
426 static gboolean s3_device_set_secret_key_fn(Device *self,
427     DevicePropertyBase *base, GValue *val,
428     PropertySurety surety, PropertySource source);
429
430 static gboolean s3_device_set_swift_account_id_fn(Device *self,
431     DevicePropertyBase *base, GValue *val,
432     PropertySurety surety, PropertySource source);
433
434 static gboolean s3_device_set_swift_access_key_fn(Device *self,
435     DevicePropertyBase *base, GValue *val,
436     PropertySurety surety, PropertySource source);
437
438 static gboolean s3_device_set_username(Device *self,
439     DevicePropertyBase *base, GValue *val,
440     PropertySurety surety, PropertySource source);
441
442 static gboolean s3_device_set_password(Device *self,
443     DevicePropertyBase *base, GValue *val,
444     PropertySurety surety, PropertySource source);
445
446 static gboolean s3_device_set_tenant_id(Device *self,
447     DevicePropertyBase *base, GValue *val,
448     PropertySurety surety, PropertySource source);
449
450 static gboolean s3_device_set_tenant_name(Device *self,
451     DevicePropertyBase *base, GValue *val,
452     PropertySurety surety, PropertySource source);
453
454 static gboolean s3_device_set_user_token_fn(Device *self,
455     DevicePropertyBase *base, GValue *val,
456     PropertySurety surety, PropertySource source);
457
458 static gboolean s3_device_set_bucket_location_fn(Device *self,
459     DevicePropertyBase *base, GValue *val,
460     PropertySurety surety, PropertySource source);
461
462 static gboolean s3_device_set_storage_class_fn(Device *self,
463     DevicePropertyBase *base, GValue *val,
464     PropertySurety surety, PropertySource source);
465
466 static gboolean s3_device_set_server_side_encryption_fn(Device *self,
467     DevicePropertyBase *base, GValue *val,
468     PropertySurety surety, PropertySource source);
469
470 static gboolean s3_device_set_proxy_fn(Device *self,
471     DevicePropertyBase *base, GValue *val,
472     PropertySurety surety, PropertySource source);
473
474 static gboolean s3_device_set_ca_info_fn(Device *self,
475     DevicePropertyBase *base, GValue *val,
476     PropertySurety surety, PropertySource source);
477
478 static gboolean s3_device_set_verbose_fn(Device *self,
479     DevicePropertyBase *base, GValue *val,
480     PropertySurety surety, PropertySource source);
481
482 static gboolean s3_device_set_create_bucket_fn(Device *self,
483     DevicePropertyBase *base, GValue *val,
484     PropertySurety surety, PropertySource source);
485
486 static gboolean s3_device_set_storage_api(Device *self,
487     DevicePropertyBase *base, GValue *val,
488     PropertySurety surety, PropertySource source);
489
490 static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
491     DevicePropertyBase *base, GValue *val,
492     PropertySurety surety, PropertySource source);
493
494 static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
495     DevicePropertyBase *base, GValue *val,
496     PropertySurety surety, PropertySource source);
497
498 static gboolean s3_device_set_ssl_fn(Device *self,
499     DevicePropertyBase *base, GValue *val,
500     PropertySurety surety, PropertySource source);
501
502 static gboolean s3_device_set_reuse_connection_fn(Device *self,
503     DevicePropertyBase *base, GValue *val,
504     PropertySurety surety, PropertySource source);
505
506 static gboolean s3_device_set_max_send_speed_fn(Device *self,
507     DevicePropertyBase *base, GValue *val,
508     PropertySurety surety, PropertySource source);
509
510 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
511     DevicePropertyBase *base, GValue *val,
512     PropertySurety surety, PropertySource source);
513
514 static gboolean s3_device_set_nb_threads_backup(Device *self,
515     DevicePropertyBase *base, GValue *val,
516     PropertySurety surety, PropertySource source);
517
518 static gboolean s3_device_set_nb_threads_recovery(Device *self,
519     DevicePropertyBase *base, GValue *val,
520     PropertySurety surety, PropertySource source);
521
522 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
523     DevicePropertyBase *base, GValue *val,
524     PropertySurety surety, PropertySource source);
525
526 static gboolean property_set_leom_fn(Device *p_self,
527     DevicePropertyBase *base, GValue *val,
528     PropertySurety surety, PropertySource source);
529
530 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
531     DevicePropertyBase *base, GValue *val,
532     PropertySurety surety, PropertySource source);
533
534 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
535     DevicePropertyBase *base, GValue *val,
536     PropertySurety surety, PropertySource source);
537
538 static gboolean s3_device_set_host_fn(Device *p_self,
539     DevicePropertyBase *base, GValue *val,
540     PropertySurety surety, PropertySource source);
541
542 static gboolean s3_device_set_service_path_fn(Device *p_self,
543     DevicePropertyBase *base, GValue *val,
544     PropertySurety surety, PropertySource source);
545
546 static gboolean s3_device_set_client_id_fn(Device *p_self,
547     DevicePropertyBase *base, GValue *val,
548     PropertySurety surety, PropertySource source);
549
550 static gboolean s3_device_set_client_secret_fn(Device *p_self,
551     DevicePropertyBase *base, GValue *val,
552     PropertySurety surety, PropertySource source);
553
554 static gboolean s3_device_set_refresh_token_fn(Device *p_self,
555     DevicePropertyBase *base, GValue *val,
556     PropertySurety surety, PropertySource source);
557
558 static gboolean s3_device_set_project_id_fn(Device *p_self,
559     DevicePropertyBase *base, GValue *val,
560     PropertySurety surety, PropertySource source);
561
562 static void s3_thread_read_block(gpointer thread_data,
563                                  gpointer data);
564 static void s3_thread_write_block(gpointer thread_data,
565                                   gpointer data);
566 static gboolean make_bucket(Device * pself);
567
568
569 /* Wait until all threads are done */
570 static void reset_thread(S3Device *self);
571
572 /*
573  * virtual functions */
574
575 static void
576 s3_device_open_device(Device *pself, char *device_name,
577                   char * device_type, char * device_node);
578
579 static DeviceStatusFlags s3_device_read_label(Device * self);
580
581 static gboolean
582 s3_device_start(Device * self,
583                 DeviceAccessMode mode,
584                 char * label,
585                 char * timestamp);
586
587 static gboolean
588 s3_device_finish(Device * self);
589
590 static guint64
591 s3_device_get_bytes_read(Device * self);
592
593 static guint64
594 s3_device_get_bytes_written(Device * self);
595
596 static gboolean
597 s3_device_start_file(Device * self,
598                      dumpfile_t * jobInfo);
599
600 static gboolean
601 s3_device_write_block(Device * self,
602                       guint size,
603                       gpointer data);
604
605 static gboolean
606 s3_device_finish_file(Device * self);
607
608 static dumpfile_t*
609 s3_device_seek_file(Device *pself,
610                     guint file);
611
612 static gboolean
613 s3_device_seek_block(Device *pself,
614                      guint64 block);
615
616 static int
617 s3_device_read_block(Device * pself,
618                      gpointer data,
619                      int *size_req);
620
621 static gboolean
622 s3_device_recycle_file(Device *pself,
623                        guint file);
624
625 static gboolean
626 s3_device_erase(Device *pself);
627
628 static gboolean
629 check_at_leom(S3Device *self,
630                 guint64 size);
631
632 static gboolean
633 check_at_peom(S3Device *self,
634                 guint64 size);
635
636 /*
637  * Private functions
638  */
639
640 static char *
641 file_and_block_to_key(S3Device *self,
642                       int file,
643                       guint64 block)
644 {
645     char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
646                                    self->prefix, file, (long long unsigned int)block);
647     g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
648     return s3_key;
649 }
650
651 static char *
652 special_file_to_key(S3Device *self,
653                     char *special_name,
654                     int file)
655 {
656     if (file == -1)
657         return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
658     else
659         return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
660 }
661
662 static gboolean
663 write_amanda_header(S3Device *self,
664                     char *label,
665                     char * timestamp)
666 {
667     CurlBuffer amanda_header = {NULL, 0, 0, 0};
668     char * key = NULL;
669     gboolean result;
670     dumpfile_t * dumpinfo = NULL;
671     Device *d_self = DEVICE(self);
672     size_t header_size;
673
674     /* build the header */
675     header_size = 0; /* no minimum size */
676     dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
677     amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
678         &header_size);
679     if (amanda_header.buffer == NULL) {
680         device_set_error(d_self,
681             stralloc(_("Amanda tapestart header won't fit in a single block!")),
682             DEVICE_STATUS_DEVICE_ERROR);
683         dumpfile_free(dumpinfo);
684         g_free(amanda_header.buffer);
685         return FALSE;
686     }
687
688     if(check_at_leom(self, header_size))
689         d_self->is_eom = TRUE;
690
691     if(check_at_peom(self, header_size)) {
692         d_self->is_eom = TRUE;
693         device_set_error(d_self,
694             stralloc(_("No space left on device")),
695             DEVICE_STATUS_DEVICE_ERROR);
696         g_free(amanda_header.buffer);
697         return FALSE;
698     }
699
700     /* write out the header and flush the uploads. */
701     key = special_file_to_key(self, "tapestart", -1);
702     g_assert(header_size < G_MAXUINT); /* for cast to guint */
703     amanda_header.buffer_len = (guint)header_size;
704     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
705                        &amanda_header, NULL, NULL);
706     g_free(amanda_header.buffer);
707     g_free(key);
708
709     if (!result) {
710         device_set_error(d_self,
711             vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
712             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
713         dumpfile_free(dumpinfo);
714     } else {
715         dumpfile_free(d_self->volume_header);
716         d_self->volume_header = dumpinfo;
717         self->volume_bytes += header_size;
718     }
719     d_self->header_block_size = header_size;
720     return result;
721 }
722
723 static gboolean
724 seek_to_end(S3Device *self) {
725     int last_file;
726
727     Device *pself = DEVICE(self);
728
729     last_file = find_last_file(self);
730     if (last_file < 0)
731         return FALSE;
732
733     pself->file = last_file;
734
735     return TRUE;
736 }
737
738 /* Convert an object name into a file number, assuming the given prefix
739  * length. Returns -1 if the object name is invalid, or 0 if the object name
740  * is a "special" key. */
741 static int key_to_file(guint prefix_len, const char * key) {
742     int file;
743     int i;
744
745     /* skip the prefix */
746     if (strlen(key) <= prefix_len)
747         return -1;
748
749     key += prefix_len;
750
751     if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
752         return 0;
753     }
754
755     /* check that key starts with 'f' */
756     if (key[0] != 'f')
757         return -1;
758     key++;
759
760     /* check that key is of the form "%08x-" */
761     for (i = 0; i < 8; i++) {
762         if (!(key[i] >= '0' && key[i] <= '9') &&
763             !(key[i] >= 'a' && key[i] <= 'f') &&
764             !(key[i] >= 'A' && key[i] <= 'F')) break;
765     }
766     if (key[i] != '-') return -1;
767     if (i < 8) return -1;
768
769     /* convert the file number */
770     errno = 0;
771     file = strtoul(key, NULL, 16);
772     if (errno != 0) {
773         g_warning(_("unparseable file number '%s'"), key);
774         return -1;
775     }
776
777     return file;
778 }
779
780 /* Find the number of the last file that contains any data (even just a header).
781  * Returns -1 in event of an error
782  */
783 static int
784 find_last_file(S3Device *self) {
785     gboolean result;
786     GSList *keys;
787     unsigned int prefix_len = strlen(self->prefix);
788     int last_file = 0;
789     Device *d_self = DEVICE(self);
790
791     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
792     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
793     if (!result) {
794         device_set_error(d_self,
795             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
796             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
797         return -1;
798     }
799
800     for (; keys; keys = g_slist_remove(keys, keys->data)) {
801         int file = key_to_file(prefix_len, keys->data);
802
803         /* and if it's the last, keep it */
804         if (file > last_file)
805             last_file = file;
806     }
807
808     return last_file;
809 }
810
811 /* Find the number of the file following the requested one, if any.
812  * Returns 0 if there is no such file or -1 in event of an error
813  */
814 static int
815 find_next_file(S3Device *self, int last_file) {
816     gboolean result;
817     GSList *keys;
818     unsigned int prefix_len = strlen(self->prefix);
819     int next_file = 0;
820     Device *d_self = DEVICE(self);
821
822     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
823     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
824                           &keys, NULL);
825     if (!result) {
826         device_set_error(d_self,
827             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
828             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
829         return -1;
830     }
831
832     for (; keys; keys = g_slist_remove(keys, keys->data)) {
833         int file;
834
835         file = key_to_file(prefix_len, (char*)keys->data);
836
837         if (file < 0) {
838             /* Set this in case we don't find a next file; this is not a
839              * hard error, so if we can find a next file we'll return that
840              * instead. */
841             next_file = -1;
842         }
843
844         if (file < next_file && file > last_file) {
845             next_file = file;
846         }
847     }
848
849     return next_file;
850 }
851
852 static gboolean
853 delete_file(S3Device *self,
854             int file)
855 {
856     int thread = -1;
857
858     gboolean result;
859     GSList *keys;
860     guint64 total_size = 0;
861     Device *d_self = DEVICE(self);
862     char *my_prefix;
863
864     if (file == -1) {
865         my_prefix = g_strdup_printf("%sf", self->prefix);
866     } else {
867         my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
868     }
869
870     result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL,
871                           &keys, &total_size);
872     if (!result) {
873         device_set_error(d_self,
874                 g_strdup_printf(_("While listing S3 keys: %s"),
875                     s3_strerror(self->s3t[0].s3)),
876                 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
877         return FALSE;
878     }
879
880     g_mutex_lock(self->thread_idle_mutex);
881     if (!self->keys) {
882         self->keys = keys;
883     } else {
884         self->keys = g_slist_concat(self->keys, keys);
885     }
886
887     // start the threads
888     for (thread = 0; thread < self->nb_threads; thread++)  {
889         if (self->s3t[thread].idle == 1) {
890             /* Check if the thread is in error */
891             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
892                 device_set_error(d_self,
893                                  (char *)self->s3t[thread].errmsg,
894                                  self->s3t[thread].errflags);
895                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
896                 self->s3t[thread].errmsg = NULL;
897                 g_mutex_unlock(self->thread_idle_mutex);
898                 s3_wait_thread_delete(self);
899                 return FALSE;
900             }
901             self->s3t[thread].idle = 0;
902             self->s3t[thread].done = 0;
903             g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
904                                NULL);
905         }
906     }
907     g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
908     g_mutex_unlock(self->thread_idle_mutex);
909
910     self->volume_bytes = total_size;
911
912     s3_wait_thread_delete(self);
913
914     return TRUE;
915 }
916
917 static void
918 s3_thread_delete_block(
919     gpointer thread_data,
920     gpointer data)
921 {
922     static int count = 0;
923     S3_by_thread *s3t = (S3_by_thread *)thread_data;
924     Device *pself = (Device *)data;
925     S3Device *self = S3_DEVICE(pself);
926     int result = 1;
927     char *filename;
928
929     g_mutex_lock(self->thread_idle_mutex);
930     while (result && self->keys) {
931         if (self->use_s3_multi_delete) {
932             char **filenames = g_new(char *, 1001);
933             char **f = filenames;
934             int  n = 0;
935             while (self->keys && n<1000) {
936                 *f++ = self->keys->data;
937                 self->keys = g_slist_remove(self->keys, self->keys->data);
938                 n++;
939             }
940             *f++ = NULL;
941             g_mutex_unlock(self->thread_idle_mutex);
942             result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
943                                               (const char **)filenames);
944             if (result != 1) {
945                 char **f;
946
947                 if (result == 2) {
948                     g_debug("Deleting multiple keys not implemented");
949                 } else { /* result == 0 */
950                     g_debug("Deleteing multiple keys failed: %s",
951                             s3_strerror(s3t->s3));
952                 }
953
954                 self->use_s3_multi_delete = 0;
955                 /* re-add all filenames */
956                 f = filenames;
957                 g_mutex_lock(self->thread_idle_mutex);
958                 while(*f) {
959                     self->keys = g_slist_prepend(self->keys, *f++);
960                 }
961                 g_mutex_unlock(self->thread_idle_mutex);
962                 g_free(filenames);
963                 result = 1;
964                 g_mutex_lock(self->thread_idle_mutex);
965                 continue;
966             }
967             f = filenames;
968             while(*f) {
969                 g_free(*f++);
970             }
971             g_free(filenames);
972         } else {
973             filename = self->keys->data;
974             self->keys = g_slist_remove(self->keys, self->keys->data);
975             count++;
976             if (count >= 1000) {
977                 g_debug("Deleting %s ...", filename);
978                 count = 0;
979             }
980             g_mutex_unlock(self->thread_idle_mutex);
981             result = s3_delete(s3t->s3, (const char *)self->bucket,
982                                         (const char *)filename);
983             if (!result) {
984                 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
985                 s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
986                                           filename, s3_strerror(s3t->s3));
987             }
988             g_free(filename);
989         }
990         g_mutex_lock(self->thread_idle_mutex);
991     }
992     s3t->idle = 1;
993     s3t->done = 1;
994     g_cond_broadcast(self->thread_idle_cond);
995     g_mutex_unlock(self->thread_idle_mutex);
996 }
997
998 static void
999 s3_wait_thread_delete(S3Device *self)
1000 {
1001     Device *d_self = (Device *)self;
1002     int idle_thread = 0;
1003     int thread;
1004
1005     g_mutex_lock(self->thread_idle_mutex);
1006     while (idle_thread != self->nb_threads) {
1007         idle_thread = 0;
1008         for (thread = 0; thread < self->nb_threads; thread++)  {
1009             if (self->s3t[thread].idle == 1) {
1010                 idle_thread++;
1011             }
1012             /* Check if the thread is in error */
1013             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1014                 device_set_error(d_self, (char *)self->s3t[thread].errmsg,
1015                                              self->s3t[thread].errflags);
1016                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1017                 self->s3t[thread].errmsg = NULL;
1018             }
1019         }
1020         if (idle_thread != self->nb_threads) {
1021             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1022         }
1023     }
1024     g_mutex_unlock(self->thread_idle_mutex);
1025 }
1026
1027
1028 static gboolean
1029 delete_all_files(S3Device *self)
1030 {
1031     return delete_file(self, -1);
1032 }
1033
1034 /*
1035  * Class mechanics
1036  */
1037
1038 void
1039 s3_device_register(void)
1040 {
1041     static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
1042     g_assert(s3_init());
1043
1044     /* set up our properties */
1045     device_property_fill_and_register(&device_property_s3_secret_key,
1046                                       G_TYPE_STRING, "s3_secret_key",
1047        "Secret access key to authenticate with Amazon S3");
1048     device_property_fill_and_register(&device_property_s3_access_key,
1049                                       G_TYPE_STRING, "s3_access_key",
1050        "Access key ID to authenticate with Amazon S3");
1051     device_property_fill_and_register(&device_property_swift_account_id,
1052                                       G_TYPE_STRING, "swift_account_id",
1053        "Account ID to authenticate with openstack swift");
1054     device_property_fill_and_register(&device_property_swift_access_key,
1055                                       G_TYPE_STRING, "swift_access_key",
1056        "Access key to authenticate with openstack swift");
1057     device_property_fill_and_register(&device_property_username,
1058                                       G_TYPE_STRING, "username",
1059        "Username to authenticate with");
1060     device_property_fill_and_register(&device_property_password,
1061                                       G_TYPE_STRING, "password",
1062        "password to authenticate with");
1063     device_property_fill_and_register(&device_property_tenant_id,
1064                                       G_TYPE_STRING, "tenant_id",
1065        "tenant_id to authenticate with");
1066     device_property_fill_and_register(&device_property_tenant_name,
1067                                       G_TYPE_STRING, "tenant_name",
1068        "tenant_name to authenticate with");
1069     device_property_fill_and_register(&device_property_s3_host,
1070                                       G_TYPE_STRING, "s3_host",
1071        "hostname:port of the server");
1072     device_property_fill_and_register(&device_property_s3_service_path,
1073                                       G_TYPE_STRING, "s3_service_path",
1074        "path to add in the url");
1075     device_property_fill_and_register(&device_property_s3_user_token,
1076                                       G_TYPE_STRING, "s3_user_token",
1077        "User token for authentication Amazon devpay requests");
1078     device_property_fill_and_register(&device_property_s3_bucket_location,
1079                                       G_TYPE_STRING, "s3_bucket_location",
1080        "Location constraint for buckets on Amazon S3");
1081     device_property_fill_and_register(&device_property_s3_storage_class,
1082                                       G_TYPE_STRING, "s3_storage_class",
1083        "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
1084     device_property_fill_and_register(&device_property_s3_server_side_encryption,
1085                                       G_TYPE_STRING, "s3_server_side_encryption",
1086        "Serve side encryption as specified by Amazon (AES256)");
1087     device_property_fill_and_register(&device_property_proxy,
1088                                       G_TYPE_STRING, "proxy",
1089        "The proxy");
1090     device_property_fill_and_register(&device_property_ssl_ca_info,
1091                                       G_TYPE_STRING, "ssl_ca_info",
1092        "Path to certificate authority certificate");
1093     device_property_fill_and_register(&device_property_storage_api,
1094                                       G_TYPE_STRING, "storage_api",
1095        "Which cloud API to use.");
1096     device_property_fill_and_register(&device_property_openstack_swift_api,
1097                                       G_TYPE_STRING, "openstack_swift_api",
1098        "Whether to use openstack protocol");
1099     device_property_fill_and_register(&device_property_client_id,
1100                                       G_TYPE_STRING, "client_id",
1101        "client_id for use with oauth2");
1102     device_property_fill_and_register(&device_property_client_secret,
1103                                       G_TYPE_STRING, "client_secret",
1104        "client_secret for use with oauth2");
1105     device_property_fill_and_register(&device_property_refresh_token,
1106                                       G_TYPE_STRING, "refresh_token",
1107        "refresh_token for use with oauth2");
1108     device_property_fill_and_register(&device_property_project_id,
1109                                       G_TYPE_STRING, "project_id",
1110        "project id for use with google");
1111     device_property_fill_and_register(&device_property_s3_ssl,
1112                                       G_TYPE_BOOLEAN, "s3_ssl",
1113        "Whether to use SSL with Amazon S3");
1114     device_property_fill_and_register(&device_property_reuse_connection,
1115                                       G_TYPE_BOOLEAN, "reuse_connection",
1116        "Whether to reuse connection");
1117     device_property_fill_and_register(&device_property_create_bucket,
1118                                       G_TYPE_BOOLEAN, "create_bucket",
1119        "Whether to create/delete bucket");
1120     device_property_fill_and_register(&device_property_s3_subdomain,
1121                                       G_TYPE_BOOLEAN, "s3_subdomain",
1122        "Whether to use subdomain");
1123     device_property_fill_and_register(&device_property_max_send_speed,
1124                                       G_TYPE_UINT64, "max_send_speed",
1125        "Maximum average upload speed (bytes/sec)");
1126     device_property_fill_and_register(&device_property_max_recv_speed,
1127                                       G_TYPE_UINT64, "max_recv_speed",
1128        "Maximum average download speed (bytes/sec)");
1129     device_property_fill_and_register(&device_property_nb_threads_backup,
1130                                       G_TYPE_UINT64, "nb_threads_backup",
1131        "Number of writer thread");
1132     device_property_fill_and_register(&device_property_nb_threads_recovery,
1133                                       G_TYPE_UINT64, "nb_threads_recovery",
1134        "Number of reader thread");
1135     device_property_fill_and_register(&device_property_s3_multi_delete,
1136                                       G_TYPE_BOOLEAN, "s3_multi_delete",
1137        "Whether to use multi-delete");
1138
1139     /* register the device itself */
1140     register_device(s3_device_factory, device_prefix_list);
1141 }
1142
1143 static GType
1144 s3_device_get_type(void)
1145 {
1146     static GType type = 0;
1147
1148     if G_UNLIKELY(type == 0) {
1149         static const GTypeInfo info = {
1150             sizeof (S3DeviceClass),
1151             (GBaseInitFunc) NULL,
1152             (GBaseFinalizeFunc) NULL,
1153             (GClassInitFunc) s3_device_class_init,
1154             (GClassFinalizeFunc) NULL,
1155             NULL /* class_data */,
1156             sizeof (S3Device),
1157             0 /* n_preallocs */,
1158             (GInstanceInitFunc) s3_device_init,
1159             NULL
1160         };
1161
1162         type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
1163                                        (GTypeFlags)0);
1164     }
1165
1166     return type;
1167 }
1168
1169 static void
1170 s3_device_init(S3Device * self)
1171 {
1172     Device * dself = DEVICE(self);
1173     GValue response;
1174
1175     self->s3_api = S3_API_S3;
1176     self->volume_bytes = 0;
1177     self->volume_limit = 0;
1178     self->leom = TRUE;
1179     self->enforce_volume_limit = FALSE;
1180     self->use_subdomain = FALSE;
1181     self->nb_threads = 1;
1182     self->nb_threads_backup = 1;
1183     self->nb_threads_recovery = 1;
1184     self->thread_pool_delete = NULL;
1185     self->thread_pool_write = NULL;
1186     self->thread_pool_read = NULL;
1187     self->thread_idle_cond = NULL;
1188     self->thread_idle_mutex = NULL;
1189     self->use_s3_multi_delete = 1;
1190
1191     /* Register property values
1192      * Note: Some aren't added until s3_device_open_device()
1193      */
1194     bzero(&response, sizeof(response));
1195
1196     g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
1197     g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
1198     device_set_simple_property(dself, PROPERTY_CONCURRENCY,
1199             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1200     g_value_unset(&response);
1201
1202     g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
1203     g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
1204     device_set_simple_property(dself, PROPERTY_STREAMING,
1205             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1206     g_value_unset(&response);
1207
1208     g_value_init(&response, G_TYPE_BOOLEAN);
1209     g_value_set_boolean(&response, TRUE);
1210     device_set_simple_property(dself, PROPERTY_APPENDABLE,
1211             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1212     g_value_unset(&response);
1213
1214     g_value_init(&response, G_TYPE_BOOLEAN);
1215     g_value_set_boolean(&response, TRUE);
1216     device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
1217             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1218     g_value_unset(&response);
1219
1220     g_value_init(&response, G_TYPE_BOOLEAN);
1221     g_value_set_boolean(&response, TRUE);
1222     device_set_simple_property(dself, PROPERTY_FULL_DELETION,
1223             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1224     g_value_unset(&response);
1225
1226     g_value_init(&response, G_TYPE_BOOLEAN);
1227     g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
1228     device_set_simple_property(dself, PROPERTY_LEOM,
1229             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1230     g_value_unset(&response);
1231
1232     g_value_init(&response, G_TYPE_BOOLEAN);
1233     g_value_set_boolean(&response, FALSE);
1234     device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1235             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1236     g_value_unset(&response);
1237
1238     g_value_init(&response, G_TYPE_BOOLEAN);
1239     g_value_set_boolean(&response, FALSE);
1240     device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
1241             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1242     g_value_unset(&response);
1243
1244     g_value_init(&response, G_TYPE_BOOLEAN);
1245     g_value_set_boolean(&response, FALSE);
1246     device_set_simple_property(dself, PROPERTY_COMPRESSION,
1247             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1248     g_value_unset(&response);
1249
1250     g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
1251     g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
1252     device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
1253             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1254     g_value_unset(&response);
1255
1256 }
1257
1258 static void
1259 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
1260 {
1261     GObjectClass *g_object_class = (GObjectClass*) c;
1262     DeviceClass *device_class = (DeviceClass *)c;
1263
1264     parent_class = g_type_class_ref (TYPE_DEVICE);
1265
1266     device_class->open_device = s3_device_open_device;
1267     device_class->read_label = s3_device_read_label;
1268     device_class->start = s3_device_start;
1269     device_class->finish = s3_device_finish;
1270     device_class->get_bytes_read = s3_device_get_bytes_read;
1271     device_class->get_bytes_written = s3_device_get_bytes_written;
1272
1273     device_class->start_file = s3_device_start_file;
1274     device_class->write_block = s3_device_write_block;
1275     device_class->finish_file = s3_device_finish_file;
1276
1277     device_class->seek_file = s3_device_seek_file;
1278     device_class->seek_block = s3_device_seek_block;
1279     device_class->read_block = s3_device_read_block;
1280     device_class->recycle_file = s3_device_recycle_file;
1281
1282     device_class->erase = s3_device_erase;
1283
1284     g_object_class->finalize = s3_device_finalize;
1285
1286     device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
1287             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1288             device_simple_property_get_fn,
1289             s3_device_set_access_key_fn);
1290
1291     device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
1292             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1293             device_simple_property_get_fn,
1294             s3_device_set_secret_key_fn);
1295
1296     device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
1297             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1298             device_simple_property_get_fn,
1299             s3_device_set_swift_account_id_fn);
1300
1301     device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
1302             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1303             device_simple_property_get_fn,
1304             s3_device_set_swift_access_key_fn);
1305
1306     device_class_register_property(device_class, PROPERTY_USERNAME,
1307             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1308             device_simple_property_get_fn,
1309             s3_device_set_username);
1310
1311     device_class_register_property(device_class, PROPERTY_PASSWORD,
1312             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1313             device_simple_property_get_fn,
1314             s3_device_set_password);
1315
1316     device_class_register_property(device_class, PROPERTY_TENANT_ID,
1317             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1318             device_simple_property_get_fn,
1319             s3_device_set_tenant_id);
1320
1321     device_class_register_property(device_class, PROPERTY_TENANT_NAME,
1322             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1323             device_simple_property_get_fn,
1324             s3_device_set_tenant_name);
1325
1326     device_class_register_property(device_class, PROPERTY_S3_HOST,
1327             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1328             device_simple_property_get_fn,
1329             s3_device_set_host_fn);
1330
1331     device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
1332             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1333             device_simple_property_get_fn,
1334             s3_device_set_service_path_fn);
1335
1336     device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
1337             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1338             device_simple_property_get_fn,
1339             s3_device_set_user_token_fn);
1340
1341     device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
1342             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1343             device_simple_property_get_fn,
1344             s3_device_set_bucket_location_fn);
1345
1346     device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
1347             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1348             device_simple_property_get_fn,
1349             s3_device_set_storage_class_fn);
1350
1351     device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
1352             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1353             device_simple_property_get_fn,
1354             s3_device_set_server_side_encryption_fn);
1355
1356     device_class_register_property(device_class, PROPERTY_PROXY,
1357             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1358             device_simple_property_get_fn,
1359             s3_device_set_proxy_fn);
1360
1361     device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
1362             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1363             device_simple_property_get_fn,
1364             s3_device_set_ca_info_fn);
1365
1366     device_class_register_property(device_class, PROPERTY_VERBOSE,
1367             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1368             device_simple_property_get_fn,
1369             s3_device_set_verbose_fn);
1370
1371     device_class_register_property(device_class, PROPERTY_CREATE_BUCKET,
1372             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1373             device_simple_property_get_fn,
1374             s3_device_set_create_bucket_fn);
1375
1376     device_class_register_property(device_class, PROPERTY_STORAGE_API,
1377             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1378             device_simple_property_get_fn,
1379             s3_device_set_storage_api);
1380
1381     device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
1382             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1383             device_simple_property_get_fn,
1384             s3_device_set_openstack_swift_api_fn);
1385
1386     device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE,
1387             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1388             device_simple_property_get_fn,
1389             s3_device_set_s3_multi_delete_fn);
1390
1391     device_class_register_property(device_class, PROPERTY_S3_SSL,
1392             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1393             device_simple_property_get_fn,
1394             s3_device_set_ssl_fn);
1395
1396     device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION,
1397             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1398             device_simple_property_get_fn,
1399             s3_device_set_reuse_connection_fn);
1400
1401     device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1402             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1403             device_simple_property_get_fn,
1404             s3_device_set_max_send_speed_fn);
1405
1406     device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1407             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1408             device_simple_property_get_fn,
1409             s3_device_set_max_recv_speed_fn);
1410
1411     device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1412             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1413             device_simple_property_get_fn,
1414             s3_device_set_nb_threads_backup);
1415
1416     device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1417             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1418             device_simple_property_get_fn,
1419             s3_device_set_nb_threads_recovery);
1420
1421     device_class_register_property(device_class, PROPERTY_COMPRESSION,
1422             PROPERTY_ACCESS_GET_MASK,
1423             device_simple_property_get_fn,
1424             NULL);
1425
1426     device_class_register_property(device_class, PROPERTY_LEOM,
1427             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1428             device_simple_property_get_fn,
1429             property_set_leom_fn);
1430
1431     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1432             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1433                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1434             device_simple_property_get_fn,
1435             s3_device_set_max_volume_usage_fn);
1436
1437     device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1438             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1439                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1440             device_simple_property_get_fn,
1441             s3_device_set_enforce_max_volume_usage_fn);
1442
1443     device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1444             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1445             device_simple_property_get_fn,
1446             s3_device_set_use_subdomain_fn);
1447
1448     device_class_register_property(device_class, PROPERTY_CLIENT_ID,
1449             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1450             device_simple_property_get_fn,
1451             s3_device_set_client_id_fn);
1452
1453     device_class_register_property(device_class, PROPERTY_CLIENT_SECRET,
1454             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1455             device_simple_property_get_fn,
1456             s3_device_set_client_secret_fn);
1457
1458     device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN,
1459             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1460             device_simple_property_get_fn,
1461             s3_device_set_refresh_token_fn);
1462
1463     device_class_register_property(device_class, PROPERTY_PROJECT_ID,
1464             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1465             device_simple_property_get_fn,
1466             s3_device_set_project_id_fn);
1467 }
1468
1469 static gboolean
1470 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1471     GValue *val, PropertySurety surety, PropertySource source)
1472 {
1473     S3Device *self = S3_DEVICE(p_self);
1474
1475     amfree(self->access_key);
1476     self->access_key = g_value_dup_string(val);
1477     device_clear_volume_details(p_self);
1478
1479     return device_simple_property_set_fn(p_self, base, val, surety, source);
1480 }
1481
1482 static gboolean
1483 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1484     GValue *val, PropertySurety surety, PropertySource source)
1485 {
1486     S3Device *self = S3_DEVICE(p_self);
1487
1488     amfree(self->secret_key);
1489     self->secret_key = g_value_dup_string(val);
1490     device_clear_volume_details(p_self);
1491
1492     return device_simple_property_set_fn(p_self, base, val, surety, source);
1493 }
1494
1495 static gboolean
1496 s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
1497     GValue *val, PropertySurety surety, PropertySource source)
1498 {
1499     S3Device *self = S3_DEVICE(p_self);
1500
1501     amfree(self->swift_account_id);
1502     self->swift_account_id = g_value_dup_string(val);
1503     device_clear_volume_details(p_self);
1504
1505     return device_simple_property_set_fn(p_self, base, val, surety, source);
1506 }
1507
1508 static gboolean
1509 s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
1510     GValue *val, PropertySurety surety, PropertySource source)
1511 {
1512     S3Device *self = S3_DEVICE(p_self);
1513
1514     amfree(self->swift_access_key);
1515     self->swift_access_key = g_value_dup_string(val);
1516     device_clear_volume_details(p_self);
1517
1518     return device_simple_property_set_fn(p_self, base, val, surety, source);
1519 }
1520
1521 static gboolean
1522 s3_device_set_username(Device *p_self, DevicePropertyBase *base,
1523     GValue *val, PropertySurety surety, PropertySource source)
1524 {
1525     S3Device *self = S3_DEVICE(p_self);
1526
1527     amfree(self->username);
1528     self->username = g_value_dup_string(val);
1529     device_clear_volume_details(p_self);
1530
1531     return device_simple_property_set_fn(p_self, base, val, surety, source);
1532 }
1533
1534 static gboolean
1535 s3_device_set_password(Device *p_self, DevicePropertyBase *base,
1536     GValue *val, PropertySurety surety, PropertySource source)
1537 {
1538     S3Device *self = S3_DEVICE(p_self);
1539
1540     amfree(self->password);
1541     self->password = g_value_dup_string(val);
1542     device_clear_volume_details(p_self);
1543
1544     return device_simple_property_set_fn(p_self, base, val, surety, source);
1545 }
1546
1547 static gboolean
1548 s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
1549     GValue *val, PropertySurety surety, PropertySource source)
1550 {
1551     S3Device *self = S3_DEVICE(p_self);
1552
1553     amfree(self->tenant_id);
1554     self->tenant_id = g_value_dup_string(val);
1555     device_clear_volume_details(p_self);
1556
1557     return device_simple_property_set_fn(p_self, base, val, surety, source);
1558 }
1559
1560 static gboolean
1561 s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
1562     GValue *val, PropertySurety surety, PropertySource source)
1563 {
1564     S3Device *self = S3_DEVICE(p_self);
1565
1566     amfree(self->tenant_name);
1567     self->tenant_name = g_value_dup_string(val);
1568     device_clear_volume_details(p_self);
1569
1570     return device_simple_property_set_fn(p_self, base, val, surety, source);
1571 }
1572
1573 static gboolean
1574 s3_device_set_host_fn(Device *p_self,
1575     DevicePropertyBase *base, GValue *val,
1576     PropertySurety surety, PropertySource source)
1577 {
1578     S3Device *self = S3_DEVICE(p_self);
1579
1580     amfree(self->host);
1581     self->host = g_value_dup_string(val);
1582     device_clear_volume_details(p_self);
1583
1584     return device_simple_property_set_fn(p_self, base, val, surety, source);
1585 }
1586
1587 static gboolean
1588 s3_device_set_service_path_fn(Device *p_self,
1589     DevicePropertyBase *base, GValue *val,
1590     PropertySurety surety, PropertySource source)
1591 {
1592     S3Device *self = S3_DEVICE(p_self);
1593
1594     amfree(self->service_path);
1595     self->service_path = g_value_dup_string(val);
1596     device_clear_volume_details(p_self);
1597
1598     return device_simple_property_set_fn(p_self, base, val, surety, source);
1599 }
1600
1601 static gboolean
1602 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1603     GValue *val, PropertySurety surety, PropertySource source)
1604 {
1605     S3Device *self = S3_DEVICE(p_self);
1606
1607     amfree(self->user_token);
1608     self->user_token = g_value_dup_string(val);
1609     device_clear_volume_details(p_self);
1610
1611     return device_simple_property_set_fn(p_self, base, val, surety, source);
1612 }
1613
1614 static gboolean
1615 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1616     GValue *val, PropertySurety surety, PropertySource source)
1617 {
1618     S3Device *self = S3_DEVICE(p_self);
1619     char *str_val = g_value_dup_string(val);
1620
1621     if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1622         device_set_error(p_self, stralloc(_(
1623                 "Location constraint given for Amazon S3 bucket, "
1624                 "but libcurl is too old support wildcard certificates.")),
1625             DEVICE_STATUS_DEVICE_ERROR);
1626         goto fail;
1627     }
1628
1629     if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1630         device_set_error(p_self, g_strdup_printf(_(
1631                 "Location constraint given for Amazon S3 bucket, "
1632                 "but the bucket name (%s) is not usable as a subdomain."),
1633                 self->bucket),
1634             DEVICE_STATUS_DEVICE_ERROR);
1635         goto fail;
1636     }
1637
1638     amfree(self->bucket_location);
1639     self->bucket_location = str_val;
1640     device_clear_volume_details(p_self);
1641
1642     return device_simple_property_set_fn(p_self, base, val, surety, source);
1643 fail:
1644     g_free(str_val);
1645     return FALSE;
1646 }
1647
1648 static gboolean
1649 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1650     GValue *val, PropertySurety surety, PropertySource source)
1651 {
1652     S3Device *self = S3_DEVICE(p_self);
1653     char *str_val = g_value_dup_string(val);
1654
1655     amfree(self->storage_class);
1656     self->storage_class = str_val;
1657     device_clear_volume_details(p_self);
1658
1659     return device_simple_property_set_fn(p_self, base, val, surety, source);
1660 }
1661
1662 static gboolean
1663 s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
1664     GValue *val, PropertySurety surety, PropertySource source)
1665 {
1666     S3Device *self = S3_DEVICE(p_self);
1667     char *str_val = g_value_dup_string(val);
1668
1669     amfree(self->server_side_encryption);
1670     self->server_side_encryption = str_val;
1671     device_clear_volume_details(p_self);
1672
1673     return device_simple_property_set_fn(p_self, base, val, surety, source);
1674 }
1675
1676 static gboolean
1677 s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
1678     GValue *val, PropertySurety surety, PropertySource source)
1679 {
1680     S3Device *self = S3_DEVICE(p_self);
1681     char *str_val = g_value_dup_string(val);
1682
1683     amfree(self->proxy);
1684     self->proxy = str_val;
1685     device_clear_volume_details(p_self);
1686
1687     return device_simple_property_set_fn(p_self, base, val, surety, source);
1688 }
1689
1690 static gboolean
1691 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1692     GValue *val, PropertySurety surety, PropertySource source)
1693 {
1694     S3Device *self = S3_DEVICE(p_self);
1695
1696     amfree(self->ca_info);
1697     self->ca_info = g_value_dup_string(val);
1698     device_clear_volume_details(p_self);
1699
1700     return device_simple_property_set_fn(p_self, base, val, surety, source);
1701 }
1702
1703 static gboolean
1704 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1705     GValue *val, PropertySurety surety, PropertySource source)
1706 {
1707     S3Device *self = S3_DEVICE(p_self);
1708     int       thread;
1709
1710     self->verbose = g_value_get_boolean(val);
1711     /* Our S3 handle may not yet have been instantiated; if so, it will
1712      * get the proper verbose setting when it is created */
1713     if (self->s3t) {
1714         for (thread = 0; thread < self->nb_threads; thread++) {
1715             if (self->s3t[thread].s3)
1716                 s3_verbose(self->s3t[thread].s3, self->verbose);
1717         }
1718     }
1719
1720     return device_simple_property_set_fn(p_self, base, val, surety, source);
1721 }
1722
1723 static gboolean
1724 s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
1725     GValue *val, PropertySurety surety, PropertySource source)
1726 {
1727     S3Device *self = S3_DEVICE(p_self);
1728     int       thread;
1729
1730     self->create_bucket = g_value_get_boolean(val);
1731     /* Our S3 handle may not yet have been instantiated; if so, it will
1732      * get the proper verbose setting when it is created */
1733     if (self->s3t) {
1734         for (thread = 0; thread < self->nb_threads; thread++) {
1735             if (self->s3t[thread].s3)
1736                 s3_verbose(self->s3t[thread].s3, self->verbose);
1737         }
1738     }
1739
1740     return device_simple_property_set_fn(p_self, base, val, surety, source);
1741 }
1742
1743 static gboolean
1744 s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
1745     GValue *val, PropertySurety surety, PropertySource source)
1746 {
1747     S3Device *self = S3_DEVICE(p_self);
1748
1749     const char *storage_api = g_value_get_string(val);
1750     if (g_str_equal(storage_api, "S3")) {
1751         self->s3_api = S3_API_S3;
1752     } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
1753         self->s3_api = S3_API_SWIFT_1;
1754     } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
1755         self->s3_api = S3_API_SWIFT_2;
1756     } else if (g_str_equal(storage_api, "OAUTH2")) {
1757         self->s3_api = S3_API_OAUTH2;
1758     } else {
1759         g_debug("Invalid STORAGE_API, using \"S3\".");
1760         self->s3_api = S3_API_S3;
1761     }
1762
1763     return device_simple_property_set_fn(p_self, base, val, surety, source);
1764 }
1765
1766 static gboolean
1767 s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
1768     GValue *val, PropertySurety surety, PropertySource source)
1769 {
1770
1771     const gboolean openstack_swift_api = g_value_get_boolean(val);
1772     if (openstack_swift_api) {
1773         GValue storage_api_val;
1774         g_value_init(&storage_api_val, G_TYPE_STRING);
1775         g_value_set_static_string(&storage_api_val, "SWIFT-1.0");
1776         return s3_device_set_storage_api(p_self, base, &storage_api_val,
1777                                          surety, source);
1778     }
1779     return TRUE;
1780 }
1781
1782 static gboolean
1783 s3_device_set_s3_multi_delete_fn(Device *p_self,
1784     DevicePropertyBase *base, GValue *val,
1785     PropertySurety surety, PropertySource source)
1786 {
1787     S3Device *self = S3_DEVICE(p_self);
1788
1789     self->use_s3_multi_delete = g_value_get_boolean(val);
1790
1791     return device_simple_property_set_fn(p_self, base, val, surety, source);
1792 }
1793
1794 static gboolean
1795 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1796     GValue *val, PropertySurety surety, PropertySource source)
1797 {
1798     S3Device *self = S3_DEVICE(p_self);
1799     gboolean new_val;
1800     int      thread;
1801
1802     new_val = g_value_get_boolean(val);
1803     /* Our S3 handle may not yet have been instantiated; if so, it will
1804      * get the proper use_ssl setting when it is created */
1805     if (self->s3t) {
1806         for (thread = 0; thread < self->nb_threads; thread++) {
1807             if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1808                 device_set_error(p_self, g_strdup_printf(_(
1809                         "Error setting S3 SSL/TLS use "
1810                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1811                     DEVICE_STATUS_DEVICE_ERROR);
1812                 return FALSE;
1813             }
1814         }
1815     }
1816     self->use_ssl = new_val;
1817
1818     return device_simple_property_set_fn(p_self, base, val, surety, source);
1819 }
1820
1821 static gboolean
1822 s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
1823     GValue *val, PropertySurety surety, PropertySource source)
1824 {
1825     S3Device *self = S3_DEVICE(p_self);
1826
1827     self->reuse_connection = g_value_get_boolean(val);
1828
1829     return device_simple_property_set_fn(p_self, base, val, surety, source);
1830 }
1831
1832 static gboolean
1833 s3_device_set_max_send_speed_fn(Device *p_self,
1834     DevicePropertyBase *base, GValue *val,
1835     PropertySurety surety, PropertySource source)
1836 {
1837     S3Device *self = S3_DEVICE(p_self);
1838     guint64 new_val;
1839     int     thread;
1840
1841     new_val = g_value_get_uint64(val);
1842     if (self->s3t) {
1843         for (thread = 0; thread < self->nb_threads; thread++) {
1844             if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1845                 device_set_error(p_self,
1846                         g_strdup("Could not set S3 maximum send speed"),
1847                         DEVICE_STATUS_DEVICE_ERROR);
1848                 return FALSE;
1849             }
1850         }
1851     }
1852     self->max_send_speed = new_val;
1853
1854     return device_simple_property_set_fn(p_self, base, val, surety, source);
1855 }
1856
1857 static gboolean
1858 s3_device_set_max_recv_speed_fn(Device *p_self,
1859     DevicePropertyBase *base, GValue *val,
1860     PropertySurety surety, PropertySource source)
1861 {
1862     S3Device *self = S3_DEVICE(p_self);
1863     guint64 new_val;
1864     int     thread;
1865
1866     new_val = g_value_get_uint64(val);
1867     if (self->s3t) {
1868         for (thread = 0; thread < self->nb_threads; thread++) {
1869             if (self->s3t[thread].s3 &&
1870                 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1871                 device_set_error(p_self,
1872                         g_strdup("Could not set S3 maximum recv speed"),
1873                         DEVICE_STATUS_DEVICE_ERROR);
1874                 return FALSE;
1875             }
1876         }
1877     }
1878     self->max_recv_speed = new_val;
1879
1880     return device_simple_property_set_fn(p_self, base, val, surety, source);
1881 }
1882
1883 static gboolean
1884 s3_device_set_nb_threads_backup(Device *p_self,
1885     DevicePropertyBase *base, GValue *val,
1886     PropertySurety surety, PropertySource source)
1887 {
1888     S3Device *self = S3_DEVICE(p_self);
1889     guint64 new_val;
1890
1891     new_val = g_value_get_uint64(val);
1892     self->nb_threads_backup = new_val;
1893     if (self->nb_threads_backup > self->nb_threads) {
1894         self->nb_threads = self->nb_threads_backup;
1895     }
1896
1897     return device_simple_property_set_fn(p_self, base, val, surety, source);
1898 }
1899
1900 static gboolean
1901 s3_device_set_nb_threads_recovery(Device *p_self,
1902     DevicePropertyBase *base, GValue *val,
1903     PropertySurety surety, PropertySource source)
1904 {
1905     S3Device *self = S3_DEVICE(p_self);
1906     guint64 new_val;
1907
1908     new_val = g_value_get_uint64(val);
1909     self->nb_threads_recovery = new_val;
1910     if (self->nb_threads_recovery > self->nb_threads) {
1911         self->nb_threads = self->nb_threads_recovery;
1912     }
1913
1914     return device_simple_property_set_fn(p_self, base, val, surety, source);
1915 }
1916
1917 static gboolean
1918 s3_device_set_max_volume_usage_fn(Device *p_self,
1919     DevicePropertyBase *base, GValue *val,
1920     PropertySurety surety, PropertySource source)
1921 {
1922     S3Device *self = S3_DEVICE(p_self);
1923
1924     self->volume_limit = g_value_get_uint64(val);
1925
1926     return device_simple_property_set_fn(p_self, base, val, surety, source);
1927
1928 }
1929
1930 static gboolean
1931 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1932     DevicePropertyBase *base, GValue *val,
1933     PropertySurety surety, PropertySource source)
1934 {
1935     S3Device *self = S3_DEVICE(p_self);
1936
1937     self->enforce_volume_limit = g_value_get_boolean(val);
1938
1939     return device_simple_property_set_fn(p_self, base, val, surety, source);
1940
1941 }
1942
1943 static gboolean
1944 s3_device_set_use_subdomain_fn(Device *p_self,
1945     DevicePropertyBase *base, GValue *val,
1946     PropertySurety surety, PropertySource source)
1947 {
1948     S3Device *self = S3_DEVICE(p_self);
1949
1950     self->use_subdomain = g_value_get_boolean(val);
1951
1952     return device_simple_property_set_fn(p_self, base, val, surety, source);
1953 }
1954
1955 static gboolean
1956 property_set_leom_fn(Device *p_self,
1957     DevicePropertyBase *base, GValue *val,
1958     PropertySurety surety, PropertySource source)
1959 {
1960     S3Device *self = S3_DEVICE(p_self);
1961
1962     self->leom = g_value_get_boolean(val);
1963
1964     return device_simple_property_set_fn(p_self, base, val, surety, source);
1965 }
1966
1967 static gboolean
1968 s3_device_set_client_id_fn(Device *p_self,
1969     DevicePropertyBase *base, GValue *val,
1970     PropertySurety surety, PropertySource source)
1971 {
1972     S3Device *self = S3_DEVICE(p_self);
1973
1974     amfree(self->client_id);
1975     self->client_id = g_value_dup_string(val);
1976
1977     return device_simple_property_set_fn(p_self, base, val, surety, source);
1978 }
1979
1980 static gboolean
1981 s3_device_set_client_secret_fn(Device *p_self,
1982     DevicePropertyBase *base, GValue *val,
1983     PropertySurety surety, PropertySource source)
1984 {
1985     S3Device *self = S3_DEVICE(p_self);
1986
1987     amfree(self->client_secret);
1988     self->client_secret = g_value_dup_string(val);
1989
1990     return device_simple_property_set_fn(p_self, base, val, surety, source);
1991 }
1992
1993 static gboolean
1994 s3_device_set_refresh_token_fn(Device *p_self,
1995     DevicePropertyBase *base, GValue *val,
1996     PropertySurety surety, PropertySource source)
1997 {
1998     S3Device *self = S3_DEVICE(p_self);
1999
2000     amfree(self->refresh_token);
2001     self->refresh_token = g_value_dup_string(val);
2002
2003     return device_simple_property_set_fn(p_self, base, val, surety, source);
2004 }
2005
2006 static gboolean
2007 s3_device_set_project_id_fn(Device *p_self,
2008     DevicePropertyBase *base, GValue *val,
2009     PropertySurety surety, PropertySource source)
2010 {
2011     S3Device *self = S3_DEVICE(p_self);
2012
2013     amfree(self->project_id);
2014     self->project_id = g_value_dup_string(val);
2015
2016     return device_simple_property_set_fn(p_self, base, val, surety, source);
2017 }
2018
2019 static Device*
2020 s3_device_factory(char * device_name, char * device_type, char * device_node)
2021 {
2022     Device *rval;
2023     g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
2024     rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
2025
2026     device_open_device(rval, device_name, device_type, device_node);
2027     return rval;
2028 }
2029
2030 /*
2031  * Virtual function overrides
2032  */
2033
2034 static void
2035 s3_device_open_device(Device *pself, char *device_name,
2036                         char * device_type, char * device_node)
2037 {
2038     S3Device *self = S3_DEVICE(pself);
2039     char * name_colon;
2040     GValue tmp_value;
2041
2042     pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
2043     pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
2044     pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
2045
2046     /* Device name may be bucket/prefix, to support multiple volumes in a
2047      * single bucket. */
2048     name_colon = strchr(device_node, '/');
2049     if (name_colon == NULL) {
2050         self->bucket = g_strdup(device_node);
2051         self->prefix = g_strdup("");
2052     } else {
2053         self->bucket = g_strndup(device_node, name_colon - device_node);
2054         self->prefix = g_strdup(name_colon + 1);
2055     }
2056
2057     if (self->bucket == NULL || self->bucket[0] == '\0') {
2058         device_set_error(pself,
2059             vstrallocf(_("Empty bucket name in device %s"), device_name),
2060             DEVICE_STATUS_DEVICE_ERROR);
2061         amfree(self->bucket);
2062         amfree(self->prefix);
2063         return;
2064     }
2065
2066     g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
2067
2068     /* default values */
2069     self->verbose = FALSE;
2070     self->s3_api = S3_API_S3;
2071
2072     /* use SSL if available */
2073     self->use_ssl = s3_curl_supports_ssl();
2074     bzero(&tmp_value, sizeof(GValue));
2075     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2076     g_value_set_boolean(&tmp_value, self->use_ssl);
2077     device_set_simple_property(pself, device_property_s3_ssl.ID,
2078         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2079
2080     /* reuse connection */
2081     self->reuse_connection = TRUE;
2082     bzero(&tmp_value, sizeof(GValue));
2083     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2084     g_value_set_boolean(&tmp_value, self->reuse_connection);
2085     device_set_simple_property(pself, device_property_reuse_connection.ID,
2086         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2087
2088     /* Set default create_bucket */
2089     self->create_bucket = TRUE;
2090     bzero(&tmp_value, sizeof(GValue));
2091     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2092     g_value_set_boolean(&tmp_value, self->create_bucket);
2093     device_set_simple_property(pself, device_property_create_bucket.ID,
2094         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2095
2096     if (parent_class->open_device) {
2097         parent_class->open_device(pself, device_name, device_type, device_node);
2098     }
2099 }
2100
2101 static void s3_device_finalize(GObject * obj_self) {
2102     S3Device *self = S3_DEVICE (obj_self);
2103     int thread;
2104
2105     if(G_OBJECT_CLASS(parent_class)->finalize)
2106         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
2107
2108     if (self->thread_pool_delete) {
2109         g_thread_pool_free(self->thread_pool_delete, 1, 1);
2110         self->thread_pool_delete = NULL;
2111     }
2112     if (self->thread_pool_write) {
2113         g_thread_pool_free(self->thread_pool_write, 1, 1);
2114         self->thread_pool_write = NULL;
2115     }
2116     if (self->thread_pool_read) {
2117         g_thread_pool_free(self->thread_pool_read, 1, 1);
2118         self->thread_pool_read = NULL;
2119     }
2120     if (self->thread_idle_mutex) {
2121         g_mutex_free(self->thread_idle_mutex);
2122         self->thread_idle_mutex = NULL;
2123     }
2124     if (self->thread_idle_cond) {
2125         g_cond_free(self->thread_idle_cond);
2126         self->thread_idle_cond = NULL;
2127     }
2128     if (self->s3t) {
2129         for (thread = 0; thread < self->nb_threads; thread++) {
2130             g_mutex_free(self->s3t[thread].now_mutex);
2131             if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
2132             g_free(self->s3t[thread].curl_buffer.buffer);
2133         }
2134         g_free(self->s3t);
2135     }
2136     if(self->bucket) g_free(self->bucket);
2137     if(self->prefix) g_free(self->prefix);
2138     if(self->access_key) g_free(self->access_key);
2139     if(self->secret_key) g_free(self->secret_key);
2140     if(self->swift_account_id) g_free(self->swift_account_id);
2141     if(self->swift_access_key) g_free(self->swift_access_key);
2142     if(self->username) g_free(self->username);
2143     if(self->password) g_free(self->password);
2144     if(self->tenant_id) g_free(self->tenant_id);
2145     if(self->tenant_name) g_free(self->tenant_name);
2146     if(self->host) g_free(self->host);
2147     if(self->service_path) g_free(self->service_path);
2148     if(self->user_token) g_free(self->user_token);
2149     if(self->bucket_location) g_free(self->bucket_location);
2150     if(self->storage_class) g_free(self->storage_class);
2151     if(self->server_side_encryption) g_free(self->server_side_encryption);
2152     if(self->proxy) g_free(self->proxy);
2153     if(self->ca_info) g_free(self->ca_info);
2154 }
2155
2156 static gboolean setup_handle(S3Device * self) {
2157     Device *d_self = DEVICE(self);
2158     int thread;
2159     guint response_code;
2160     s3_error_code_t s3_error_code;
2161     CURLcode curl_code;
2162
2163     if (self->s3t == NULL) {
2164         if (self->s3_api == S3_API_S3) {
2165             if (self->access_key == NULL || self->access_key[0] == '\0') {
2166                 device_set_error(d_self,
2167                     g_strdup(_("No Amazon access key specified")),
2168                     DEVICE_STATUS_DEVICE_ERROR);
2169                 return FALSE;
2170             }
2171
2172             if (self->secret_key == NULL || self->secret_key[0] == '\0') {
2173                 device_set_error(d_self,
2174                     g_strdup(_("No Amazon secret key specified")),
2175                     DEVICE_STATUS_DEVICE_ERROR);
2176                 return FALSE;
2177             }
2178         } else if (self->s3_api == S3_API_SWIFT_1) {
2179             if (self->swift_account_id == NULL ||
2180                 self->swift_account_id[0] == '\0') {
2181                 device_set_error(d_self,
2182                     g_strdup(_("No Swift account id specified")),
2183                     DEVICE_STATUS_DEVICE_ERROR);
2184                 return FALSE;
2185             }
2186             if (self->swift_access_key == NULL ||
2187                 self->swift_access_key[0] == '\0') {
2188                 device_set_error(d_self,
2189                     g_strdup(_("No Swift access key specified")),
2190                     DEVICE_STATUS_DEVICE_ERROR);
2191                 return FALSE;
2192             }
2193         } else if (self->s3_api == S3_API_SWIFT_2) {
2194             if (!((self->username && self->password && self->tenant_id) ||
2195                   (self->username && self->password && self->tenant_name) ||
2196                   (self->access_key && self->secret_key && self->tenant_id) ||
2197                   (self->access_key && self->secret_key && self->tenant_name))) {
2198                 device_set_error(d_self,
2199                     g_strdup(_("Missing authorization properties")),
2200                     DEVICE_STATUS_DEVICE_ERROR);
2201                 return FALSE;
2202             }
2203         } else if (self->s3_api == S3_API_OAUTH2) {
2204             if (self->client_id == NULL ||
2205                 self->client_id[0] == '\0') {
2206                 device_set_error(d_self,
2207                     g_strdup(_("Missing client_id properties")),
2208                     DEVICE_STATUS_DEVICE_ERROR);
2209                 return FALSE;
2210             }
2211             if (self->client_secret == NULL ||
2212                 self->client_secret[0] == '\0') {
2213                 device_set_error(d_self,
2214                     g_strdup(_("Missing client_secret properties")),
2215                     DEVICE_STATUS_DEVICE_ERROR);
2216                 return FALSE;
2217             }
2218             if (self->refresh_token == NULL ||
2219                 self->refresh_token[0] == '\0') {
2220                 device_set_error(d_self,
2221                     g_strdup(_("Missing refresh_token properties")),
2222                     DEVICE_STATUS_DEVICE_ERROR);
2223                 return FALSE;
2224             }
2225             if (self->project_id == NULL ||
2226                 self->project_id[0] == '\0') {
2227                 device_set_error(d_self,
2228                     g_strdup(_("Missing project_id properties")),
2229                     DEVICE_STATUS_DEVICE_ERROR);
2230                 return FALSE;
2231             }
2232         }
2233
2234         self->s3t = g_new0(S3_by_thread, self->nb_threads);
2235         if (self->s3t == NULL) {
2236             device_set_error(d_self,
2237                 g_strdup(_("Can't allocate S3Handle array")),
2238                 DEVICE_STATUS_DEVICE_ERROR);
2239             return FALSE;
2240         }
2241
2242         self->thread_idle_cond = g_cond_new();
2243         self->thread_idle_mutex = g_mutex_new();
2244
2245         for (thread = 0; thread < self->nb_threads; thread++) {
2246             self->s3t[thread].idle = 1;
2247             self->s3t[thread].done = 1;
2248             self->s3t[thread].eof = FALSE;
2249             self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2250             self->s3t[thread].errmsg = NULL;
2251             self->s3t[thread].filename = NULL;
2252             self->s3t[thread].curl_buffer.buffer = NULL;
2253             self->s3t[thread].curl_buffer.buffer_len = 0;
2254             self->s3t[thread].now_mutex = g_mutex_new();
2255             self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
2256                                            self->swift_account_id,
2257                                            self->swift_access_key,
2258                                            self->host, self->service_path,
2259                                            self->use_subdomain,
2260                                            self->user_token, self->bucket_location,
2261                                            self->storage_class, self->ca_info,
2262                                            self->server_side_encryption,
2263                                            self->proxy,
2264                                            self->s3_api,
2265                                            self->username,
2266                                            self->password,
2267                                            self->tenant_id,
2268                                            self->tenant_name,
2269                                            self->client_id,
2270                                            self->client_secret,
2271                                            self->refresh_token,
2272                                            self->reuse_connection);
2273             if (self->s3t[thread].s3 == NULL) {
2274                 device_set_error(d_self,
2275                     stralloc(_("Internal error creating S3 handle")),
2276                     DEVICE_STATUS_DEVICE_ERROR);
2277                 self->nb_threads = thread+1;
2278                 return FALSE;
2279             }
2280         }
2281
2282         g_debug("Create %d threads", self->nb_threads);
2283         self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
2284                                                      self, self->nb_threads, 0,
2285                                                      NULL);
2286         self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
2287                                               self->nb_threads, 0, NULL);
2288         self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
2289                                               self->nb_threads, 0, NULL);
2290
2291         for (thread = 0; thread < self->nb_threads; thread++) {
2292             s3_verbose(self->s3t[thread].s3, self->verbose);
2293
2294             if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
2295                 device_set_error(d_self, g_strdup_printf(_(
2296                         "Error setting S3 SSL/TLS use "
2297                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
2298                         DEVICE_STATUS_DEVICE_ERROR);
2299                 return FALSE;
2300             }
2301
2302             if (self->max_send_speed &&
2303                 !s3_set_max_send_speed(self->s3t[thread].s3,
2304                                        self->max_send_speed)) {
2305                 device_set_error(d_self,
2306                         g_strdup("Could not set S3 maximum send speed"),
2307                         DEVICE_STATUS_DEVICE_ERROR);
2308                 return FALSE;
2309             }
2310
2311             if (self->max_recv_speed &&
2312                 !s3_set_max_recv_speed(self->s3t[thread].s3,
2313                                        self->max_recv_speed)) {
2314                 device_set_error(d_self,
2315                         g_strdup("Could not set S3 maximum recv speed"),
2316                         DEVICE_STATUS_DEVICE_ERROR);
2317                 return FALSE;
2318             }
2319         }
2320
2321         for (thread = 0; thread < self->nb_threads; thread++) {
2322             if (!s3_open2(self->s3t[thread].s3)) {
2323                 if (self->s3_api == S3_API_SWIFT_1 ||
2324                     self->s3_api == S3_API_SWIFT_2) {
2325                     s3_error(self->s3t[0].s3, NULL, &response_code,
2326                              &s3_error_code, NULL, &curl_code, NULL);
2327                     device_set_error(d_self,
2328                             g_strdup_printf(_("s3_open2 failed: %s"),
2329                                             s3_strerror(self->s3t[0].s3)),
2330                             DEVICE_STATUS_DEVICE_ERROR);
2331                     self->nb_threads = thread+1;
2332                     return FALSE;
2333                 } else {
2334                     device_set_error(d_self,
2335                                      g_strdup("s3_open2 failed"),
2336                                      DEVICE_STATUS_DEVICE_ERROR);
2337                     return FALSE;
2338                 }
2339             }
2340         }
2341     }
2342
2343     return TRUE;
2344 }
2345
2346 static gboolean
2347 make_bucket(
2348     Device * pself)
2349 {
2350     S3Device *self = S3_DEVICE(pself);
2351     guint response_code;
2352     s3_error_code_t s3_error_code;
2353     CURLcode curl_code;
2354
2355     if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) {
2356         return TRUE;
2357     }
2358
2359     s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
2360
2361     if (response_code == 0 && s3_error_code == 0 &&
2362         (curl_code == CURLE_COULDNT_CONNECT ||
2363          curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
2364         device_set_error(pself,
2365             g_strdup_printf(_("While connecting to S3 bucket: %s"),
2366                             s3_strerror(self->s3t[0].s3)),
2367                 DEVICE_STATUS_DEVICE_ERROR);
2368         return FALSE;
2369     }
2370
2371     if (!self->create_bucket) {
2372         device_set_error(pself,
2373             g_strdup_printf(_("Can't list bucket: %s"),
2374                             s3_strerror(self->s3t[0].s3)),
2375                 DEVICE_STATUS_DEVICE_ERROR);
2376         return FALSE;
2377     }
2378
2379     if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) {
2380         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2381
2382         /* if it isn't an expected error (bucket already exists),
2383          * return FALSE */
2384         if (response_code != 409 ||
2385             (s3_error_code != S3_ERROR_BucketAlreadyExists &&
2386              s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
2387             device_set_error(pself,
2388                 g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
2389                 DEVICE_STATUS_DEVICE_ERROR);
2390             return FALSE;
2391         }
2392     }
2393     return TRUE;
2394 }
2395
2396 static int progress_func(
2397     void *thread_data,
2398     double dltotal G_GNUC_UNUSED,
2399     double dlnow,
2400     double ultotal G_GNUC_UNUSED,
2401     double ulnow)
2402 {
2403     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2404
2405     g_mutex_lock(s3t->now_mutex);
2406     s3t->dlnow = dlnow;
2407     s3t->ulnow = ulnow;
2408     g_mutex_unlock(s3t->now_mutex);
2409
2410     return 0;
2411 }
2412
2413 static DeviceStatusFlags
2414 s3_device_read_label(Device *pself) {
2415     S3Device *self = S3_DEVICE(pself);
2416     char *key;
2417     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2418     dumpfile_t *amanda_header;
2419     /* note that this may be called from s3_device_start, when
2420      * self->access_mode is not ACCESS_NULL */
2421
2422     amfree(pself->volume_label);
2423     amfree(pself->volume_time);
2424     dumpfile_free(pself->volume_header);
2425     pself->volume_header = NULL;
2426
2427     if (device_in_error(self)) return pself->status;
2428
2429     if (!setup_handle(self)) {
2430         /* setup_handle already set our error message */
2431         return pself->status;
2432     }
2433     reset_thread(self);
2434
2435     key = special_file_to_key(self, "tapestart", -1);
2436
2437     if (!make_bucket(pself)) {
2438         return pself->status;
2439     }
2440
2441     if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
2442         guint response_code;
2443         s3_error_code_t s3_error_code;
2444         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2445
2446         /* if it's an expected error (not found), just return FALSE */
2447         if (response_code == 404 &&
2448              (s3_error_code == S3_ERROR_None ||
2449               s3_error_code == S3_ERROR_Unknown ||
2450               s3_error_code == S3_ERROR_NoSuchKey ||
2451               s3_error_code == S3_ERROR_NoSuchEntity ||
2452               s3_error_code == S3_ERROR_NoSuchBucket)) {
2453             g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
2454             device_set_error(pself,
2455                 stralloc(_("Amanda header not found -- unlabeled volume?")),
2456                   DEVICE_STATUS_DEVICE_ERROR
2457                 | DEVICE_STATUS_VOLUME_ERROR
2458                 | DEVICE_STATUS_VOLUME_UNLABELED);
2459             return pself->status;
2460         }
2461
2462         /* otherwise, log it and return */
2463         device_set_error(pself,
2464             vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
2465             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2466         return pself->status;
2467     }
2468
2469     /* handle an empty file gracefully */
2470     if (buf.buffer_len == 0) {
2471         device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
2472         return pself->status;
2473     }
2474
2475     pself->header_block_size = buf.buffer_len;
2476     g_assert(buf.buffer != NULL);
2477     amanda_header = g_new(dumpfile_t, 1);
2478     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2479     pself->volume_header = amanda_header;
2480     g_free(buf.buffer);
2481
2482     if (amanda_header->type != F_TAPESTART) {
2483         device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
2484         return pself->status;
2485     }
2486
2487     pself->volume_label = g_strdup(amanda_header->name);
2488     pself->volume_time = g_strdup(amanda_header->datestamp);
2489     /* pself->volume_header is already set */
2490
2491     device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2492
2493     return pself->status;
2494 }
2495
2496 static gboolean
2497 s3_device_start (Device * pself, DeviceAccessMode mode,
2498                  char * label, char * timestamp) {
2499     S3Device * self;
2500     GSList *keys;
2501     guint64 total_size = 0;
2502     gboolean result;
2503
2504     self = S3_DEVICE(pself);
2505
2506     if (device_in_error(self)) return FALSE;
2507
2508     if (!setup_handle(self)) {
2509         /* setup_handle already set our error message */
2510         return FALSE;
2511     }
2512
2513     reset_thread(self);
2514     pself->access_mode = mode;
2515     g_mutex_lock(pself->device_mutex);
2516     pself->in_file = FALSE;
2517     g_mutex_unlock(pself->device_mutex);
2518
2519     /* try creating the bucket, in case it doesn't exist */
2520     if (!make_bucket(pself)) {
2521         return FALSE;
2522     }
2523
2524     /* take care of any dirty work for this mode */
2525     switch (mode) {
2526         case ACCESS_READ:
2527             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2528                 /* s3_device_read_label already set our error message */
2529                 return FALSE;
2530             }
2531             break;
2532
2533         case ACCESS_WRITE:
2534             if (!delete_all_files(self)) {
2535                 return FALSE;
2536             }
2537
2538             /* write a new amanda header */
2539             if (!write_amanda_header(self, label, timestamp)) {
2540                 return FALSE;
2541             }
2542
2543             pself->volume_label = newstralloc(pself->volume_label, label);
2544             pself->volume_time = newstralloc(pself->volume_time, timestamp);
2545
2546             /* unset the VOLUME_UNLABELED flag, if it was set */
2547             device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2548             break;
2549
2550         case ACCESS_APPEND:
2551             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2552                 /* s3_device_read_label already set our error message */
2553                 return FALSE;
2554             } else {
2555                 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
2556                 if(!result) {
2557                     device_set_error(pself,
2558                                  vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
2559                                  DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
2560                     return FALSE;
2561                 } else {
2562                     self->volume_bytes = total_size;
2563                 }
2564             }
2565             return seek_to_end(self);
2566             break;
2567
2568         case ACCESS_NULL:
2569             g_assert_not_reached();
2570     }
2571
2572     return TRUE;
2573 }
2574
2575 static gboolean
2576 s3_device_finish (
2577     Device * pself)
2578 {
2579     S3Device *self = S3_DEVICE(pself);
2580
2581     reset_thread(self);
2582
2583     /* we're not in a file anymore */
2584     pself->access_mode = ACCESS_NULL;
2585
2586     if (device_in_error(pself)) return FALSE;
2587
2588     return TRUE;
2589 }
2590
2591 /* functions for writing */
2592
2593
2594 static guint64
2595 s3_device_get_bytes_read(
2596     Device * pself)
2597 {
2598     S3Device *self = S3_DEVICE(pself);
2599     int       thread;
2600     guint64   dltotal;
2601
2602     g_mutex_unlock(pself->device_mutex);
2603     /* Add per thread */
2604     g_mutex_lock(self->thread_idle_mutex);
2605     dltotal = self->dltotal;
2606     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2607         g_mutex_lock(self->s3t[thread].now_mutex);
2608         dltotal += self->s3t[thread].dlnow;
2609         g_mutex_unlock(self->s3t[thread].now_mutex);
2610     }
2611     g_mutex_unlock(self->thread_idle_mutex);
2612     g_mutex_lock(pself->device_mutex);
2613
2614     return dltotal;
2615 }
2616
2617
2618 static guint64
2619 s3_device_get_bytes_written(
2620     Device * pself)
2621 {
2622     S3Device *self = S3_DEVICE(pself);
2623     int       thread;
2624     guint64   ultotal;
2625
2626     g_mutex_unlock(pself->device_mutex);
2627     /* Add per thread */
2628     g_mutex_lock(self->thread_idle_mutex);
2629     ultotal = self->ultotal;
2630     for (thread = 0; thread < self->nb_threads_backup; thread++) {
2631         g_mutex_lock(self->s3t[thread].now_mutex);
2632         ultotal += self->s3t[thread].ulnow;
2633         g_mutex_unlock(self->s3t[thread].now_mutex);
2634     }
2635     g_mutex_unlock(self->thread_idle_mutex);
2636     g_mutex_lock(pself->device_mutex);
2637
2638     return ultotal;
2639 }
2640
2641
2642 static gboolean
2643 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
2644     S3Device *self = S3_DEVICE(pself);
2645     CurlBuffer amanda_header = {NULL, 0, 0, 0};
2646     gboolean result;
2647     size_t header_size;
2648     char  *key;
2649     int    thread;
2650
2651     if (device_in_error(self)) return FALSE;
2652
2653     reset_thread(self);
2654     pself->is_eom = FALSE;
2655
2656     /* Set the blocksize to zero, since there's no header to skip (it's stored
2657      * in a distinct file, rather than block zero) */
2658     jobInfo->blocksize = 0;
2659
2660     /* Build the amanda header. */
2661     header_size = 0; /* no minimum size */
2662     amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
2663         &header_size);
2664     if (amanda_header.buffer == NULL) {
2665         device_set_error(pself,
2666             stralloc(_("Amanda file header won't fit in a single block!")),
2667             DEVICE_STATUS_DEVICE_ERROR);
2668         return FALSE;
2669     }
2670     amanda_header.buffer_len = header_size;
2671
2672     if(check_at_leom(self, header_size))
2673         pself->is_eom = TRUE;
2674
2675     if(check_at_peom(self, header_size)) {
2676         pself->is_eom = TRUE;
2677         device_set_error(pself,
2678             stralloc(_("No space left on device")),
2679             DEVICE_STATUS_DEVICE_ERROR);
2680         g_free(amanda_header.buffer);
2681         return FALSE;
2682     }
2683
2684     for (thread = 0; thread < self->nb_threads; thread++)  {
2685         self->s3t[thread].idle = 1;
2686         self->s3t[thread].ulnow = 0;
2687     }
2688
2689     /* set the file and block numbers correctly */
2690     pself->file = (pself->file > 0)? pself->file+1 : 1;
2691     pself->block = 0;
2692     g_mutex_lock(pself->device_mutex);
2693     pself->in_file = TRUE;
2694     pself->bytes_written = 0;
2695     g_mutex_unlock(pself->device_mutex);
2696     g_mutex_lock(self->thread_idle_mutex);
2697     self->ultotal = 0;
2698     g_mutex_unlock(self->thread_idle_mutex);
2699     /* write it out as a special block (not the 0th) */
2700     key = special_file_to_key(self, "filestart", pself->file);
2701     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
2702                        &amanda_header, NULL, NULL);
2703     g_free(amanda_header.buffer);
2704     g_free(key);
2705     if (!result) {
2706         device_set_error(pself,
2707             vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
2708             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2709         return FALSE;
2710     }
2711
2712     self->volume_bytes += header_size;
2713
2714     return TRUE;
2715 }
2716
2717 static gboolean
2718 s3_device_write_block (Device * pself, guint size, gpointer data) {
2719     char *filename;
2720     S3Device * self = S3_DEVICE(pself);
2721     int idle_thread = 0;
2722     int thread = -1;
2723     int first_idle = -1;
2724
2725     g_assert (self != NULL);
2726     g_assert (data != NULL);
2727     if (device_in_error(self)) return FALSE;
2728
2729     if(check_at_leom(self, size))
2730         pself->is_eom = TRUE;
2731
2732     if(check_at_peom(self, size)) {
2733         pself->is_eom = TRUE;
2734         device_set_error(pself,
2735             stralloc(_("No space left on device")),
2736             DEVICE_STATUS_DEVICE_ERROR);
2737         return FALSE;
2738     }
2739
2740     filename = file_and_block_to_key(self, pself->file, pself->block);
2741
2742     g_mutex_lock(self->thread_idle_mutex);
2743     while (!idle_thread) {
2744         idle_thread = 0;
2745         for (thread = 0; thread < self->nb_threads_backup; thread++)  {
2746             if (self->s3t[thread].idle == 1) {
2747                 idle_thread++;
2748                 /* Check if the thread is in error */
2749                 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2750                     device_set_error(pself, (char *)self->s3t[thread].errmsg,
2751                                      self->s3t[thread].errflags);
2752                     self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2753                     self->s3t[thread].errmsg = NULL;
2754                     g_mutex_unlock(self->thread_idle_mutex);
2755                     return FALSE;
2756                 }
2757                 if (first_idle == -1) {
2758                     first_idle = thread;
2759                     break;
2760                 }
2761             }
2762         }
2763         if (!idle_thread) {
2764             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2765         }
2766     }
2767     thread = first_idle;
2768
2769     self->s3t[thread].idle = 0;
2770     self->s3t[thread].done = 0;
2771     if (self->s3t[thread].curl_buffer.buffer &&
2772         self->s3t[thread].curl_buffer.buffer_len < size) {
2773         g_free((char *)self->s3t[thread].curl_buffer.buffer);
2774         self->s3t[thread].curl_buffer.buffer = NULL;
2775         self->s3t[thread].curl_buffer.buffer_len = 0;
2776         self->s3t[thread].buffer_len = 0;
2777     }
2778     if (self->s3t[thread].curl_buffer.buffer == NULL) {
2779         self->s3t[thread].curl_buffer.buffer = g_malloc(size);
2780         self->s3t[thread].curl_buffer.buffer_len = size;
2781         self->s3t[thread].buffer_len = size;
2782     }
2783     memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
2784     self->s3t[thread].curl_buffer.buffer_pos = 0;
2785     self->s3t[thread].curl_buffer.buffer_len = size;
2786     self->s3t[thread].curl_buffer.max_buffer_size = 0;
2787     self->s3t[thread].filename = filename;
2788     g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
2789     g_mutex_unlock(self->thread_idle_mutex);
2790
2791     pself->block++;
2792     self->volume_bytes += size;
2793     return TRUE;
2794 }
2795
2796 static void
2797 s3_thread_write_block(
2798     gpointer thread_data,
2799     gpointer data)
2800 {
2801     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2802     Device *pself = (Device *)data;
2803     S3Device *self = S3_DEVICE(pself);
2804     gboolean result;
2805
2806     result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
2807                        S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer,
2808                        progress_func, s3t);
2809     g_free((void *)s3t->filename);
2810     s3t->filename = NULL;
2811     if (!result) {
2812         s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
2813         s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
2814     }
2815     g_mutex_lock(self->thread_idle_mutex);
2816     s3t->idle = 1;
2817     s3t->done = 1;
2818     if (result)
2819         self->ultotal += s3t->curl_buffer.buffer_len;
2820     s3t->curl_buffer.buffer_len = s3t->buffer_len;
2821     s3t->ulnow = 0;
2822     g_cond_broadcast(self->thread_idle_cond);
2823     g_mutex_unlock(self->thread_idle_mutex);
2824 }
2825
2826 static gboolean
2827 s3_device_finish_file (Device * pself) {
2828     S3Device *self = S3_DEVICE(pself);
2829
2830     /* Check all threads are done */
2831     int idle_thread = 0;
2832     int thread;
2833
2834     g_mutex_lock(self->thread_idle_mutex);
2835     while (idle_thread != self->nb_threads) {
2836         idle_thread = 0;
2837         for (thread = 0; thread < self->nb_threads; thread++)  {
2838             if (self->s3t[thread].idle == 1) {
2839                 idle_thread++;
2840             }
2841             /* check thread status */
2842             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2843                 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2844                                  self->s3t[thread].errflags);
2845                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2846                 self->s3t[thread].errmsg = NULL;
2847             }
2848         }
2849         if (idle_thread != self->nb_threads) {
2850             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2851         }
2852     }
2853     self->ultotal = 0;
2854     g_mutex_unlock(self->thread_idle_mutex);
2855
2856     if (device_in_error(pself)) return FALSE;
2857
2858     /* we're not in a file anymore */
2859     g_mutex_lock(pself->device_mutex);
2860     pself->in_file = FALSE;
2861     pself->bytes_written = 0;;
2862     g_mutex_unlock(pself->device_mutex);
2863
2864     return TRUE;
2865 }
2866
2867 static gboolean
2868 s3_device_recycle_file(Device *pself, guint file) {
2869     S3Device *self = S3_DEVICE(pself);
2870     if (device_in_error(self)) return FALSE;
2871
2872     reset_thread(self);
2873     delete_file(self, file);
2874     s3_wait_thread_delete(self);
2875     return !device_in_error(self);
2876     /* delete_file already set our error message if necessary */
2877 }
2878
2879 static gboolean
2880 s3_device_erase(Device *pself) {
2881     S3Device *self = S3_DEVICE(pself);
2882     char *key = NULL;
2883     const char *errmsg = NULL;
2884     guint response_code;
2885     s3_error_code_t s3_error_code;
2886
2887     if (!setup_handle(self)) {
2888         /* error set by setup_handle */
2889         return FALSE;
2890     }
2891
2892     reset_thread(self);
2893     key = special_file_to_key(self, "tapestart", -1);
2894     if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
2895         s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
2896         device_set_error(pself,
2897             stralloc(errmsg),
2898             DEVICE_STATUS_DEVICE_ERROR);
2899         return FALSE;
2900     }
2901     g_free(key);
2902
2903     dumpfile_free(pself->volume_header);
2904     pself->volume_header = NULL;
2905
2906     if (!delete_all_files(self))
2907         return FALSE;
2908
2909     device_set_error(pself, g_strdup("Unlabeled volume"),
2910                      DEVICE_STATUS_VOLUME_UNLABELED);
2911
2912     if (self->create_bucket &&
2913         !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
2914         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2915
2916         /*
2917          * ignore the error if the bucket isn't empty (there may be data from elsewhere)
2918          * or the bucket not existing (already deleted perhaps?)
2919          */
2920         if (!(
2921                 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
2922                 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
2923
2924             device_set_error(pself,
2925                 stralloc(errmsg),
2926                 DEVICE_STATUS_DEVICE_ERROR);
2927             return FALSE;
2928         }
2929     }
2930     self->volume_bytes = 0;
2931     return TRUE;
2932 }
2933
2934 /* functions for reading */
2935
2936 static dumpfile_t*
2937 s3_device_seek_file(Device *pself, guint file) {
2938     S3Device *self = S3_DEVICE(pself);
2939     gboolean result;
2940     char *key;
2941     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2942     dumpfile_t *amanda_header;
2943     const char *errmsg = NULL;
2944     int thread;
2945
2946     if (device_in_error(self)) return NULL;
2947
2948     reset_thread(self);
2949
2950     pself->file = file;
2951     pself->is_eof = FALSE;
2952     pself->block = 0;
2953     g_mutex_lock(pself->device_mutex);
2954     pself->in_file = FALSE;
2955     pself->bytes_read = 0;
2956     g_mutex_unlock(pself->device_mutex);
2957     self->next_block_to_read = 0;
2958     g_mutex_lock(self->thread_idle_mutex);
2959     self->dltotal = 0;
2960     g_mutex_unlock(self->thread_idle_mutex);
2961
2962     /* read it in */
2963     key = special_file_to_key(self, "filestart", pself->file);
2964     result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2965         &buf, NULL, NULL);
2966     g_free(key);
2967
2968     if (!result) {
2969         guint response_code;
2970         s3_error_code_t s3_error_code;
2971         s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2972
2973         /* if it's an expected error (not found), check what to do. */
2974         if (response_code == 404 &&
2975             (s3_error_code == S3_ERROR_None ||
2976              s3_error_code == S3_ERROR_NoSuchKey ||
2977              s3_error_code == S3_ERROR_NoSuchEntity)) {
2978             int next_file;
2979             next_file = find_next_file(self, pself->file);
2980             if (next_file > 0) {
2981                 /* Note short-circut of dispatcher. */
2982                 return s3_device_seek_file(pself, next_file);
2983             } else if (next_file == 0) {
2984                 /* No next file. Check if we are one past the end. */
2985                 key = special_file_to_key(self, "filestart", pself->file - 1);
2986                 result = s3_read(self->s3t[0].s3, self->bucket, key,
2987                     S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2988                 g_free(key);
2989                 if (result) {
2990                     /* pself->file, etc. are already correct */
2991                     return make_tapeend_header();
2992                 } else {
2993                     device_set_error(pself,
2994                         stralloc(_("Attempt to read past tape-end file")),
2995                         DEVICE_STATUS_SUCCESS);
2996                     return NULL;
2997                 }
2998             }
2999         } else {
3000             /* An unexpected error occured finding out if we are the last file. */
3001             device_set_error(pself,
3002                 stralloc(errmsg),
3003                 DEVICE_STATUS_DEVICE_ERROR);
3004             return NULL;
3005         }
3006     }
3007
3008     /* and make a dumpfile_t out of it */
3009     g_assert(buf.buffer != NULL);
3010     amanda_header = g_new(dumpfile_t, 1);
3011     fh_init(amanda_header);
3012     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
3013     g_free(buf.buffer);
3014
3015     switch (amanda_header->type) {
3016         case F_DUMPFILE:
3017         case F_CONT_DUMPFILE:
3018         case F_SPLIT_DUMPFILE:
3019             break;
3020
3021         default:
3022             device_set_error(pself,
3023                 stralloc(_("Invalid amanda header while reading file header")),
3024                 DEVICE_STATUS_VOLUME_ERROR);
3025             g_free(amanda_header);
3026             return NULL;
3027     }
3028
3029     for (thread = 0; thread < self->nb_threads; thread++)  {
3030         self->s3t[thread].idle = 1;
3031         self->s3t[thread].eof = FALSE;
3032         self->s3t[thread].ulnow = 0;
3033     }
3034     g_mutex_lock(pself->device_mutex);
3035     pself->in_file = TRUE;
3036     g_mutex_unlock(pself->device_mutex);
3037     return amanda_header;
3038 }
3039
3040 static gboolean
3041 s3_device_seek_block(Device *pself, guint64 block) {
3042     S3Device * self = S3_DEVICE(pself);
3043     if (device_in_error(pself)) return FALSE;
3044
3045     reset_thread(self);
3046     pself->block = block;
3047     self->next_block_to_read = block;
3048     return TRUE;
3049 }
3050
3051 static int
3052 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
3053     S3Device * self = S3_DEVICE(pself);
3054     char *key;
3055     int thread;
3056     int done = 0;
3057
3058     g_assert (self != NULL);
3059     if (device_in_error(self)) return -1;
3060
3061     g_mutex_lock(self->thread_idle_mutex);
3062     /* start a read ahead for each thread */
3063     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3064         S3_by_thread *s3t = &self->s3t[thread];
3065         if (s3t->idle) {
3066             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3067             s3t->filename = key;
3068             s3t->done = 0;
3069             s3t->idle = 0;
3070             s3t->eof = FALSE;
3071             s3t->dlnow = 0;
3072             s3t->ulnow = 0;
3073             s3t->errflags = DEVICE_STATUS_SUCCESS;
3074             if (self->s3t[thread].curl_buffer.buffer &&
3075                 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
3076                 g_free(self->s3t[thread].curl_buffer.buffer);
3077                 self->s3t[thread].curl_buffer.buffer = NULL;
3078                 self->s3t[thread].curl_buffer.buffer_len = 0;
3079                 self->s3t[thread].buffer_len = 0;
3080             }
3081             if (!self->s3t[thread].curl_buffer.buffer) {
3082                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3083                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3084                 self->s3t[thread].buffer_len = *size_req;
3085             }
3086             s3t->curl_buffer.buffer_pos = 0;
3087             s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
3088             self->next_block_to_read++;
3089             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3090         }
3091     }
3092
3093     /* get the file*/
3094     key = file_and_block_to_key(self, pself->file, pself->block);
3095     g_assert(key != NULL);
3096     while (!done) {
3097         /* find which thread read the key */
3098         for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3099             S3_by_thread *s3t;
3100             s3t = &self->s3t[thread];
3101             if (!s3t->idle &&
3102                 s3t->done &&
3103                 strcmp(key, (char *)s3t->filename) == 0) {
3104                 if (s3t->eof) {
3105                     /* return eof */
3106                     g_free(key);
3107                     pself->is_eof = TRUE;
3108                     g_mutex_lock(pself->device_mutex);
3109                     pself->in_file = FALSE;
3110                     g_mutex_unlock(pself->device_mutex);
3111                     device_set_error(pself, stralloc(_("EOF")),
3112                                      DEVICE_STATUS_SUCCESS);
3113                     g_mutex_unlock(self->thread_idle_mutex);
3114                     return -1;
3115                 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
3116                     /* return the error */
3117                     device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
3118                     g_free(key);
3119                     g_mutex_unlock(self->thread_idle_mutex);
3120                     return -1;
3121
3122                 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
3123                     /* return the buffer */
3124                     g_mutex_unlock(self->thread_idle_mutex);
3125                     memcpy(data, s3t->curl_buffer.buffer,
3126                                  s3t->curl_buffer.buffer_pos);
3127                     *size_req = s3t->curl_buffer.buffer_pos;
3128                     g_free(key);
3129                     s3t->idle = 1;
3130                     g_free((char *)s3t->filename);
3131                     pself->block++;
3132                     done = 1;
3133                     g_mutex_lock(self->thread_idle_mutex);
3134                     break;
3135                 } else { /* buffer not enough large */
3136                     *size_req = s3t->curl_buffer.buffer_len;
3137                     g_free(key);
3138                     g_mutex_unlock(self->thread_idle_mutex);
3139                     return 0;
3140                 }
3141             }
3142         }
3143         if (!done) {
3144             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3145         }
3146     }
3147
3148     /* start a read ahead for the thread */
3149     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3150         S3_by_thread *s3t = &self->s3t[thread];
3151         if (s3t->idle) {
3152             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3153             s3t->filename = key;
3154             s3t->done = 0;
3155             s3t->idle = 0;
3156             s3t->eof = FALSE;
3157             s3t->dlnow = 0;
3158             s3t->ulnow = 0;
3159             s3t->errflags = DEVICE_STATUS_SUCCESS;
3160             if (!self->s3t[thread].curl_buffer.buffer) {
3161                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3162                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3163             }
3164             s3t->curl_buffer.buffer_pos = 0;
3165             self->next_block_to_read++;
3166             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3167         }
3168     }
3169     g_mutex_unlock(self->thread_idle_mutex);
3170
3171     return *size_req;
3172
3173 }
3174
3175 static void
3176 s3_thread_read_block(
3177     gpointer thread_data,
3178     gpointer data)
3179 {
3180     S3_by_thread *s3t = (S3_by_thread *)thread_data;
3181     Device *pself = (Device *)data;
3182     S3Device *self = S3_DEVICE(pself);
3183     gboolean result;
3184
3185     result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
3186         S3_BUFFER_WRITE_FUNCS,
3187         (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t);
3188
3189     g_mutex_lock(self->thread_idle_mutex);
3190     if (!result) {
3191         guint response_code;
3192         s3_error_code_t s3_error_code;
3193         s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
3194         /* if it's an expected error (not found), just return -1 */
3195         if (response_code == 404 &&
3196             (s3_error_code == S3_ERROR_None ||
3197              s3_error_code == S3_ERROR_Unknown ||
3198              s3_error_code == S3_ERROR_NoSuchKey ||
3199              s3_error_code == S3_ERROR_NoSuchEntity)) {
3200             s3t->eof = TRUE;
3201         } else {
3202
3203             /* otherwise, log it and return FALSE */
3204             s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
3205             s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
3206                                           s3_strerror(s3t->s3));
3207         }
3208     } else {
3209         self->dltotal += s3t->curl_buffer.buffer_len;
3210     }
3211     s3t->dlnow = 0;
3212     s3t->ulnow = 0;
3213     s3t->done = 1;
3214     g_cond_broadcast(self->thread_idle_cond);
3215     g_mutex_unlock(self->thread_idle_mutex);
3216
3217     return;
3218 }
3219
3220 static gboolean
3221 check_at_peom(S3Device *self, guint64 size)
3222 {
3223     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3224         guint64 newtotal = self->volume_bytes + size;
3225         if(newtotal > self->volume_limit) {
3226             return TRUE;
3227         }
3228     }
3229     return FALSE;
3230 }
3231
3232 static gboolean
3233 check_at_leom(S3Device *self, guint64 size)
3234 {
3235     guint64 block_size = DEVICE(self)->block_size;
3236     guint64 eom_warning_buffer = block_size *
3237                 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
3238
3239     if(!self->leom)
3240         return FALSE;
3241
3242     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3243         guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
3244         if(newtotal > self->volume_limit) {
3245            return TRUE;
3246         }
3247     }
3248     return FALSE;
3249 }
3250
3251 static void
3252 reset_thread(
3253     S3Device *self)
3254 {
3255     int thread;
3256     int nb_done = 0;
3257
3258     if (self->thread_idle_mutex) {
3259         g_mutex_lock(self->thread_idle_mutex);
3260         while(nb_done != self->nb_threads) {
3261             nb_done = 0;
3262             for (thread = 0; thread < self->nb_threads; thread++)  {
3263                 if (self->s3t[thread].done == 1)
3264                     nb_done++;
3265             }
3266             if (nb_done != self->nb_threads) {
3267                 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3268             }
3269         }
3270         g_mutex_unlock(self->thread_idle_mutex);
3271     }
3272 }