Imported Upstream version 3.3.3
[debian/amanda] / device-src / s3-device.c
1 /*
2  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
3  *
4  * This program is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU General Public License
6  * as published by the Free Software Foundation; either version 2
7  * of the License, or (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
23  * data.  It stores data in keys named with a user-specified prefix, inside a
24  * user-specified bucket.  Data is stored in the form of numbered (large)
25  * blocks.
26  */
27
28 #include "amanda.h"
29 #include <string.h>
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <unistd.h>
33 #include <dirent.h>
34 #include <regex.h>
35 #include <time.h>
36 #include "util.h"
37 #include "conffile.h"
38 #include "device.h"
39 #include "s3.h"
40 #include <curl/curl.h>
41 #ifdef HAVE_OPENSSL_HMAC_H
42 # include <openssl/hmac.h>
43 #else
44 # ifdef HAVE_CRYPTO_HMAC_H
45 #  include <crypto/hmac.h>
46 # else
47 #  ifdef HAVE_HMAC_H
48 #   include <hmac.h>
49 #  endif
50 # endif
51 #endif
52
53 /*
54  * Type checking and casting macros
55  */
56 #define TYPE_S3_DEVICE  (s3_device_get_type())
57 #define S3_DEVICE(obj)  G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
58 #define S3_DEVICE_CONST(obj)    G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
59 #define S3_DEVICE_CLASS(klass)  G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
60 #define IS_S3_DEVICE(obj)       G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
61
62 #define S3_DEVICE_GET_CLASS(obj)        G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
63 static GType    s3_device_get_type      (void);
64
65 /*
66  * Main object structure
67  */
68 typedef struct _S3MetadataFile S3MetadataFile;
69 typedef struct _S3Device S3Device;
70
71 typedef struct _S3_by_thread S3_by_thread;
72 struct _S3_by_thread {
73     S3Handle * volatile          s3;
74     CurlBuffer volatile          curl_buffer;
75     guint volatile               buffer_len;
76     int volatile                 idle;
77     int volatile                 eof;
78     int volatile                 done;
79     char volatile * volatile     filename;
80     DeviceStatusFlags volatile   errflags;      /* device_status */
81     char volatile * volatile     errmsg;        /* device error message */
82     GMutex                      *now_mutex;
83     guint64                      dlnow, ulnow;
84 };
85
86 struct _S3Device {
87     Device __parent__;
88
89     /* The "easy" curl handle we use to access Amazon S3 */
90     S3_by_thread *s3t;
91
92     /* S3 access information */
93     char *bucket;
94     char *prefix;
95
96     /* The S3 access information. */
97     char *secret_key;
98     char *access_key;
99     char *user_token;
100
101     /* The Openstack swift information. */
102     char *swift_account_id;
103     char *swift_access_key;
104
105     char *username;
106     char *password;
107     char *tenant_id;
108     char *tenant_name;
109
110     char *bucket_location;
111     char *storage_class;
112     char *host;
113     char *service_path;
114     char *server_side_encryption;
115     char *proxy;
116
117     char *ca_info;
118
119     /* a cache for unsuccessful reads (where we get the file but the caller
120      * doesn't have space for it or doesn't want it), where we expect the
121      * next call will request the same file.
122      */
123     char *cached_buf;
124     char *cached_key;
125     int cached_size;
126
127     /* Produce verbose output? */
128     gboolean verbose;
129
130     /* create the bucket? */
131     gboolean create_bucket;
132
133     /* Use SSL? */
134     gboolean use_ssl;
135     S3_api s3_api;
136
137     /* Throttling */
138     guint64 max_send_speed;
139     guint64 max_recv_speed;
140
141     gboolean leom;
142     guint64 volume_bytes;
143     guint64 volume_limit;
144     gboolean enforce_volume_limit;
145     gboolean use_subdomain;
146     gboolean use_s3_multi_delete;
147
148     int          nb_threads;
149     int          nb_threads_backup;
150     int          nb_threads_recovery;
151     GThreadPool *thread_pool_delete;
152     GThreadPool *thread_pool_write;
153     GThreadPool *thread_pool_read;
154     GCond       *thread_idle_cond;
155     GMutex      *thread_idle_mutex;
156     int          next_block_to_read;
157     GSList      *keys;
158
159     guint64      dltotal;
160     guint64      ultotal;
161
162     /* google OAUTH2 */
163     char        *client_id;
164     char        *client_secret;
165     char        *refresh_token;
166     char        *project_id;
167
168     gboolean     reuse_connection;
169     
170     /* CAStor */
171     char        *reps;
172     char        *reps_bucket;
173 };
174
175 /*
176  * Class definition
177  */
178 typedef struct _S3DeviceClass S3DeviceClass;
179 struct _S3DeviceClass {
180     DeviceClass __parent__;
181 };
182
183
184 /*
185  * Constants and static data
186  */
187
188 #define S3_DEVICE_NAME "s3"
189
190 /* Maximum key length as specified in the S3 documentation
191  * (*excluding* null terminator) */
192 #define S3_MAX_KEY_LENGTH 1024
193
194 /* Note: for compatability, min can only be decreased and max increased */
195 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
196 #define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
197 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
198 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
199
200 /* This goes in lieu of file number for metadata. */
201 #define SPECIAL_INFIX "special-"
202
203 /* pointer to the class of our parent */
204 static DeviceClass *parent_class = NULL;
205
206 /*
207  * device-specific properties
208  */
209
210 /* Authentication information for Amazon S3. Both of these are strings. */
211 static DevicePropertyBase device_property_s3_access_key;
212 static DevicePropertyBase device_property_s3_secret_key;
213 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
214 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
215
216 /* Authentication information for Openstack Swift. Both of these are strings. */
217 static DevicePropertyBase device_property_swift_account_id;
218 static DevicePropertyBase device_property_swift_access_key;
219 #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
220 #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
221
222 /* Authentication information for Openstack Swift. Both of these are strings. */
223 static DevicePropertyBase device_property_username;
224 static DevicePropertyBase device_property_password;
225 static DevicePropertyBase device_property_tenant_id;
226 static DevicePropertyBase device_property_tenant_name;
227 #define PROPERTY_USERNAME (device_property_username.ID)
228 #define PROPERTY_PASSWORD (device_property_password.ID)
229 #define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
230 #define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
231
232 /* Host and path */
233 static DevicePropertyBase device_property_s3_host;
234 static DevicePropertyBase device_property_s3_service_path;
235 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
236 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
237
238 /* Same, but for S3 with DevPay. */
239 static DevicePropertyBase device_property_s3_user_token;
240 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
241
242 /* Location constraint for new buckets created on Amazon S3. */
243 static DevicePropertyBase device_property_s3_bucket_location;
244 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
245
246 /* Storage class */
247 static DevicePropertyBase device_property_s3_storage_class;
248 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
249
250 /* Server side encryption */
251 static DevicePropertyBase device_property_s3_server_side_encryption;
252 #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
253
254 /* proxy */
255 static DevicePropertyBase device_property_proxy;
256 #define PROPERTY_PROXY (device_property_proxy.ID)
257
258 /* Path to certificate authority certificate */
259 static DevicePropertyBase device_property_ssl_ca_info;
260 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
261
262 /* Which strotage api to use. */
263 static DevicePropertyBase device_property_storage_api;
264 #define PROPERTY_STORAGE_API (device_property_storage_api.ID)
265
266 /* Whether to use openstack protocol. */
267 /* DEPRECATED */
268 static DevicePropertyBase device_property_openstack_swift_api;
269 #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
270
271 /* Whether to use SSL with Amazon S3. */
272 static DevicePropertyBase device_property_s3_ssl;
273 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
274
275 /* Whether to re-use connection. */
276 static DevicePropertyBase device_property_reuse_connection;
277 #define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID)
278
279 /* Speed limits for sending and receiving */
280 static DevicePropertyBase device_property_max_send_speed;
281 static DevicePropertyBase device_property_max_recv_speed;
282 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
283 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
284
285 /* Whether to use subdomain */
286 static DevicePropertyBase device_property_s3_subdomain;
287 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
288
289 /* Number of threads to use */
290 static DevicePropertyBase device_property_nb_threads_backup;
291 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
292 static DevicePropertyBase device_property_nb_threads_recovery;
293 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
294
295 /* If the s3 server have the multi-delete functionality */
296 static DevicePropertyBase device_property_s3_multi_delete;
297 #define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
298
299 /* The client_id for OAUTH2 */
300 static DevicePropertyBase device_property_client_id;
301 #define PROPERTY_CLIENT_ID (device_property_client_id.ID)
302
303 /* The client_secret for OAUTH2 */
304 static DevicePropertyBase device_property_client_secret;
305 #define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
306
307 /* The refresh token for OAUTH2 */
308 static DevicePropertyBase device_property_refresh_token;
309 #define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
310
311 /* The PROJECT ID */
312 static DevicePropertyBase device_property_project_id;
313 #define PROPERTY_PROJECT_ID (device_property_project_id.ID)
314
315 /* The PROJECT ID */
316 static DevicePropertyBase device_property_create_bucket;
317 #define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
318
319 /* CAStor replication values for objects and buckets */
320 static DevicePropertyBase device_property_s3_reps;
321 #define PROPERTY_S3_REPS (device_property_s3_reps.ID)
322 #define S3_DEVICE_REPS_DEFAULT "2"
323 static DevicePropertyBase device_property_s3_reps_bucket;
324 #define PROPERTY_S3_REPS_BUCKET (device_property_s3_reps_bucket.ID)
325 #define S3_DEVICE_REPS_BUCKET_DEFAULT "4"
326
327 /*
328  * prototypes
329  */
330
331 void s3_device_register(void);
332
333 /*
334  * utility functions */
335
336 /* Given file and block numbers, return an S3 key.
337  *
338  * @param self: the S3Device object
339  * @param file: the file number
340  * @param block: the block within that file
341  * @returns: a newly allocated string containing an S3 key.
342  */
343 static char *
344 file_and_block_to_key(S3Device *self,
345                       int file,
346                       guint64 block);
347
348 /* Given the name of a special file (such as 'tapestart'), generate
349  * the S3 key to use for that file.
350  *
351  * @param self: the S3Device object
352  * @param special_name: name of the special file
353  * @param file: a file number to include; omitted if -1
354  * @returns: a newly alocated string containing an S3 key.
355  */
356 static char *
357 special_file_to_key(S3Device *self,
358                     char *special_name,
359                     int file);
360 /* Write an amanda header file to S3.
361  *
362  * @param self: the S3Device object
363  * @param label: the volume label
364  * @param timestamp: the volume timestamp
365  */
366 static gboolean
367 write_amanda_header(S3Device *self,
368                     char *label,
369                     char * timestamp);
370
371 /* "Fast forward" this device to the end by looking up the largest file number
372  * present and setting the current file number one greater.
373  *
374  * @param self: the S3Device object
375  */
376 static gboolean
377 seek_to_end(S3Device *self);
378
379 /* Find the number of the last file that contains any data (even just a header).
380  *
381  * @param self: the S3Device object
382  * @returns: the last file, or -1 in event of an error
383  */
384 static int
385 find_last_file(S3Device *self);
386
387 /* Delete all blocks in the given file, including the filestart block
388  *
389  * @param self: the S3Device object
390  * @param file: the file to delete
391  */
392 static gboolean
393 delete_file(S3Device *self,
394             int file);
395
396
397 /* Delete all files in the given device
398  *
399  * @param self: the S3Device object
400  */
401 static gboolean
402 delete_all_files(S3Device *self);
403
404 /* Set up self->s3t as best as possible.
405  *
406  * The return value is TRUE iff self->s3t is useable.
407  *
408  * @param self: the S3Device object
409  * @returns: TRUE if the handle is set up
410  */
411 static gboolean
412 setup_handle(S3Device * self);
413
414 static void
415 s3_wait_thread_delete(S3Device *self);
416
417 /*
418  * class mechanics */
419
420 static void
421 s3_device_init(S3Device * o);
422
423 static void
424 s3_device_class_init(S3DeviceClass * c);
425
426 static void
427 s3_device_finalize(GObject * o);
428
429 static Device*
430 s3_device_factory(char * device_name, char * device_type, char * device_node);
431
432 /*
433  * Property{Get,Set}Fns */
434
435 static gboolean s3_device_set_access_key_fn(Device *self,
436     DevicePropertyBase *base, GValue *val,
437     PropertySurety surety, PropertySource source);
438
439 static gboolean s3_device_set_secret_key_fn(Device *self,
440     DevicePropertyBase *base, GValue *val,
441     PropertySurety surety, PropertySource source);
442
443 static gboolean s3_device_set_swift_account_id_fn(Device *self,
444     DevicePropertyBase *base, GValue *val,
445     PropertySurety surety, PropertySource source);
446
447 static gboolean s3_device_set_swift_access_key_fn(Device *self,
448     DevicePropertyBase *base, GValue *val,
449     PropertySurety surety, PropertySource source);
450
451 static gboolean s3_device_set_username(Device *self,
452     DevicePropertyBase *base, GValue *val,
453     PropertySurety surety, PropertySource source);
454
455 static gboolean s3_device_set_password(Device *self,
456     DevicePropertyBase *base, GValue *val,
457     PropertySurety surety, PropertySource source);
458
459 static gboolean s3_device_set_tenant_id(Device *self,
460     DevicePropertyBase *base, GValue *val,
461     PropertySurety surety, PropertySource source);
462
463 static gboolean s3_device_set_tenant_name(Device *self,
464     DevicePropertyBase *base, GValue *val,
465     PropertySurety surety, PropertySource source);
466
467 static gboolean s3_device_set_user_token_fn(Device *self,
468     DevicePropertyBase *base, GValue *val,
469     PropertySurety surety, PropertySource source);
470
471 static gboolean s3_device_set_bucket_location_fn(Device *self,
472     DevicePropertyBase *base, GValue *val,
473     PropertySurety surety, PropertySource source);
474
475 static gboolean s3_device_set_storage_class_fn(Device *self,
476     DevicePropertyBase *base, GValue *val,
477     PropertySurety surety, PropertySource source);
478
479 static gboolean s3_device_set_server_side_encryption_fn(Device *self,
480     DevicePropertyBase *base, GValue *val,
481     PropertySurety surety, PropertySource source);
482
483 static gboolean s3_device_set_proxy_fn(Device *self,
484     DevicePropertyBase *base, GValue *val,
485     PropertySurety surety, PropertySource source);
486
487 static gboolean s3_device_set_ca_info_fn(Device *self,
488     DevicePropertyBase *base, GValue *val,
489     PropertySurety surety, PropertySource source);
490
491 static gboolean s3_device_set_verbose_fn(Device *self,
492     DevicePropertyBase *base, GValue *val,
493     PropertySurety surety, PropertySource source);
494
495 static gboolean s3_device_set_create_bucket_fn(Device *self,
496     DevicePropertyBase *base, GValue *val,
497     PropertySurety surety, PropertySource source);
498
499 static gboolean s3_device_set_storage_api(Device *self,
500     DevicePropertyBase *base, GValue *val,
501     PropertySurety surety, PropertySource source);
502
503 static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
504     DevicePropertyBase *base, GValue *val,
505     PropertySurety surety, PropertySource source);
506
507 static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
508     DevicePropertyBase *base, GValue *val,
509     PropertySurety surety, PropertySource source);
510
511 static gboolean s3_device_set_ssl_fn(Device *self,
512     DevicePropertyBase *base, GValue *val,
513     PropertySurety surety, PropertySource source);
514
515 static gboolean s3_device_set_reuse_connection_fn(Device *self,
516     DevicePropertyBase *base, GValue *val,
517     PropertySurety surety, PropertySource source);
518
519 static gboolean s3_device_set_max_send_speed_fn(Device *self,
520     DevicePropertyBase *base, GValue *val,
521     PropertySurety surety, PropertySource source);
522
523 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
524     DevicePropertyBase *base, GValue *val,
525     PropertySurety surety, PropertySource source);
526
527 static gboolean s3_device_set_nb_threads_backup(Device *self,
528     DevicePropertyBase *base, GValue *val,
529     PropertySurety surety, PropertySource source);
530
531 static gboolean s3_device_set_nb_threads_recovery(Device *self,
532     DevicePropertyBase *base, GValue *val,
533     PropertySurety surety, PropertySource source);
534
535 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
536     DevicePropertyBase *base, GValue *val,
537     PropertySurety surety, PropertySource source);
538
539 static gboolean property_set_leom_fn(Device *p_self,
540     DevicePropertyBase *base, GValue *val,
541     PropertySurety surety, PropertySource source);
542
543 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
544     DevicePropertyBase *base, GValue *val,
545     PropertySurety surety, PropertySource source);
546
547 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
548     DevicePropertyBase *base, GValue *val,
549     PropertySurety surety, PropertySource source);
550
551 static gboolean s3_device_set_host_fn(Device *p_self,
552     DevicePropertyBase *base, GValue *val,
553     PropertySurety surety, PropertySource source);
554
555 static gboolean s3_device_set_service_path_fn(Device *p_self,
556     DevicePropertyBase *base, GValue *val,
557     PropertySurety surety, PropertySource source);
558
559 static gboolean s3_device_set_client_id_fn(Device *p_self,
560     DevicePropertyBase *base, GValue *val,
561     PropertySurety surety, PropertySource source);
562
563 static gboolean s3_device_set_client_secret_fn(Device *p_self,
564     DevicePropertyBase *base, GValue *val,
565     PropertySurety surety, PropertySource source);
566
567 static gboolean s3_device_set_refresh_token_fn(Device *p_self,
568     DevicePropertyBase *base, GValue *val,
569     PropertySurety surety, PropertySource source);
570
571 static gboolean s3_device_set_project_id_fn(Device *p_self,
572     DevicePropertyBase *base, GValue *val,
573     PropertySurety surety, PropertySource source);
574
575 static gboolean s3_device_set_reps_fn(Device *self,
576     DevicePropertyBase *base, GValue *val,
577     PropertySurety surety, PropertySource source);
578
579 static gboolean s3_device_set_reps_bucket_fn(Device *self,
580     DevicePropertyBase *base, GValue *val,
581     PropertySurety surety, PropertySource source);
582
583 static void s3_thread_read_block(gpointer thread_data,
584                                  gpointer data);
585 static void s3_thread_write_block(gpointer thread_data,
586                                   gpointer data);
587 static gboolean make_bucket(Device * pself);
588
589
590 /* Wait that all threads are done */
591 static void reset_thread(S3Device *self);
592
593 /*
594  * virtual functions */
595
596 static void
597 s3_device_open_device(Device *pself, char *device_name,
598                   char * device_type, char * device_node);
599
600 static DeviceStatusFlags s3_device_read_label(Device * self);
601
602 static gboolean
603 s3_device_start(Device * self,
604                 DeviceAccessMode mode,
605                 char * label,
606                 char * timestamp);
607
608 static gboolean
609 s3_device_finish(Device * self);
610
611 static guint64
612 s3_device_get_bytes_read(Device * self);
613
614 static guint64
615 s3_device_get_bytes_written(Device * self);
616
617 static gboolean
618 s3_device_start_file(Device * self,
619                      dumpfile_t * jobInfo);
620
621 static gboolean
622 s3_device_write_block(Device * self,
623                       guint size,
624                       gpointer data);
625
626 static gboolean
627 s3_device_finish_file(Device * self);
628
629 static dumpfile_t*
630 s3_device_seek_file(Device *pself,
631                     guint file);
632
633 static gboolean
634 s3_device_seek_block(Device *pself,
635                      guint64 block);
636
637 static int
638 s3_device_read_block(Device * pself,
639                      gpointer data,
640                      int *size_req);
641
642 static gboolean
643 s3_device_recycle_file(Device *pself,
644                        guint file);
645
646 static gboolean
647 s3_device_erase(Device *pself);
648
649 static gboolean
650 check_at_leom(S3Device *self,
651                 guint64 size);
652
653 static gboolean
654 check_at_peom(S3Device *self,
655                 guint64 size);
656
657 /*
658  * Private functions
659  */
660
661 static char *
662 file_and_block_to_key(S3Device *self,
663                       int file,
664                       guint64 block)
665 {
666     char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
667                                    self->prefix, file, (long long unsigned int)block);
668     g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
669     return s3_key;
670 }
671
672 static char *
673 special_file_to_key(S3Device *self,
674                     char *special_name,
675                     int file)
676 {
677     if (file == -1)
678         return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
679     else
680         return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
681 }
682
683 static gboolean
684 write_amanda_header(S3Device *self,
685                     char *label,
686                     char * timestamp)
687 {
688     CurlBuffer amanda_header = {NULL, 0, 0, 0};
689     char * key = NULL;
690     gboolean result;
691     dumpfile_t * dumpinfo = NULL;
692     Device *d_self = DEVICE(self);
693     size_t header_size;
694
695     /* build the header */
696     header_size = 0; /* no minimum size */
697     dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
698     amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
699         &header_size);
700     if (amanda_header.buffer == NULL) {
701         device_set_error(d_self,
702             stralloc(_("Amanda tapestart header won't fit in a single block!")),
703             DEVICE_STATUS_DEVICE_ERROR);
704         dumpfile_free(dumpinfo);
705         g_free(amanda_header.buffer);
706         return FALSE;
707     }
708
709     if(check_at_leom(self, header_size))
710         d_self->is_eom = TRUE;
711
712     if(check_at_peom(self, header_size)) {
713         d_self->is_eom = TRUE;
714         device_set_error(d_self,
715             stralloc(_("No space left on device")),
716             DEVICE_STATUS_DEVICE_ERROR);
717         g_free(amanda_header.buffer);
718         return FALSE;
719     }
720
721     /* write out the header and flush the uploads. */
722     key = special_file_to_key(self, "tapestart", -1);
723     g_assert(header_size < G_MAXUINT); /* for cast to guint */
724     amanda_header.buffer_len = (guint)header_size;
725     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
726                        &amanda_header, NULL, NULL);
727     g_free(amanda_header.buffer);
728     g_free(key);
729
730     if (!result) {
731         device_set_error(d_self,
732             vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
733             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
734         dumpfile_free(dumpinfo);
735     } else {
736         dumpfile_free(d_self->volume_header);
737         d_self->volume_header = dumpinfo;
738         self->volume_bytes += header_size;
739     }
740     d_self->header_block_size = header_size;
741     return result;
742 }
743
744 static gboolean
745 seek_to_end(S3Device *self) {
746     int last_file;
747
748     Device *pself = DEVICE(self);
749
750     last_file = find_last_file(self);
751     if (last_file < 0)
752         return FALSE;
753
754     pself->file = last_file;
755
756     return TRUE;
757 }
758
759 /* Convert an object name into a file number, assuming the given prefix
760  * length. Returns -1 if the object name is invalid, or 0 if the object name
761  * is a "special" key. */
762 static int key_to_file(guint prefix_len, const char * key) {
763     int file;
764     int i;
765
766     /* skip the prefix */
767     if (strlen(key) <= prefix_len)
768         return -1;
769
770     key += prefix_len;
771
772     if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
773         return 0;
774     }
775
776     /* check that key starts with 'f' */
777     if (key[0] != 'f')
778         return -1;
779     key++;
780
781     /* check that key is of the form "%08x-" */
782     for (i = 0; i < 8; i++) {
783         if (!(key[i] >= '0' && key[i] <= '9') &&
784             !(key[i] >= 'a' && key[i] <= 'f') &&
785             !(key[i] >= 'A' && key[i] <= 'F')) break;
786     }
787     if (key[i] != '-') return -1;
788     if (i < 8) return -1;
789
790     /* convert the file number */
791     errno = 0;
792     file = strtoul(key, NULL, 16);
793     if (errno != 0) {
794         g_warning(_("unparseable file number '%s'"), key);
795         return -1;
796     }
797
798     return file;
799 }
800
801 /* Find the number of the last file that contains any data (even just a header).
802  * Returns -1 in event of an error
803  */
804 static int
805 find_last_file(S3Device *self) {
806     gboolean result;
807     GSList *keys;
808     unsigned int prefix_len = strlen(self->prefix);
809     int last_file = 0;
810     Device *d_self = DEVICE(self);
811
812     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
813     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
814     if (!result) {
815         device_set_error(d_self,
816             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
817             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
818         return -1;
819     }
820
821     for (; keys; keys = g_slist_remove(keys, keys->data)) {
822         int file = key_to_file(prefix_len, keys->data);
823
824         /* and if it's the last, keep it */
825         if (file > last_file)
826             last_file = file;
827     }
828
829     return last_file;
830 }
831
832 /* Find the number of the file following the requested one, if any.
833  * Returns 0 if there is no such file or -1 in event of an error
834  */
835 static int
836 find_next_file(S3Device *self, int last_file) {
837     gboolean result;
838     GSList *keys;
839     unsigned int prefix_len = strlen(self->prefix);
840     int next_file = 0;
841     Device *d_self = DEVICE(self);
842
843     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
844     result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
845                           &keys, NULL);
846     if (!result) {
847         device_set_error(d_self,
848             vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
849             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
850         return -1;
851     }
852
853     for (; keys; keys = g_slist_remove(keys, keys->data)) {
854         int file;
855
856         file = key_to_file(prefix_len, (char*)keys->data);
857
858         if (file < 0) {
859             /* Set this in case we don't find a next file; this is not a
860              * hard error, so if we can find a next file we'll return that
861              * instead. */
862             next_file = -1;
863         }
864
865         if (file < next_file && file > last_file) {
866             next_file = file;
867         }
868     }
869
870     return next_file;
871 }
872
873 static gboolean
874 delete_file(S3Device *self,
875             int file)
876 {
877     int thread = -1;
878
879     gboolean result;
880     GSList *keys;
881     guint64 total_size = 0;
882     Device *d_self = DEVICE(self);
883     char *my_prefix;
884
885     if (file == -1) {
886         my_prefix = g_strdup_printf("%sf", self->prefix);
887     } else {
888         my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
889     }
890
891     result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL,
892                           &keys, &total_size);
893     if (!result) {
894         device_set_error(d_self,
895                 g_strdup_printf(_("While listing S3 keys: %s"),
896                     s3_strerror(self->s3t[0].s3)),
897                 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
898         return FALSE;
899     }
900
901     g_mutex_lock(self->thread_idle_mutex);
902     if (!self->keys) {
903         self->keys = keys;
904     } else {
905         self->keys = g_slist_concat(self->keys, keys);
906     }
907
908     // start the threads
909     for (thread = 0; thread < self->nb_threads; thread++)  {
910         if (self->s3t[thread].idle == 1) {
911             /* Check if the thread is in error */
912             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
913                 device_set_error(d_self,
914                                  (char *)self->s3t[thread].errmsg,
915                                  self->s3t[thread].errflags);
916                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
917                 self->s3t[thread].errmsg = NULL;
918                 g_mutex_unlock(self->thread_idle_mutex);
919                 s3_wait_thread_delete(self);
920                 return FALSE;
921             }
922             self->s3t[thread].idle = 0;
923             self->s3t[thread].done = 0;
924             g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
925                                NULL);
926         }
927     }
928     g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
929     g_mutex_unlock(self->thread_idle_mutex);
930
931     self->volume_bytes = total_size;
932
933     s3_wait_thread_delete(self);
934
935     return TRUE;
936 }
937
938 static void
939 s3_thread_delete_block(
940     gpointer thread_data,
941     gpointer data)
942 {
943     static int count = 0;
944     S3_by_thread *s3t = (S3_by_thread *)thread_data;
945     Device *pself = (Device *)data;
946     S3Device *self = S3_DEVICE(pself);
947     int result = 1;
948     char *filename;
949
950     g_mutex_lock(self->thread_idle_mutex);
951     while (result && self->keys) {
952         if (self->use_s3_multi_delete) {
953             char **filenames = g_new(char *, 1001);
954             char **f = filenames;
955             int  n = 0;
956             while (self->keys && n<1000) {
957                 *f++ = self->keys->data;
958                 self->keys = g_slist_remove(self->keys, self->keys->data);
959                 n++;
960             }
961             *f++ = NULL;
962             g_mutex_unlock(self->thread_idle_mutex);
963             result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
964                                               (const char **)filenames);
965             if (result != 1) {
966                 char **f;
967
968                 if (result == 2) {
969                     g_debug("Deleting multiple keys not implemented");
970                 } else { /* result == 0 */
971                     g_debug("Deleteing multiple keys failed: %s",
972                             s3_strerror(s3t->s3));
973                 }
974
975                 self->use_s3_multi_delete = 0;
976                 /* re-add all filenames */
977                 f = filenames;
978                 g_mutex_lock(self->thread_idle_mutex);
979                 while(*f) {
980                     self->keys = g_slist_prepend(self->keys, *f++);
981                 }
982                 g_mutex_unlock(self->thread_idle_mutex);
983                 g_free(filenames);
984                 result = 1;
985                 g_mutex_lock(self->thread_idle_mutex);
986                 continue;
987             }
988             f = filenames;
989             while(*f) {
990                 g_free(*f++);
991             }
992             g_free(filenames);
993         } else {
994             filename = self->keys->data;
995             self->keys = g_slist_remove(self->keys, self->keys->data);
996             count++;
997             if (count >= 1000) {
998                 g_debug("Deleting %s ...", filename);
999                 count = 0;
1000             }
1001             g_mutex_unlock(self->thread_idle_mutex);
1002             result = s3_delete(s3t->s3, (const char *)self->bucket,
1003                                         (const char *)filename);
1004             if (!result) {
1005                 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
1006                 s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
1007                                           filename, s3_strerror(s3t->s3));
1008             }
1009             g_free(filename);
1010         }
1011         g_mutex_lock(self->thread_idle_mutex);
1012     }
1013     s3t->idle = 1;
1014     s3t->done = 1;
1015     g_cond_broadcast(self->thread_idle_cond);
1016     g_mutex_unlock(self->thread_idle_mutex);
1017 }
1018
1019 static void
1020 s3_wait_thread_delete(S3Device *self)
1021 {
1022     Device *d_self = (Device *)self;
1023     int idle_thread = 0;
1024     int thread;
1025
1026     g_mutex_lock(self->thread_idle_mutex);
1027     while (idle_thread != self->nb_threads) {
1028         idle_thread = 0;
1029         for (thread = 0; thread < self->nb_threads; thread++)  {
1030             if (self->s3t[thread].idle == 1) {
1031                 idle_thread++;
1032             }
1033             /* Check if the thread is in error */
1034             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1035                 device_set_error(d_self, (char *)self->s3t[thread].errmsg,
1036                                              self->s3t[thread].errflags);
1037                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1038                 self->s3t[thread].errmsg = NULL;
1039             }
1040         }
1041         if (idle_thread != self->nb_threads) {
1042             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1043         }
1044     }
1045     g_mutex_unlock(self->thread_idle_mutex);
1046 }
1047
1048
1049 static gboolean
1050 delete_all_files(S3Device *self)
1051 {
1052     return delete_file(self, -1);
1053 }
1054
1055 /*
1056  * Class mechanics
1057  */
1058
1059 void
1060 s3_device_register(void)
1061 {
1062     static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
1063     g_assert(s3_init());
1064
1065     /* set up our properties */
1066     device_property_fill_and_register(&device_property_s3_secret_key,
1067                                       G_TYPE_STRING, "s3_secret_key",
1068        "Secret access key to authenticate with Amazon S3");
1069     device_property_fill_and_register(&device_property_s3_access_key,
1070                                       G_TYPE_STRING, "s3_access_key",
1071        "Access key ID to authenticate with Amazon S3");
1072     device_property_fill_and_register(&device_property_swift_account_id,
1073                                       G_TYPE_STRING, "swift_account_id",
1074        "Account ID to authenticate with openstack swift");
1075     device_property_fill_and_register(&device_property_swift_access_key,
1076                                       G_TYPE_STRING, "swift_access_key",
1077        "Access key to authenticate with openstack swift");
1078     device_property_fill_and_register(&device_property_username,
1079                                       G_TYPE_STRING, "username",
1080        "Username to authenticate with");
1081     device_property_fill_and_register(&device_property_password,
1082                                       G_TYPE_STRING, "password",
1083        "password to authenticate with");
1084     device_property_fill_and_register(&device_property_tenant_id,
1085                                       G_TYPE_STRING, "tenant_id",
1086        "tenant_id to authenticate with");
1087     device_property_fill_and_register(&device_property_tenant_name,
1088                                       G_TYPE_STRING, "tenant_name",
1089        "tenant_name to authenticate with");
1090     device_property_fill_and_register(&device_property_s3_host,
1091                                       G_TYPE_STRING, "s3_host",
1092        "hostname:port of the server");
1093     device_property_fill_and_register(&device_property_s3_service_path,
1094                                       G_TYPE_STRING, "s3_service_path",
1095        "path to add in the url");
1096     device_property_fill_and_register(&device_property_s3_user_token,
1097                                       G_TYPE_STRING, "s3_user_token",
1098        "User token for authentication Amazon devpay requests");
1099     device_property_fill_and_register(&device_property_s3_bucket_location,
1100                                       G_TYPE_STRING, "s3_bucket_location",
1101        "Location constraint for buckets on Amazon S3");
1102     device_property_fill_and_register(&device_property_s3_storage_class,
1103                                       G_TYPE_STRING, "s3_storage_class",
1104        "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
1105     device_property_fill_and_register(&device_property_s3_server_side_encryption,
1106                                       G_TYPE_STRING, "s3_server_side_encryption",
1107        "Serve side encryption as specified by Amazon (AES256)");
1108     device_property_fill_and_register(&device_property_proxy,
1109                                       G_TYPE_STRING, "proxy",
1110        "The proxy");
1111     device_property_fill_and_register(&device_property_ssl_ca_info,
1112                                       G_TYPE_STRING, "ssl_ca_info",
1113        "Path to certificate authority certificate");
1114     device_property_fill_and_register(&device_property_storage_api,
1115                                       G_TYPE_STRING, "storage_api",
1116        "Which cloud API to use.");
1117     device_property_fill_and_register(&device_property_openstack_swift_api,
1118                                       G_TYPE_STRING, "openstack_swift_api",
1119        "Whether to use openstack protocol");
1120     device_property_fill_and_register(&device_property_client_id,
1121                                       G_TYPE_STRING, "client_id",
1122        "client_id for use with oauth2");
1123     device_property_fill_and_register(&device_property_client_secret,
1124                                       G_TYPE_STRING, "client_secret",
1125        "client_secret for use with oauth2");
1126     device_property_fill_and_register(&device_property_refresh_token,
1127                                       G_TYPE_STRING, "refresh_token",
1128        "refresh_token for use with oauth2");
1129     device_property_fill_and_register(&device_property_project_id,
1130                                       G_TYPE_STRING, "project_id",
1131        "project id for use with google");
1132     device_property_fill_and_register(&device_property_s3_ssl,
1133                                       G_TYPE_BOOLEAN, "s3_ssl",
1134        "Whether to use SSL with Amazon S3");
1135     device_property_fill_and_register(&device_property_reuse_connection,
1136                                       G_TYPE_BOOLEAN, "reuse_connection",
1137        "Whether to reuse connection");
1138     device_property_fill_and_register(&device_property_create_bucket,
1139                                       G_TYPE_BOOLEAN, "create_bucket",
1140        "Whether to create/delete bucket");
1141     device_property_fill_and_register(&device_property_s3_subdomain,
1142                                       G_TYPE_BOOLEAN, "s3_subdomain",
1143        "Whether to use subdomain");
1144     device_property_fill_and_register(&device_property_max_send_speed,
1145                                       G_TYPE_UINT64, "max_send_speed",
1146        "Maximum average upload speed (bytes/sec)");
1147     device_property_fill_and_register(&device_property_max_recv_speed,
1148                                       G_TYPE_UINT64, "max_recv_speed",
1149        "Maximum average download speed (bytes/sec)");
1150     device_property_fill_and_register(&device_property_nb_threads_backup,
1151                                       G_TYPE_UINT64, "nb_threads_backup",
1152        "Number of writer thread");
1153     device_property_fill_and_register(&device_property_nb_threads_recovery,
1154                                       G_TYPE_UINT64, "nb_threads_recovery",
1155        "Number of reader thread");
1156     device_property_fill_and_register(&device_property_s3_multi_delete,
1157                                       G_TYPE_BOOLEAN, "s3_multi_delete",
1158        "Whether to use multi-delete");
1159     device_property_fill_and_register(&device_property_s3_reps,
1160                                       G_TYPE_STRING, "reps",
1161        "Number of replicas for data objects in CAStor");
1162     device_property_fill_and_register(&device_property_s3_reps_bucket,
1163                                       G_TYPE_STRING, "reps_bucket",
1164        "Number of replicas for automatically created buckets in CAStor");
1165
1166     /* register the device itself */
1167     register_device(s3_device_factory, device_prefix_list);
1168 }
1169
1170 static GType
1171 s3_device_get_type(void)
1172 {
1173     static GType type = 0;
1174
1175     if G_UNLIKELY(type == 0) {
1176         static const GTypeInfo info = {
1177             sizeof (S3DeviceClass),
1178             (GBaseInitFunc) NULL,
1179             (GBaseFinalizeFunc) NULL,
1180             (GClassInitFunc) s3_device_class_init,
1181             (GClassFinalizeFunc) NULL,
1182             NULL /* class_data */,
1183             sizeof (S3Device),
1184             0 /* n_preallocs */,
1185             (GInstanceInitFunc) s3_device_init,
1186             NULL
1187         };
1188
1189         type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
1190                                        (GTypeFlags)0);
1191     }
1192
1193     return type;
1194 }
1195
1196 static void
1197 s3_device_init(S3Device * self)
1198 {
1199     Device * dself = DEVICE(self);
1200     GValue response;
1201
1202     self->s3_api = S3_API_S3;
1203     self->volume_bytes = 0;
1204     self->volume_limit = 0;
1205     self->leom = TRUE;
1206     self->enforce_volume_limit = FALSE;
1207     self->use_subdomain = FALSE;
1208     self->nb_threads = 1;
1209     self->nb_threads_backup = 1;
1210     self->nb_threads_recovery = 1;
1211     self->thread_pool_delete = NULL;
1212     self->thread_pool_write = NULL;
1213     self->thread_pool_read = NULL;
1214     self->thread_idle_cond = NULL;
1215     self->thread_idle_mutex = NULL;
1216     self->use_s3_multi_delete = 1;
1217     self->reps = NULL;
1218     self->reps_bucket = NULL;
1219
1220     /* Register property values
1221      * Note: Some aren't added until s3_device_open_device()
1222      */
1223     bzero(&response, sizeof(response));
1224
1225     g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
1226     g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
1227     device_set_simple_property(dself, PROPERTY_CONCURRENCY,
1228             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1229     g_value_unset(&response);
1230
1231     g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
1232     g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
1233     device_set_simple_property(dself, PROPERTY_STREAMING,
1234             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1235     g_value_unset(&response);
1236
1237     g_value_init(&response, G_TYPE_BOOLEAN);
1238     g_value_set_boolean(&response, TRUE);
1239     device_set_simple_property(dself, PROPERTY_APPENDABLE,
1240             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1241     g_value_unset(&response);
1242
1243     g_value_init(&response, G_TYPE_BOOLEAN);
1244     g_value_set_boolean(&response, TRUE);
1245     device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
1246             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1247     g_value_unset(&response);
1248
1249     g_value_init(&response, G_TYPE_BOOLEAN);
1250     g_value_set_boolean(&response, TRUE);
1251     device_set_simple_property(dself, PROPERTY_FULL_DELETION,
1252             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1253     g_value_unset(&response);
1254
1255     g_value_init(&response, G_TYPE_BOOLEAN);
1256     g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
1257     device_set_simple_property(dself, PROPERTY_LEOM,
1258             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1259     g_value_unset(&response);
1260
1261     g_value_init(&response, G_TYPE_BOOLEAN);
1262     g_value_set_boolean(&response, FALSE);
1263     device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1264             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1265     g_value_unset(&response);
1266
1267     g_value_init(&response, G_TYPE_BOOLEAN);
1268     g_value_set_boolean(&response, FALSE);
1269     device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
1270             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1271     g_value_unset(&response);
1272
1273     g_value_init(&response, G_TYPE_BOOLEAN);
1274     g_value_set_boolean(&response, FALSE);
1275     device_set_simple_property(dself, PROPERTY_COMPRESSION,
1276             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1277     g_value_unset(&response);
1278
1279     g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
1280     g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
1281     device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
1282             &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1283     g_value_unset(&response);
1284
1285 }
1286
1287 static void
1288 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
1289 {
1290     GObjectClass *g_object_class = (GObjectClass*) c;
1291     DeviceClass *device_class = (DeviceClass *)c;
1292
1293     parent_class = g_type_class_ref (TYPE_DEVICE);
1294
1295     device_class->open_device = s3_device_open_device;
1296     device_class->read_label = s3_device_read_label;
1297     device_class->start = s3_device_start;
1298     device_class->finish = s3_device_finish;
1299     device_class->get_bytes_read = s3_device_get_bytes_read;
1300     device_class->get_bytes_written = s3_device_get_bytes_written;
1301
1302     device_class->start_file = s3_device_start_file;
1303     device_class->write_block = s3_device_write_block;
1304     device_class->finish_file = s3_device_finish_file;
1305
1306     device_class->seek_file = s3_device_seek_file;
1307     device_class->seek_block = s3_device_seek_block;
1308     device_class->read_block = s3_device_read_block;
1309     device_class->recycle_file = s3_device_recycle_file;
1310
1311     device_class->erase = s3_device_erase;
1312
1313     g_object_class->finalize = s3_device_finalize;
1314
1315     device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
1316             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1317             device_simple_property_get_fn,
1318             s3_device_set_access_key_fn);
1319
1320     device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
1321             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1322             device_simple_property_get_fn,
1323             s3_device_set_secret_key_fn);
1324
1325     device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
1326             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1327             device_simple_property_get_fn,
1328             s3_device_set_swift_account_id_fn);
1329
1330     device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
1331             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1332             device_simple_property_get_fn,
1333             s3_device_set_swift_access_key_fn);
1334
1335     device_class_register_property(device_class, PROPERTY_USERNAME,
1336             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1337             device_simple_property_get_fn,
1338             s3_device_set_username);
1339
1340     device_class_register_property(device_class, PROPERTY_PASSWORD,
1341             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1342             device_simple_property_get_fn,
1343             s3_device_set_password);
1344
1345     device_class_register_property(device_class, PROPERTY_TENANT_ID,
1346             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1347             device_simple_property_get_fn,
1348             s3_device_set_tenant_id);
1349
1350     device_class_register_property(device_class, PROPERTY_TENANT_NAME,
1351             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1352             device_simple_property_get_fn,
1353             s3_device_set_tenant_name);
1354
1355     device_class_register_property(device_class, PROPERTY_S3_HOST,
1356             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1357             device_simple_property_get_fn,
1358             s3_device_set_host_fn);
1359
1360     device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
1361             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1362             device_simple_property_get_fn,
1363             s3_device_set_service_path_fn);
1364
1365     device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
1366             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1367             device_simple_property_get_fn,
1368             s3_device_set_user_token_fn);
1369
1370     device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
1371             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1372             device_simple_property_get_fn,
1373             s3_device_set_bucket_location_fn);
1374
1375     device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
1376             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1377             device_simple_property_get_fn,
1378             s3_device_set_storage_class_fn);
1379
1380     device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
1381             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1382             device_simple_property_get_fn,
1383             s3_device_set_server_side_encryption_fn);
1384
1385     device_class_register_property(device_class, PROPERTY_PROXY,
1386             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1387             device_simple_property_get_fn,
1388             s3_device_set_proxy_fn);
1389
1390     device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
1391             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1392             device_simple_property_get_fn,
1393             s3_device_set_ca_info_fn);
1394
1395     device_class_register_property(device_class, PROPERTY_VERBOSE,
1396             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1397             device_simple_property_get_fn,
1398             s3_device_set_verbose_fn);
1399
1400     device_class_register_property(device_class, PROPERTY_CREATE_BUCKET,
1401             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1402             device_simple_property_get_fn,
1403             s3_device_set_create_bucket_fn);
1404
1405     device_class_register_property(device_class, PROPERTY_STORAGE_API,
1406             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1407             device_simple_property_get_fn,
1408             s3_device_set_storage_api);
1409
1410     device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
1411             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1412             device_simple_property_get_fn,
1413             s3_device_set_openstack_swift_api_fn);
1414
1415     device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE,
1416             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1417             device_simple_property_get_fn,
1418             s3_device_set_s3_multi_delete_fn);
1419
1420     device_class_register_property(device_class, PROPERTY_S3_SSL,
1421             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1422             device_simple_property_get_fn,
1423             s3_device_set_ssl_fn);
1424
1425     device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION,
1426             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1427             device_simple_property_get_fn,
1428             s3_device_set_reuse_connection_fn);
1429
1430     device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1431             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1432             device_simple_property_get_fn,
1433             s3_device_set_max_send_speed_fn);
1434
1435     device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1436             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1437             device_simple_property_get_fn,
1438             s3_device_set_max_recv_speed_fn);
1439
1440     device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1441             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1442             device_simple_property_get_fn,
1443             s3_device_set_nb_threads_backup);
1444
1445     device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1446             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1447             device_simple_property_get_fn,
1448             s3_device_set_nb_threads_recovery);
1449
1450     device_class_register_property(device_class, PROPERTY_COMPRESSION,
1451             PROPERTY_ACCESS_GET_MASK,
1452             device_simple_property_get_fn,
1453             NULL);
1454
1455     device_class_register_property(device_class, PROPERTY_LEOM,
1456             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1457             device_simple_property_get_fn,
1458             property_set_leom_fn);
1459
1460     device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1461             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1462                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1463             device_simple_property_get_fn,
1464             s3_device_set_max_volume_usage_fn);
1465
1466     device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1467             (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1468                 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1469             device_simple_property_get_fn,
1470             s3_device_set_enforce_max_volume_usage_fn);
1471
1472     device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1473             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1474             device_simple_property_get_fn,
1475             s3_device_set_use_subdomain_fn);
1476
1477     device_class_register_property(device_class, PROPERTY_CLIENT_ID,
1478             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1479             device_simple_property_get_fn,
1480             s3_device_set_client_id_fn);
1481
1482     device_class_register_property(device_class, PROPERTY_CLIENT_SECRET,
1483             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1484             device_simple_property_get_fn,
1485             s3_device_set_client_secret_fn);
1486
1487     device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN,
1488             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1489             device_simple_property_get_fn,
1490             s3_device_set_refresh_token_fn);
1491
1492     device_class_register_property(device_class, PROPERTY_PROJECT_ID,
1493             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1494             device_simple_property_get_fn,
1495             s3_device_set_project_id_fn);
1496
1497     device_class_register_property(device_class, PROPERTY_S3_REPS,
1498             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1499             device_simple_property_get_fn,
1500             s3_device_set_reps_fn);
1501
1502     device_class_register_property(device_class, PROPERTY_S3_REPS_BUCKET,
1503             PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1504             device_simple_property_get_fn,
1505             s3_device_set_reps_bucket_fn);
1506 }
1507
1508 static gboolean
1509 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1510     GValue *val, PropertySurety surety, PropertySource source)
1511 {
1512     S3Device *self = S3_DEVICE(p_self);
1513
1514     amfree(self->access_key);
1515     self->access_key = g_value_dup_string(val);
1516     device_clear_volume_details(p_self);
1517
1518     return device_simple_property_set_fn(p_self, base, val, surety, source);
1519 }
1520
1521 static gboolean
1522 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1523     GValue *val, PropertySurety surety, PropertySource source)
1524 {
1525     S3Device *self = S3_DEVICE(p_self);
1526
1527     amfree(self->secret_key);
1528     self->secret_key = g_value_dup_string(val);
1529     device_clear_volume_details(p_self);
1530
1531     return device_simple_property_set_fn(p_self, base, val, surety, source);
1532 }
1533
1534 static gboolean
1535 s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
1536     GValue *val, PropertySurety surety, PropertySource source)
1537 {
1538     S3Device *self = S3_DEVICE(p_self);
1539
1540     amfree(self->swift_account_id);
1541     self->swift_account_id = g_value_dup_string(val);
1542     device_clear_volume_details(p_self);
1543
1544     return device_simple_property_set_fn(p_self, base, val, surety, source);
1545 }
1546
1547 static gboolean
1548 s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
1549     GValue *val, PropertySurety surety, PropertySource source)
1550 {
1551     S3Device *self = S3_DEVICE(p_self);
1552
1553     amfree(self->swift_access_key);
1554     self->swift_access_key = g_value_dup_string(val);
1555     device_clear_volume_details(p_self);
1556
1557     return device_simple_property_set_fn(p_self, base, val, surety, source);
1558 }
1559
1560 static gboolean
1561 s3_device_set_username(Device *p_self, DevicePropertyBase *base,
1562     GValue *val, PropertySurety surety, PropertySource source)
1563 {
1564     S3Device *self = S3_DEVICE(p_self);
1565
1566     amfree(self->username);
1567     self->username = g_value_dup_string(val);
1568     device_clear_volume_details(p_self);
1569
1570     return device_simple_property_set_fn(p_self, base, val, surety, source);
1571 }
1572
1573 static gboolean
1574 s3_device_set_password(Device *p_self, DevicePropertyBase *base,
1575     GValue *val, PropertySurety surety, PropertySource source)
1576 {
1577     S3Device *self = S3_DEVICE(p_self);
1578
1579     amfree(self->password);
1580     self->password = g_value_dup_string(val);
1581     device_clear_volume_details(p_self);
1582
1583     return device_simple_property_set_fn(p_self, base, val, surety, source);
1584 }
1585
1586 static gboolean
1587 s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
1588     GValue *val, PropertySurety surety, PropertySource source)
1589 {
1590     S3Device *self = S3_DEVICE(p_self);
1591
1592     amfree(self->tenant_id);
1593     self->tenant_id = g_value_dup_string(val);
1594     device_clear_volume_details(p_self);
1595
1596     return device_simple_property_set_fn(p_self, base, val, surety, source);
1597 }
1598
1599 static gboolean
1600 s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
1601     GValue *val, PropertySurety surety, PropertySource source)
1602 {
1603     S3Device *self = S3_DEVICE(p_self);
1604
1605     amfree(self->tenant_name);
1606     self->tenant_name = g_value_dup_string(val);
1607     device_clear_volume_details(p_self);
1608
1609     return device_simple_property_set_fn(p_self, base, val, surety, source);
1610 }
1611
1612 static gboolean
1613 s3_device_set_host_fn(Device *p_self,
1614     DevicePropertyBase *base, GValue *val,
1615     PropertySurety surety, PropertySource source)
1616 {
1617     S3Device *self = S3_DEVICE(p_self);
1618
1619     amfree(self->host);
1620     self->host = g_value_dup_string(val);
1621     device_clear_volume_details(p_self);
1622
1623     return device_simple_property_set_fn(p_self, base, val, surety, source);
1624 }
1625
1626 static gboolean
1627 s3_device_set_service_path_fn(Device *p_self,
1628     DevicePropertyBase *base, GValue *val,
1629     PropertySurety surety, PropertySource source)
1630 {
1631     S3Device *self = S3_DEVICE(p_self);
1632
1633     amfree(self->service_path);
1634     self->service_path = g_value_dup_string(val);
1635     device_clear_volume_details(p_self);
1636
1637     return device_simple_property_set_fn(p_self, base, val, surety, source);
1638 }
1639
1640 static gboolean
1641 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1642     GValue *val, PropertySurety surety, PropertySource source)
1643 {
1644     S3Device *self = S3_DEVICE(p_self);
1645
1646     amfree(self->user_token);
1647     self->user_token = g_value_dup_string(val);
1648     device_clear_volume_details(p_self);
1649
1650     return device_simple_property_set_fn(p_self, base, val, surety, source);
1651 }
1652
1653 static gboolean
1654 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1655     GValue *val, PropertySurety surety, PropertySource source)
1656 {
1657     S3Device *self = S3_DEVICE(p_self);
1658     char *str_val = g_value_dup_string(val);
1659
1660     if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1661         device_set_error(p_self, stralloc(_(
1662                 "Location constraint given for Amazon S3 bucket, "
1663                 "but libcurl is too old support wildcard certificates.")),
1664             DEVICE_STATUS_DEVICE_ERROR);
1665         goto fail;
1666     }
1667
1668     if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1669         device_set_error(p_self, g_strdup_printf(_(
1670                 "Location constraint given for Amazon S3 bucket, "
1671                 "but the bucket name (%s) is not usable as a subdomain."),
1672                 self->bucket),
1673             DEVICE_STATUS_DEVICE_ERROR);
1674         goto fail;
1675     }
1676
1677     amfree(self->bucket_location);
1678     self->bucket_location = str_val;
1679     device_clear_volume_details(p_self);
1680
1681     return device_simple_property_set_fn(p_self, base, val, surety, source);
1682 fail:
1683     g_free(str_val);
1684     return FALSE;
1685 }
1686
1687 static gboolean
1688 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1689     GValue *val, PropertySurety surety, PropertySource source)
1690 {
1691     S3Device *self = S3_DEVICE(p_self);
1692     char *str_val = g_value_dup_string(val);
1693
1694     amfree(self->storage_class);
1695     self->storage_class = str_val;
1696     device_clear_volume_details(p_self);
1697
1698     return device_simple_property_set_fn(p_self, base, val, surety, source);
1699 }
1700
1701 static gboolean
1702 s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
1703     GValue *val, PropertySurety surety, PropertySource source)
1704 {
1705     S3Device *self = S3_DEVICE(p_self);
1706     char *str_val = g_value_dup_string(val);
1707
1708     amfree(self->server_side_encryption);
1709     self->server_side_encryption = str_val;
1710     device_clear_volume_details(p_self);
1711
1712     return device_simple_property_set_fn(p_self, base, val, surety, source);
1713 }
1714
1715 static gboolean
1716 s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
1717     GValue *val, PropertySurety surety, PropertySource source)
1718 {
1719     S3Device *self = S3_DEVICE(p_self);
1720     char *str_val = g_value_dup_string(val);
1721
1722     amfree(self->proxy);
1723     self->proxy = str_val;
1724     device_clear_volume_details(p_self);
1725
1726     return device_simple_property_set_fn(p_self, base, val, surety, source);
1727 }
1728
1729 static gboolean
1730 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1731     GValue *val, PropertySurety surety, PropertySource source)
1732 {
1733     S3Device *self = S3_DEVICE(p_self);
1734
1735     amfree(self->ca_info);
1736     self->ca_info = g_value_dup_string(val);
1737     device_clear_volume_details(p_self);
1738
1739     return device_simple_property_set_fn(p_self, base, val, surety, source);
1740 }
1741
1742 static gboolean
1743 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1744     GValue *val, PropertySurety surety, PropertySource source)
1745 {
1746     S3Device *self = S3_DEVICE(p_self);
1747     int       thread;
1748
1749     self->verbose = g_value_get_boolean(val);
1750     /* Our S3 handle may not yet have been instantiated; if so, it will
1751      * get the proper verbose setting when it is created */
1752     if (self->s3t) {
1753         for (thread = 0; thread < self->nb_threads; thread++) {
1754             if (self->s3t[thread].s3)
1755                 s3_verbose(self->s3t[thread].s3, self->verbose);
1756         }
1757     }
1758
1759     return device_simple_property_set_fn(p_self, base, val, surety, source);
1760 }
1761
1762 static gboolean
1763 s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
1764     GValue *val, PropertySurety surety, PropertySource source)
1765 {
1766     S3Device *self = S3_DEVICE(p_self);
1767     int       thread;
1768
1769     self->create_bucket = g_value_get_boolean(val);
1770     /* Our S3 handle may not yet have been instantiated; if so, it will
1771      * get the proper verbose setting when it is created */
1772     if (self->s3t) {
1773         for (thread = 0; thread < self->nb_threads; thread++) {
1774             if (self->s3t[thread].s3)
1775                 s3_verbose(self->s3t[thread].s3, self->verbose);
1776         }
1777     }
1778
1779     return device_simple_property_set_fn(p_self, base, val, surety, source);
1780 }
1781
1782 static gboolean
1783 s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
1784     GValue *val, PropertySurety surety, PropertySource source)
1785 {
1786     S3Device *self = S3_DEVICE(p_self);
1787
1788     const char *storage_api = g_value_get_string(val);
1789     if (g_str_equal(storage_api, "S3")) {
1790         self->s3_api = S3_API_S3;
1791     } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
1792         self->s3_api = S3_API_SWIFT_1;
1793     } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
1794         self->s3_api = S3_API_SWIFT_2;
1795     } else if (g_str_equal(storage_api, "OAUTH2")) {
1796         self->s3_api = S3_API_OAUTH2;
1797     } else if (g_str_equal(storage_api, "CASTOR")) {
1798 #if LIBCURL_VERSION_NUM >= 0x071301
1799         curl_version_info_data *info;
1800         /* check the runtime version too */
1801         info = curl_version_info(CURLVERSION_NOW);
1802         if (info->version_num >= 0x071301) {
1803             self->s3_api = S3_API_CASTOR;
1804         } else {
1805             device_set_error(p_self, g_strdup_printf(_(
1806                         "Error setting STORAGE-API to castor "
1807                         "(You must install libcurl 7.19.1 or newer)")),
1808                     DEVICE_STATUS_DEVICE_ERROR);
1809             return FALSE;
1810         }
1811 #else
1812         device_set_error(p_self, g_strdup_printf(_(
1813                         "Error setting STORAGE-API to castor "
1814                         "This amanda is compiled with a too old libcurl, you must compile with libcurl 7.19.1 or newer")),
1815                     DEVICE_STATUS_DEVICE_ERROR);
1816         return FALSE;
1817 #endif
1818     } else {
1819         g_debug("Invalid STORAGE_API, using \"S3\".");
1820         self->s3_api = S3_API_S3;
1821     }
1822
1823     return device_simple_property_set_fn(p_self, base, val, surety, source);
1824 }
1825
1826 static gboolean
1827 s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
1828     GValue *val, PropertySurety surety, PropertySource source)
1829 {
1830
1831     const gboolean openstack_swift_api = g_value_get_boolean(val);
1832     if (openstack_swift_api) {
1833         GValue storage_api_val;
1834         g_value_init(&storage_api_val, G_TYPE_STRING);
1835         g_value_set_static_string(&storage_api_val, "SWIFT-1.0");
1836         return s3_device_set_storage_api(p_self, base, &storage_api_val,
1837                                          surety, source);
1838     }
1839     return TRUE;
1840 }
1841
1842 static gboolean
1843 s3_device_set_s3_multi_delete_fn(Device *p_self,
1844     DevicePropertyBase *base, GValue *val,
1845     PropertySurety surety, PropertySource source)
1846 {
1847     S3Device *self = S3_DEVICE(p_self);
1848
1849     self->use_s3_multi_delete = g_value_get_boolean(val);
1850
1851     return device_simple_property_set_fn(p_self, base, val, surety, source);
1852 }
1853
1854 static gboolean
1855 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1856     GValue *val, PropertySurety surety, PropertySource source)
1857 {
1858     S3Device *self = S3_DEVICE(p_self);
1859     gboolean new_val;
1860     int      thread;
1861
1862     new_val = g_value_get_boolean(val);
1863     /* Our S3 handle may not yet have been instantiated; if so, it will
1864      * get the proper use_ssl setting when it is created */
1865     if (self->s3t) {
1866         for (thread = 0; thread < self->nb_threads; thread++) {
1867             if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1868                 device_set_error(p_self, g_strdup_printf(_(
1869                         "Error setting S3 SSL/TLS use "
1870                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1871                     DEVICE_STATUS_DEVICE_ERROR);
1872                 return FALSE;
1873             }
1874         }
1875     }
1876     self->use_ssl = new_val;
1877
1878     return device_simple_property_set_fn(p_self, base, val, surety, source);
1879 }
1880
1881 static gboolean
1882 s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
1883     GValue *val, PropertySurety surety, PropertySource source)
1884 {
1885     S3Device *self = S3_DEVICE(p_self);
1886
1887     self->reuse_connection = g_value_get_boolean(val);
1888
1889     return device_simple_property_set_fn(p_self, base, val, surety, source);
1890 }
1891
1892 static gboolean
1893 s3_device_set_max_send_speed_fn(Device *p_self,
1894     DevicePropertyBase *base, GValue *val,
1895     PropertySurety surety, PropertySource source)
1896 {
1897     S3Device *self = S3_DEVICE(p_self);
1898     guint64 new_val;
1899     int     thread;
1900
1901     new_val = g_value_get_uint64(val);
1902     if (self->s3t) {
1903         for (thread = 0; thread < self->nb_threads; thread++) {
1904             if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1905                 device_set_error(p_self,
1906                         g_strdup("Could not set S3 maximum send speed"),
1907                         DEVICE_STATUS_DEVICE_ERROR);
1908                 return FALSE;
1909             }
1910         }
1911     }
1912     self->max_send_speed = new_val;
1913
1914     return device_simple_property_set_fn(p_self, base, val, surety, source);
1915 }
1916
1917 static gboolean
1918 s3_device_set_max_recv_speed_fn(Device *p_self,
1919     DevicePropertyBase *base, GValue *val,
1920     PropertySurety surety, PropertySource source)
1921 {
1922     S3Device *self = S3_DEVICE(p_self);
1923     guint64 new_val;
1924     int     thread;
1925
1926     new_val = g_value_get_uint64(val);
1927     if (self->s3t) {
1928         for (thread = 0; thread < self->nb_threads; thread++) {
1929             if (self->s3t[thread].s3 &&
1930                 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1931                 device_set_error(p_self,
1932                         g_strdup("Could not set S3 maximum recv speed"),
1933                         DEVICE_STATUS_DEVICE_ERROR);
1934                 return FALSE;
1935             }
1936         }
1937     }
1938     self->max_recv_speed = new_val;
1939
1940     return device_simple_property_set_fn(p_self, base, val, surety, source);
1941 }
1942
1943 static gboolean
1944 s3_device_set_nb_threads_backup(Device *p_self,
1945     DevicePropertyBase *base, GValue *val,
1946     PropertySurety surety, PropertySource source)
1947 {
1948     S3Device *self = S3_DEVICE(p_self);
1949     guint64 new_val;
1950
1951     new_val = g_value_get_uint64(val);
1952     self->nb_threads_backup = new_val;
1953     if (self->nb_threads_backup > self->nb_threads) {
1954         self->nb_threads = self->nb_threads_backup;
1955     }
1956
1957     return device_simple_property_set_fn(p_self, base, val, surety, source);
1958 }
1959
1960 static gboolean
1961 s3_device_set_nb_threads_recovery(Device *p_self,
1962     DevicePropertyBase *base, GValue *val,
1963     PropertySurety surety, PropertySource source)
1964 {
1965     S3Device *self = S3_DEVICE(p_self);
1966     guint64 new_val;
1967
1968     new_val = g_value_get_uint64(val);
1969     self->nb_threads_recovery = new_val;
1970     if (self->nb_threads_recovery > self->nb_threads) {
1971         self->nb_threads = self->nb_threads_recovery;
1972     }
1973
1974     return device_simple_property_set_fn(p_self, base, val, surety, source);
1975 }
1976
1977 static gboolean
1978 s3_device_set_max_volume_usage_fn(Device *p_self,
1979     DevicePropertyBase *base, GValue *val,
1980     PropertySurety surety, PropertySource source)
1981 {
1982     S3Device *self = S3_DEVICE(p_self);
1983
1984     self->volume_limit = g_value_get_uint64(val);
1985
1986     return device_simple_property_set_fn(p_self, base, val, surety, source);
1987
1988 }
1989
1990 static gboolean
1991 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1992     DevicePropertyBase *base, GValue *val,
1993     PropertySurety surety, PropertySource source)
1994 {
1995     S3Device *self = S3_DEVICE(p_self);
1996
1997     self->enforce_volume_limit = g_value_get_boolean(val);
1998
1999     return device_simple_property_set_fn(p_self, base, val, surety, source);
2000
2001 }
2002
2003 static gboolean
2004 s3_device_set_use_subdomain_fn(Device *p_self,
2005     DevicePropertyBase *base, GValue *val,
2006     PropertySurety surety, PropertySource source)
2007 {
2008     S3Device *self = S3_DEVICE(p_self);
2009
2010     self->use_subdomain = g_value_get_boolean(val);
2011
2012     return device_simple_property_set_fn(p_self, base, val, surety, source);
2013 }
2014
2015 static gboolean
2016 property_set_leom_fn(Device *p_self,
2017     DevicePropertyBase *base, GValue *val,
2018     PropertySurety surety, PropertySource source)
2019 {
2020     S3Device *self = S3_DEVICE(p_self);
2021
2022     self->leom = g_value_get_boolean(val);
2023
2024     return device_simple_property_set_fn(p_self, base, val, surety, source);
2025 }
2026
2027 static gboolean
2028 s3_device_set_client_id_fn(Device *p_self,
2029     DevicePropertyBase *base, GValue *val,
2030     PropertySurety surety, PropertySource source)
2031 {
2032     S3Device *self = S3_DEVICE(p_self);
2033
2034     amfree(self->client_id);
2035     self->client_id = g_value_dup_string(val);
2036
2037     return device_simple_property_set_fn(p_self, base, val, surety, source);
2038 }
2039
2040 static gboolean
2041 s3_device_set_client_secret_fn(Device *p_self,
2042     DevicePropertyBase *base, GValue *val,
2043     PropertySurety surety, PropertySource source)
2044 {
2045     S3Device *self = S3_DEVICE(p_self);
2046
2047     amfree(self->client_secret);
2048     self->client_secret = g_value_dup_string(val);
2049
2050     return device_simple_property_set_fn(p_self, base, val, surety, source);
2051 }
2052
2053 static gboolean
2054 s3_device_set_refresh_token_fn(Device *p_self,
2055     DevicePropertyBase *base, GValue *val,
2056     PropertySurety surety, PropertySource source)
2057 {
2058     S3Device *self = S3_DEVICE(p_self);
2059
2060     amfree(self->refresh_token);
2061     self->refresh_token = g_value_dup_string(val);
2062
2063     return device_simple_property_set_fn(p_self, base, val, surety, source);
2064 }
2065
2066 static gboolean
2067 s3_device_set_project_id_fn(Device *p_self,
2068     DevicePropertyBase *base, GValue *val,
2069     PropertySurety surety, PropertySource source)
2070 {
2071     S3Device *self = S3_DEVICE(p_self);
2072
2073     amfree(self->project_id);
2074     self->project_id = g_value_dup_string(val);
2075
2076     return device_simple_property_set_fn(p_self, base, val, surety, source);
2077 }
2078
2079 static gboolean
2080 s3_device_set_reps_fn(Device *p_self, DevicePropertyBase *base,
2081     GValue *val, PropertySurety surety, PropertySource source)
2082 {
2083     S3Device *self = S3_DEVICE(p_self);
2084
2085     amfree(self->reps);
2086     self->reps = g_value_dup_string(val);
2087     device_clear_volume_details(p_self);
2088
2089     return device_simple_property_set_fn(p_self, base, val, surety, source);
2090 }
2091
2092 static gboolean
2093 s3_device_set_reps_bucket_fn(Device *p_self, DevicePropertyBase *base,
2094     GValue *val, PropertySurety surety, PropertySource source)
2095 {
2096     S3Device *self = S3_DEVICE(p_self);
2097
2098     amfree(self->reps_bucket);
2099     self->reps_bucket = g_value_dup_string(val);
2100     device_clear_volume_details(p_self);
2101
2102     return device_simple_property_set_fn(p_self, base, val, surety, source);
2103 }
2104
2105 static Device*
2106 s3_device_factory(char * device_name, char * device_type, char * device_node)
2107 {
2108     Device *rval;
2109     g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
2110     rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
2111
2112     device_open_device(rval, device_name, device_type, device_node);
2113     return rval;
2114 }
2115
2116 /*
2117  * Virtual function overrides
2118  */
2119
2120 static void
2121 s3_device_open_device(Device *pself, char *device_name,
2122                         char * device_type, char * device_node)
2123 {
2124     S3Device *self = S3_DEVICE(pself);
2125     char * name_colon;
2126     GValue tmp_value;
2127
2128     pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
2129     pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
2130     pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
2131
2132     /* Device name may be bucket/prefix, to support multiple volumes in a
2133      * single bucket. */
2134     name_colon = strchr(device_node, '/');
2135     if (name_colon == NULL) {
2136         self->bucket = g_strdup(device_node);
2137         self->prefix = g_strdup("");
2138     } else {
2139         self->bucket = g_strndup(device_node, name_colon - device_node);
2140         self->prefix = g_strdup(name_colon + 1);
2141     }
2142
2143     if (self->bucket == NULL || self->bucket[0] == '\0') {
2144         device_set_error(pself,
2145             vstrallocf(_("Empty bucket name in device %s"), device_name),
2146             DEVICE_STATUS_DEVICE_ERROR);
2147         amfree(self->bucket);
2148         amfree(self->prefix);
2149         return;
2150     }
2151
2152     if (self->reps == NULL) {
2153         self->reps = g_strdup(S3_DEVICE_REPS_DEFAULT);
2154     }
2155
2156     if (self->reps_bucket == NULL) {
2157         self->reps_bucket = g_strdup(S3_DEVICE_REPS_BUCKET_DEFAULT);
2158     }
2159
2160     g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
2161
2162     /* default values */
2163     self->verbose = FALSE;
2164     self->s3_api = S3_API_S3;
2165
2166     /* use SSL if available */
2167     self->use_ssl = s3_curl_supports_ssl();
2168     bzero(&tmp_value, sizeof(GValue));
2169     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2170     g_value_set_boolean(&tmp_value, self->use_ssl);
2171     device_set_simple_property(pself, device_property_s3_ssl.ID,
2172         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2173
2174     /* reuse connection */
2175     self->reuse_connection = TRUE;
2176     bzero(&tmp_value, sizeof(GValue));
2177     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2178     g_value_set_boolean(&tmp_value, self->reuse_connection);
2179     device_set_simple_property(pself, device_property_reuse_connection.ID,
2180         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2181
2182     /* Set default create_bucket */
2183     self->create_bucket = TRUE;
2184     bzero(&tmp_value, sizeof(GValue));
2185     g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2186     g_value_set_boolean(&tmp_value, self->create_bucket);
2187     device_set_simple_property(pself, device_property_create_bucket.ID,
2188         &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2189
2190     if (parent_class->open_device) {
2191         parent_class->open_device(pself, device_name, device_type, device_node);
2192     }
2193 }
2194
2195 static void s3_device_finalize(GObject * obj_self) {
2196     S3Device *self = S3_DEVICE (obj_self);
2197     int thread;
2198
2199     if(G_OBJECT_CLASS(parent_class)->finalize)
2200         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
2201
2202     if (self->thread_pool_delete) {
2203         g_thread_pool_free(self->thread_pool_delete, 1, 1);
2204         self->thread_pool_delete = NULL;
2205     }
2206     if (self->thread_pool_write) {
2207         g_thread_pool_free(self->thread_pool_write, 1, 1);
2208         self->thread_pool_write = NULL;
2209     }
2210     if (self->thread_pool_read) {
2211         g_thread_pool_free(self->thread_pool_read, 1, 1);
2212         self->thread_pool_read = NULL;
2213     }
2214     if (self->thread_idle_mutex) {
2215         g_mutex_free(self->thread_idle_mutex);
2216         self->thread_idle_mutex = NULL;
2217     }
2218     if (self->thread_idle_cond) {
2219         g_cond_free(self->thread_idle_cond);
2220         self->thread_idle_cond = NULL;
2221     }
2222     if (self->s3t) {
2223         for (thread = 0; thread < self->nb_threads; thread++) {
2224             g_mutex_free(self->s3t[thread].now_mutex);
2225             if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
2226             g_free(self->s3t[thread].curl_buffer.buffer);
2227         }
2228         g_free(self->s3t);
2229     }
2230     if(self->bucket) g_free(self->bucket);
2231     if(self->prefix) g_free(self->prefix);
2232     if(self->access_key) g_free(self->access_key);
2233     if(self->secret_key) g_free(self->secret_key);
2234     if(self->swift_account_id) g_free(self->swift_account_id);
2235     if(self->swift_access_key) g_free(self->swift_access_key);
2236     if(self->username) g_free(self->username);
2237     if(self->password) g_free(self->password);
2238     if(self->tenant_id) g_free(self->tenant_id);
2239     if(self->tenant_name) g_free(self->tenant_name);
2240     if(self->host) g_free(self->host);
2241     if(self->service_path) g_free(self->service_path);
2242     if(self->user_token) g_free(self->user_token);
2243     if(self->bucket_location) g_free(self->bucket_location);
2244     if(self->storage_class) g_free(self->storage_class);
2245     if(self->server_side_encryption) g_free(self->server_side_encryption);
2246     if(self->proxy) g_free(self->proxy);
2247     if(self->ca_info) g_free(self->ca_info);
2248     if(self->reps) g_free(self->reps);
2249     if(self->reps_bucket) g_free(self->reps_bucket);
2250 }
2251
2252 static gboolean setup_handle(S3Device * self) {
2253     Device *d_self = DEVICE(self);
2254     int thread;
2255     guint response_code;
2256     s3_error_code_t s3_error_code;
2257     CURLcode curl_code;
2258
2259     if (self->s3t == NULL) {
2260         if (self->s3_api == S3_API_S3) {
2261             if (self->access_key == NULL || self->access_key[0] == '\0') {
2262                 device_set_error(d_self,
2263                     g_strdup(_("No Amazon access key specified")),
2264                     DEVICE_STATUS_DEVICE_ERROR);
2265                 return FALSE;
2266             }
2267
2268             if (self->secret_key == NULL || self->secret_key[0] == '\0') {
2269                 device_set_error(d_self,
2270                     g_strdup(_("No Amazon secret key specified")),
2271                     DEVICE_STATUS_DEVICE_ERROR);
2272                 return FALSE;
2273             }
2274         } else if (self->s3_api == S3_API_SWIFT_1) {
2275             if (self->swift_account_id == NULL ||
2276                 self->swift_account_id[0] == '\0') {
2277                 device_set_error(d_self,
2278                     g_strdup(_("No Swift account id specified")),
2279                     DEVICE_STATUS_DEVICE_ERROR);
2280                 return FALSE;
2281             }
2282             if (self->swift_access_key == NULL ||
2283                 self->swift_access_key[0] == '\0') {
2284                 device_set_error(d_self,
2285                     g_strdup(_("No Swift access key specified")),
2286                     DEVICE_STATUS_DEVICE_ERROR);
2287                 return FALSE;
2288             }
2289         } else if (self->s3_api == S3_API_SWIFT_2) {
2290             if (!((self->username && self->password && self->tenant_id) ||
2291                   (self->username && self->password && self->tenant_name) ||
2292                   (self->access_key && self->secret_key && self->tenant_id) ||
2293                   (self->access_key && self->secret_key && self->tenant_name))) {
2294                 device_set_error(d_self,
2295                     g_strdup(_("Missing authorization properties")),
2296                     DEVICE_STATUS_DEVICE_ERROR);
2297                 return FALSE;
2298             }
2299         } else if (self->s3_api == S3_API_OAUTH2) {
2300             if (self->client_id == NULL ||
2301                 self->client_id[0] == '\0') {
2302                 device_set_error(d_self,
2303                     g_strdup(_("Missing client_id properties")),
2304                     DEVICE_STATUS_DEVICE_ERROR);
2305                 return FALSE;
2306             }
2307             if (self->client_secret == NULL ||
2308                 self->client_secret[0] == '\0') {
2309                 device_set_error(d_self,
2310                     g_strdup(_("Missing client_secret properties")),
2311                     DEVICE_STATUS_DEVICE_ERROR);
2312                 return FALSE;
2313             }
2314             if (self->refresh_token == NULL ||
2315                 self->refresh_token[0] == '\0') {
2316                 device_set_error(d_self,
2317                     g_strdup(_("Missing refresh_token properties")),
2318                     DEVICE_STATUS_DEVICE_ERROR);
2319                 return FALSE;
2320             }
2321             if (self->project_id == NULL ||
2322                 self->project_id[0] == '\0') {
2323                 device_set_error(d_self,
2324                     g_strdup(_("Missing project_id properties")),
2325                     DEVICE_STATUS_DEVICE_ERROR);
2326                 return FALSE;
2327             }
2328         } else if (self->s3_api == S3_API_CASTOR) {
2329             self->use_s3_multi_delete = 0;
2330             self->use_subdomain = FALSE;
2331             if(self->service_path) {
2332                 g_free(self->service_path);
2333                 self->service_path = NULL;
2334             }
2335         }
2336
2337         self->s3t = g_new0(S3_by_thread, self->nb_threads);
2338         if (self->s3t == NULL) {
2339             device_set_error(d_self,
2340                 g_strdup(_("Can't allocate S3Handle array")),
2341                 DEVICE_STATUS_DEVICE_ERROR);
2342             return FALSE;
2343         }
2344
2345         self->thread_idle_cond = g_cond_new();
2346         self->thread_idle_mutex = g_mutex_new();
2347
2348         for (thread = 0; thread < self->nb_threads; thread++) {
2349             self->s3t[thread].idle = 1;
2350             self->s3t[thread].done = 1;
2351             self->s3t[thread].eof = FALSE;
2352             self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2353             self->s3t[thread].errmsg = NULL;
2354             self->s3t[thread].filename = NULL;
2355             self->s3t[thread].curl_buffer.buffer = NULL;
2356             self->s3t[thread].curl_buffer.buffer_len = 0;
2357             self->s3t[thread].now_mutex = g_mutex_new();
2358             self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
2359                                            self->swift_account_id,
2360                                            self->swift_access_key,
2361                                            self->host, self->service_path,
2362                                            self->use_subdomain,
2363                                            self->user_token, self->bucket_location,
2364                                            self->storage_class, self->ca_info,
2365                                            self->server_side_encryption,
2366                                            self->proxy,
2367                                            self->s3_api,
2368                                            self->username,
2369                                            self->password,
2370                                            self->tenant_id,
2371                                            self->tenant_name,
2372                                            self->client_id,
2373                                            self->client_secret,
2374                                            self->refresh_token,
2375                                            self->reuse_connection,
2376                                            self->reps, self->reps_bucket);
2377             if (self->s3t[thread].s3 == NULL) {
2378                 device_set_error(d_self,
2379                     stralloc(_("Internal error creating S3 handle")),
2380                     DEVICE_STATUS_DEVICE_ERROR);
2381                 self->nb_threads = thread+1;
2382                 return FALSE;
2383             }
2384         }
2385
2386         g_debug("Create %d threads", self->nb_threads);
2387         self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
2388                                                      self, self->nb_threads, 0,
2389                                                      NULL);
2390         self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
2391                                               self->nb_threads, 0, NULL);
2392         self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
2393                                               self->nb_threads, 0, NULL);
2394
2395         for (thread = 0; thread < self->nb_threads; thread++) {
2396             s3_verbose(self->s3t[thread].s3, self->verbose);
2397
2398             if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
2399                 device_set_error(d_self, g_strdup_printf(_(
2400                         "Error setting S3 SSL/TLS use "
2401                         "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
2402                         DEVICE_STATUS_DEVICE_ERROR);
2403                 return FALSE;
2404             }
2405
2406             if (self->max_send_speed &&
2407                 !s3_set_max_send_speed(self->s3t[thread].s3,
2408                                        self->max_send_speed)) {
2409                 device_set_error(d_self,
2410                         g_strdup("Could not set S3 maximum send speed"),
2411                         DEVICE_STATUS_DEVICE_ERROR);
2412                 return FALSE;
2413             }
2414
2415             if (self->max_recv_speed &&
2416                 !s3_set_max_recv_speed(self->s3t[thread].s3,
2417                                        self->max_recv_speed)) {
2418                 device_set_error(d_self,
2419                         g_strdup("Could not set S3 maximum recv speed"),
2420                         DEVICE_STATUS_DEVICE_ERROR);
2421                 return FALSE;
2422             }
2423         }
2424
2425         for (thread = 0; thread < self->nb_threads; thread++) {
2426             if (!s3_open2(self->s3t[thread].s3)) {
2427                 if (self->s3_api == S3_API_SWIFT_1 ||
2428                     self->s3_api == S3_API_SWIFT_2) {
2429                     s3_error(self->s3t[0].s3, NULL, &response_code,
2430                              &s3_error_code, NULL, &curl_code, NULL);
2431                     device_set_error(d_self,
2432                             g_strdup_printf(_("s3_open2 failed: %s"),
2433                                             s3_strerror(self->s3t[0].s3)),
2434                             DEVICE_STATUS_DEVICE_ERROR);
2435                     self->nb_threads = thread+1;
2436                     return FALSE;
2437                 } else {
2438                     device_set_error(d_self,
2439                                      g_strdup("s3_open2 failed"),
2440                                      DEVICE_STATUS_DEVICE_ERROR);
2441                     return FALSE;
2442                 }
2443             }
2444         }
2445     }
2446
2447     return TRUE;
2448 }
2449
2450 static gboolean
2451 make_bucket(
2452     Device * pself)
2453 {
2454     S3Device *self = S3_DEVICE(pself);
2455     guint response_code;
2456     s3_error_code_t s3_error_code;
2457     CURLcode curl_code;
2458
2459     if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) {
2460         return TRUE;
2461     }
2462
2463     s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
2464
2465     if (response_code == 0 && s3_error_code == 0 &&
2466         (curl_code == CURLE_COULDNT_CONNECT ||
2467          curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
2468         device_set_error(pself,
2469             g_strdup_printf(_("While connecting to S3 bucket: %s"),
2470                             s3_strerror(self->s3t[0].s3)),
2471                 DEVICE_STATUS_DEVICE_ERROR);
2472         return FALSE;
2473     }
2474
2475     if (!self->create_bucket) {
2476         device_set_error(pself,
2477             g_strdup_printf(_("Can't list bucket: %s"),
2478                             s3_strerror(self->s3t[0].s3)),
2479                 DEVICE_STATUS_DEVICE_ERROR);
2480         return FALSE;
2481     }
2482
2483     if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) {
2484         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2485
2486         /* if it isn't an expected error (bucket already exists),
2487          * return FALSE */
2488         if (response_code != 409 ||
2489             (s3_error_code != S3_ERROR_BucketAlreadyExists &&
2490              s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
2491             device_set_error(pself,
2492                 g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
2493                 DEVICE_STATUS_DEVICE_ERROR);
2494             return FALSE;
2495         }
2496     }
2497     return TRUE;
2498 }
2499
2500 static int progress_func(
2501     void *thread_data,
2502     double dltotal G_GNUC_UNUSED,
2503     double dlnow,
2504     double ultotal G_GNUC_UNUSED,
2505     double ulnow)
2506 {
2507     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2508
2509     g_mutex_lock(s3t->now_mutex);
2510     s3t->dlnow = dlnow;
2511     s3t->ulnow = ulnow;
2512     g_mutex_unlock(s3t->now_mutex);
2513
2514     return 0;
2515 }
2516
2517 static DeviceStatusFlags
2518 s3_device_read_label(Device *pself) {
2519     S3Device *self = S3_DEVICE(pself);
2520     char *key;
2521     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2522     dumpfile_t *amanda_header;
2523     /* note that this may be called from s3_device_start, when
2524      * self->access_mode is not ACCESS_NULL */
2525
2526     amfree(pself->volume_label);
2527     amfree(pself->volume_time);
2528     dumpfile_free(pself->volume_header);
2529     pself->volume_header = NULL;
2530
2531     if (device_in_error(self)) return pself->status;
2532
2533     if (!setup_handle(self)) {
2534         /* setup_handle already set our error message */
2535         return pself->status;
2536     }
2537     reset_thread(self);
2538
2539     key = special_file_to_key(self, "tapestart", -1);
2540
2541     if (!make_bucket(pself)) {
2542         return pself->status;
2543     }
2544
2545     if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
2546         guint response_code;
2547         s3_error_code_t s3_error_code;
2548         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2549
2550         /* if it's an expected error (not found), just return FALSE */
2551         if (response_code == 404 &&
2552              (s3_error_code == S3_ERROR_None ||
2553               s3_error_code == S3_ERROR_Unknown ||
2554               s3_error_code == S3_ERROR_NoSuchKey ||
2555               s3_error_code == S3_ERROR_NoSuchEntity ||
2556               s3_error_code == S3_ERROR_NoSuchBucket)) {
2557             g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
2558             device_set_error(pself,
2559                 stralloc(_("Amanda header not found -- unlabeled volume?")),
2560                   DEVICE_STATUS_DEVICE_ERROR
2561                 | DEVICE_STATUS_VOLUME_ERROR
2562                 | DEVICE_STATUS_VOLUME_UNLABELED);
2563             return pself->status;
2564         }
2565
2566         /* otherwise, log it and return */
2567         device_set_error(pself,
2568             vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
2569             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2570         return pself->status;
2571     }
2572
2573     /* handle an empty file gracefully */
2574     if (buf.buffer_len == 0) {
2575         device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
2576         return pself->status;
2577     }
2578
2579     pself->header_block_size = buf.buffer_len;
2580     g_assert(buf.buffer != NULL);
2581     amanda_header = g_new(dumpfile_t, 1);
2582     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2583     pself->volume_header = amanda_header;
2584     g_free(buf.buffer);
2585
2586     if (amanda_header->type != F_TAPESTART) {
2587         device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
2588         return pself->status;
2589     }
2590
2591     pself->volume_label = g_strdup(amanda_header->name);
2592     pself->volume_time = g_strdup(amanda_header->datestamp);
2593     /* pself->volume_header is already set */
2594
2595     device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2596
2597     return pself->status;
2598 }
2599
2600 static gboolean
2601 s3_device_start (Device * pself, DeviceAccessMode mode,
2602                  char * label, char * timestamp) {
2603     S3Device * self;
2604     GSList *keys;
2605     guint64 total_size = 0;
2606     gboolean result;
2607
2608     self = S3_DEVICE(pself);
2609
2610     if (device_in_error(self)) return FALSE;
2611
2612     if (!setup_handle(self)) {
2613         /* setup_handle already set our error message */
2614         return FALSE;
2615     }
2616
2617     reset_thread(self);
2618     pself->access_mode = mode;
2619     g_mutex_lock(pself->device_mutex);
2620     pself->in_file = FALSE;
2621     g_mutex_unlock(pself->device_mutex);
2622
2623     /* try creating the bucket, in case it doesn't exist */
2624     if (!make_bucket(pself)) {
2625         return FALSE;
2626     }
2627
2628     /* take care of any dirty work for this mode */
2629     switch (mode) {
2630         case ACCESS_READ:
2631             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2632                 /* s3_device_read_label already set our error message */
2633                 return FALSE;
2634             }
2635             break;
2636
2637         case ACCESS_WRITE:
2638             if (!delete_all_files(self)) {
2639                 return FALSE;
2640             }
2641
2642             /* write a new amanda header */
2643             if (!write_amanda_header(self, label, timestamp)) {
2644                 return FALSE;
2645             }
2646
2647             pself->volume_label = newstralloc(pself->volume_label, label);
2648             pself->volume_time = newstralloc(pself->volume_time, timestamp);
2649
2650             /* unset the VOLUME_UNLABELED flag, if it was set */
2651             device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2652             break;
2653
2654         case ACCESS_APPEND:
2655             if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2656                 /* s3_device_read_label already set our error message */
2657                 return FALSE;
2658             } else {
2659                 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
2660                 if(!result) {
2661                     device_set_error(pself,
2662                                  vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
2663                                  DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
2664                     return FALSE;
2665                 } else {
2666                     self->volume_bytes = total_size;
2667                 }
2668             }
2669             return seek_to_end(self);
2670             break;
2671
2672         case ACCESS_NULL:
2673             g_assert_not_reached();
2674     }
2675
2676     return TRUE;
2677 }
2678
2679 static gboolean
2680 s3_device_finish (
2681     Device * pself)
2682 {
2683     S3Device *self = S3_DEVICE(pself);
2684
2685     reset_thread(self);
2686
2687     /* we're not in a file anymore */
2688     pself->access_mode = ACCESS_NULL;
2689
2690     if (device_in_error(pself)) return FALSE;
2691
2692     return TRUE;
2693 }
2694
2695 /* functions for writing */
2696
2697
2698 static guint64
2699 s3_device_get_bytes_read(
2700     Device * pself)
2701 {
2702     S3Device *self = S3_DEVICE(pself);
2703     int       thread;
2704     guint64   dltotal;
2705
2706     g_mutex_unlock(pself->device_mutex);
2707     /* Add per thread */
2708     g_mutex_lock(self->thread_idle_mutex);
2709     dltotal = self->dltotal;
2710     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2711         g_mutex_lock(self->s3t[thread].now_mutex);
2712         dltotal += self->s3t[thread].dlnow;
2713         g_mutex_unlock(self->s3t[thread].now_mutex);
2714     }
2715     g_mutex_unlock(self->thread_idle_mutex);
2716     g_mutex_lock(pself->device_mutex);
2717
2718     return dltotal;
2719 }
2720
2721
2722 static guint64
2723 s3_device_get_bytes_written(
2724     Device * pself)
2725 {
2726     S3Device *self = S3_DEVICE(pself);
2727     int       thread;
2728     guint64   ultotal;
2729
2730     g_mutex_unlock(pself->device_mutex);
2731     /* Add per thread */
2732     g_mutex_lock(self->thread_idle_mutex);
2733     ultotal = self->ultotal;
2734     for (thread = 0; thread < self->nb_threads_backup; thread++) {
2735         g_mutex_lock(self->s3t[thread].now_mutex);
2736         ultotal += self->s3t[thread].ulnow;
2737         g_mutex_unlock(self->s3t[thread].now_mutex);
2738     }
2739     g_mutex_unlock(self->thread_idle_mutex);
2740     g_mutex_lock(pself->device_mutex);
2741
2742     return ultotal;
2743 }
2744
2745
2746 static gboolean
2747 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
2748     S3Device *self = S3_DEVICE(pself);
2749     CurlBuffer amanda_header = {NULL, 0, 0, 0};
2750     gboolean result;
2751     size_t header_size;
2752     char  *key;
2753     int    thread;
2754
2755     if (device_in_error(self)) return FALSE;
2756
2757     reset_thread(self);
2758     pself->is_eom = FALSE;
2759
2760     /* Set the blocksize to zero, since there's no header to skip (it's stored
2761      * in a distinct file, rather than block zero) */
2762     jobInfo->blocksize = 0;
2763
2764     /* Build the amanda header. */
2765     header_size = 0; /* no minimum size */
2766     amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
2767         &header_size);
2768     if (amanda_header.buffer == NULL) {
2769         device_set_error(pself,
2770             stralloc(_("Amanda file header won't fit in a single block!")),
2771             DEVICE_STATUS_DEVICE_ERROR);
2772         return FALSE;
2773     }
2774     amanda_header.buffer_len = header_size;
2775
2776     if(check_at_leom(self, header_size))
2777         pself->is_eom = TRUE;
2778
2779     if(check_at_peom(self, header_size)) {
2780         pself->is_eom = TRUE;
2781         device_set_error(pself,
2782             stralloc(_("No space left on device")),
2783             DEVICE_STATUS_DEVICE_ERROR);
2784         g_free(amanda_header.buffer);
2785         return FALSE;
2786     }
2787
2788     for (thread = 0; thread < self->nb_threads; thread++)  {
2789         self->s3t[thread].idle = 1;
2790         self->s3t[thread].ulnow = 0;
2791     }
2792
2793     /* set the file and block numbers correctly */
2794     pself->file = (pself->file > 0)? pself->file+1 : 1;
2795     pself->block = 0;
2796     g_mutex_lock(pself->device_mutex);
2797     pself->in_file = TRUE;
2798     pself->bytes_written = 0;
2799     g_mutex_unlock(pself->device_mutex);
2800     g_mutex_lock(self->thread_idle_mutex);
2801     self->ultotal = 0;
2802     g_mutex_unlock(self->thread_idle_mutex);
2803     /* write it out as a special block (not the 0th) */
2804     key = special_file_to_key(self, "filestart", pself->file);
2805     result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
2806                        &amanda_header, NULL, NULL);
2807     g_free(amanda_header.buffer);
2808     g_free(key);
2809     if (!result) {
2810         device_set_error(pself,
2811             vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
2812             DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2813         return FALSE;
2814     }
2815
2816     self->volume_bytes += header_size;
2817
2818     return TRUE;
2819 }
2820
2821 static gboolean
2822 s3_device_write_block (Device * pself, guint size, gpointer data) {
2823     char *filename;
2824     S3Device * self = S3_DEVICE(pself);
2825     int idle_thread = 0;
2826     int thread = -1;
2827     int first_idle = -1;
2828
2829     g_assert (self != NULL);
2830     g_assert (data != NULL);
2831     if (device_in_error(self)) return FALSE;
2832
2833     if(check_at_leom(self, size))
2834         pself->is_eom = TRUE;
2835
2836     if(check_at_peom(self, size)) {
2837         pself->is_eom = TRUE;
2838         device_set_error(pself,
2839             stralloc(_("No space left on device")),
2840             DEVICE_STATUS_DEVICE_ERROR);
2841         return FALSE;
2842     }
2843
2844     filename = file_and_block_to_key(self, pself->file, pself->block);
2845
2846     g_mutex_lock(self->thread_idle_mutex);
2847     while (!idle_thread) {
2848         idle_thread = 0;
2849         for (thread = 0; thread < self->nb_threads_backup; thread++)  {
2850             if (self->s3t[thread].idle == 1) {
2851                 idle_thread++;
2852                 /* Check if the thread is in error */
2853                 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2854                     device_set_error(pself, (char *)self->s3t[thread].errmsg,
2855                                      self->s3t[thread].errflags);
2856                     self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2857                     self->s3t[thread].errmsg = NULL;
2858                     g_mutex_unlock(self->thread_idle_mutex);
2859                     return FALSE;
2860                 }
2861                 if (first_idle == -1) {
2862                     first_idle = thread;
2863                     break;
2864                 }
2865             }
2866         }
2867         if (!idle_thread) {
2868             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2869         }
2870     }
2871     thread = first_idle;
2872
2873     self->s3t[thread].idle = 0;
2874     self->s3t[thread].done = 0;
2875     if (self->s3t[thread].curl_buffer.buffer &&
2876         self->s3t[thread].curl_buffer.buffer_len < size) {
2877         g_free((char *)self->s3t[thread].curl_buffer.buffer);
2878         self->s3t[thread].curl_buffer.buffer = NULL;
2879         self->s3t[thread].curl_buffer.buffer_len = 0;
2880         self->s3t[thread].buffer_len = 0;
2881     }
2882     if (self->s3t[thread].curl_buffer.buffer == NULL) {
2883         self->s3t[thread].curl_buffer.buffer = g_malloc(size);
2884         self->s3t[thread].curl_buffer.buffer_len = size;
2885         self->s3t[thread].buffer_len = size;
2886     }
2887     memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
2888     self->s3t[thread].curl_buffer.buffer_pos = 0;
2889     self->s3t[thread].curl_buffer.buffer_len = size;
2890     self->s3t[thread].curl_buffer.max_buffer_size = 0;
2891     self->s3t[thread].filename = filename;
2892     g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
2893     g_mutex_unlock(self->thread_idle_mutex);
2894
2895     pself->block++;
2896     self->volume_bytes += size;
2897     return TRUE;
2898 }
2899
2900 static void
2901 s3_thread_write_block(
2902     gpointer thread_data,
2903     gpointer data)
2904 {
2905     S3_by_thread *s3t = (S3_by_thread *)thread_data;
2906     Device *pself = (Device *)data;
2907     S3Device *self = S3_DEVICE(pself);
2908     gboolean result;
2909
2910     result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
2911                        S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer,
2912                        progress_func, s3t);
2913     g_free((void *)s3t->filename);
2914     s3t->filename = NULL;
2915     if (!result) {
2916         s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
2917         s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
2918     }
2919     g_mutex_lock(self->thread_idle_mutex);
2920     s3t->idle = 1;
2921     s3t->done = 1;
2922     if (result)
2923         self->ultotal += s3t->curl_buffer.buffer_len;
2924     s3t->curl_buffer.buffer_len = s3t->buffer_len;
2925     s3t->ulnow = 0;
2926     g_cond_broadcast(self->thread_idle_cond);
2927     g_mutex_unlock(self->thread_idle_mutex);
2928 }
2929
2930 static gboolean
2931 s3_device_finish_file (Device * pself) {
2932     S3Device *self = S3_DEVICE(pself);
2933
2934     /* Check all threads are done */
2935     int idle_thread = 0;
2936     int thread;
2937
2938     g_mutex_lock(self->thread_idle_mutex);
2939     while (idle_thread != self->nb_threads) {
2940         idle_thread = 0;
2941         for (thread = 0; thread < self->nb_threads; thread++)  {
2942             if (self->s3t[thread].idle == 1) {
2943                 idle_thread++;
2944             }
2945             /* check thread status */
2946             if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2947                 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2948                                  self->s3t[thread].errflags);
2949                 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2950                 self->s3t[thread].errmsg = NULL;
2951             }
2952         }
2953         if (idle_thread != self->nb_threads) {
2954             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2955         }
2956     }
2957     self->ultotal = 0;
2958     g_mutex_unlock(self->thread_idle_mutex);
2959
2960     if (device_in_error(pself)) return FALSE;
2961
2962     /* we're not in a file anymore */
2963     g_mutex_lock(pself->device_mutex);
2964     pself->in_file = FALSE;
2965     pself->bytes_written = 0;;
2966     g_mutex_unlock(pself->device_mutex);
2967
2968     return TRUE;
2969 }
2970
2971 static gboolean
2972 s3_device_recycle_file(Device *pself, guint file) {
2973     S3Device *self = S3_DEVICE(pself);
2974     if (device_in_error(self)) return FALSE;
2975
2976     reset_thread(self);
2977     delete_file(self, file);
2978     s3_wait_thread_delete(self);
2979     return !device_in_error(self);
2980     /* delete_file already set our error message if necessary */
2981 }
2982
2983 static gboolean
2984 s3_device_erase(Device *pself) {
2985     S3Device *self = S3_DEVICE(pself);
2986     char *key = NULL;
2987     const char *errmsg = NULL;
2988     guint response_code;
2989     s3_error_code_t s3_error_code;
2990
2991     if (!setup_handle(self)) {
2992         /* error set by setup_handle */
2993         return FALSE;
2994     }
2995
2996     reset_thread(self);
2997     key = special_file_to_key(self, "tapestart", -1);
2998     if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
2999         s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
3000         device_set_error(pself,
3001             stralloc(errmsg),
3002             DEVICE_STATUS_DEVICE_ERROR);
3003         return FALSE;
3004     }
3005     g_free(key);
3006
3007     dumpfile_free(pself->volume_header);
3008     pself->volume_header = NULL;
3009
3010     if (!delete_all_files(self))
3011         return FALSE;
3012
3013     device_set_error(pself, g_strdup("Unlabeled volume"),
3014                      DEVICE_STATUS_VOLUME_UNLABELED);
3015
3016     if (self->create_bucket &&
3017         !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
3018         s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
3019
3020         /*
3021          * ignore the error if the bucket isn't empty (there may be data from elsewhere)
3022          * or the bucket not existing (already deleted perhaps?)
3023          */
3024         if (!(
3025                 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
3026                 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
3027
3028             device_set_error(pself,
3029                 stralloc(errmsg),
3030                 DEVICE_STATUS_DEVICE_ERROR);
3031             return FALSE;
3032         }
3033     }
3034     self->volume_bytes = 0;
3035     return TRUE;
3036 }
3037
3038 /* functions for reading */
3039
3040 static dumpfile_t*
3041 s3_device_seek_file(Device *pself, guint file) {
3042     S3Device *self = S3_DEVICE(pself);
3043     gboolean result;
3044     char *key;
3045     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
3046     dumpfile_t *amanda_header;
3047     const char *errmsg = NULL;
3048     int thread;
3049
3050     if (device_in_error(self)) return NULL;
3051
3052     reset_thread(self);
3053
3054     pself->file = file;
3055     pself->is_eof = FALSE;
3056     pself->block = 0;
3057     g_mutex_lock(pself->device_mutex);
3058     pself->in_file = FALSE;
3059     pself->bytes_read = 0;
3060     g_mutex_unlock(pself->device_mutex);
3061     self->next_block_to_read = 0;
3062     g_mutex_lock(self->thread_idle_mutex);
3063     self->dltotal = 0;
3064     g_mutex_unlock(self->thread_idle_mutex);
3065
3066     /* read it in */
3067     key = special_file_to_key(self, "filestart", pself->file);
3068     result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
3069         &buf, NULL, NULL);
3070     g_free(key);
3071
3072     if (!result) {
3073         guint response_code;
3074         s3_error_code_t s3_error_code;
3075         s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
3076
3077         /* if it's an expected error (not found), check what to do. */
3078         if (response_code == 404 &&
3079             (s3_error_code == S3_ERROR_None ||
3080              s3_error_code == S3_ERROR_NoSuchKey ||
3081              s3_error_code == S3_ERROR_NoSuchEntity)) {
3082             int next_file;
3083             next_file = find_next_file(self, pself->file);
3084             if (next_file > 0) {
3085                 /* Note short-circut of dispatcher. */
3086                 return s3_device_seek_file(pself, next_file);
3087             } else if (next_file == 0) {
3088                 /* No next file. Check if we are one past the end. */
3089                 key = special_file_to_key(self, "filestart", pself->file - 1);
3090                 result = s3_read(self->s3t[0].s3, self->bucket, key,
3091                     S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
3092                 g_free(key);
3093                 if (result) {
3094                     /* pself->file, etc. are already correct */
3095                     return make_tapeend_header();
3096                 } else {
3097                     device_set_error(pself,
3098                         stralloc(_("Attempt to read past tape-end file")),
3099                         DEVICE_STATUS_SUCCESS);
3100                     return NULL;
3101                 }
3102             }
3103         } else {
3104             /* An unexpected error occured finding out if we are the last file. */
3105             device_set_error(pself,
3106                 stralloc(errmsg),
3107                 DEVICE_STATUS_DEVICE_ERROR);
3108             return NULL;
3109         }
3110     }
3111
3112     /* and make a dumpfile_t out of it */
3113     g_assert(buf.buffer != NULL);
3114     amanda_header = g_new(dumpfile_t, 1);
3115     fh_init(amanda_header);
3116     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
3117     g_free(buf.buffer);
3118
3119     switch (amanda_header->type) {
3120         case F_DUMPFILE:
3121         case F_CONT_DUMPFILE:
3122         case F_SPLIT_DUMPFILE:
3123             break;
3124
3125         default:
3126             device_set_error(pself,
3127                 stralloc(_("Invalid amanda header while reading file header")),
3128                 DEVICE_STATUS_VOLUME_ERROR);
3129             g_free(amanda_header);
3130             return NULL;
3131     }
3132
3133     for (thread = 0; thread < self->nb_threads; thread++)  {
3134         self->s3t[thread].idle = 1;
3135         self->s3t[thread].eof = FALSE;
3136         self->s3t[thread].ulnow = 0;
3137     }
3138     g_mutex_lock(pself->device_mutex);
3139     pself->in_file = TRUE;
3140     g_mutex_unlock(pself->device_mutex);
3141     return amanda_header;
3142 }
3143
3144 static gboolean
3145 s3_device_seek_block(Device *pself, guint64 block) {
3146     S3Device * self = S3_DEVICE(pself);
3147     if (device_in_error(pself)) return FALSE;
3148
3149     reset_thread(self);
3150     pself->block = block;
3151     self->next_block_to_read = block;
3152     return TRUE;
3153 }
3154
3155 static int
3156 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
3157     S3Device * self = S3_DEVICE(pself);
3158     char *key;
3159     int thread;
3160     int done = 0;
3161
3162     g_assert (self != NULL);
3163     if (device_in_error(self)) return -1;
3164
3165     g_mutex_lock(self->thread_idle_mutex);
3166     /* start a read ahead for each thread */
3167     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3168         S3_by_thread *s3t = &self->s3t[thread];
3169         if (s3t->idle) {
3170             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3171             s3t->filename = key;
3172             s3t->done = 0;
3173             s3t->idle = 0;
3174             s3t->eof = FALSE;
3175             s3t->dlnow = 0;
3176             s3t->ulnow = 0;
3177             s3t->errflags = DEVICE_STATUS_SUCCESS;
3178             if (self->s3t[thread].curl_buffer.buffer &&
3179                 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
3180                 g_free(self->s3t[thread].curl_buffer.buffer);
3181                 self->s3t[thread].curl_buffer.buffer = NULL;
3182                 self->s3t[thread].curl_buffer.buffer_len = 0;
3183                 self->s3t[thread].buffer_len = 0;
3184             }
3185             if (!self->s3t[thread].curl_buffer.buffer) {
3186                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3187                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3188                 self->s3t[thread].buffer_len = *size_req;
3189             }
3190             s3t->curl_buffer.buffer_pos = 0;
3191             s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
3192             self->next_block_to_read++;
3193             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3194         }
3195     }
3196
3197     /* get the file*/
3198     key = file_and_block_to_key(self, pself->file, pself->block);
3199     g_assert(key != NULL);
3200     while (!done) {
3201         /* find which thread read the key */
3202         for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3203             S3_by_thread *s3t;
3204             s3t = &self->s3t[thread];
3205             if (!s3t->idle &&
3206                 s3t->done &&
3207                 strcmp(key, (char *)s3t->filename) == 0) {
3208                 if (s3t->eof) {
3209                     /* return eof */
3210                     g_free(key);
3211                     pself->is_eof = TRUE;
3212                     g_mutex_lock(pself->device_mutex);
3213                     pself->in_file = FALSE;
3214                     g_mutex_unlock(pself->device_mutex);
3215                     device_set_error(pself, stralloc(_("EOF")),
3216                                      DEVICE_STATUS_SUCCESS);
3217                     g_mutex_unlock(self->thread_idle_mutex);
3218                     return -1;
3219                 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
3220                     /* return the error */
3221                     device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
3222                     g_free(key);
3223                     g_mutex_unlock(self->thread_idle_mutex);
3224                     return -1;
3225
3226                 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
3227                     /* return the buffer */
3228                     g_mutex_unlock(self->thread_idle_mutex);
3229                     memcpy(data, s3t->curl_buffer.buffer,
3230                                  s3t->curl_buffer.buffer_pos);
3231                     *size_req = s3t->curl_buffer.buffer_pos;
3232                     g_free(key);
3233                     s3t->idle = 1;
3234                     g_free((char *)s3t->filename);
3235                     pself->block++;
3236                     done = 1;
3237                     g_mutex_lock(self->thread_idle_mutex);
3238                     break;
3239                 } else { /* buffer not enough large */
3240                     *size_req = s3t->curl_buffer.buffer_len;
3241                     g_free(key);
3242                     g_mutex_unlock(self->thread_idle_mutex);
3243                     return 0;
3244                 }
3245             }
3246         }
3247         if (!done) {
3248             g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3249         }
3250     }
3251
3252     /* start a read ahead for the thread */
3253     for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3254         S3_by_thread *s3t = &self->s3t[thread];
3255         if (s3t->idle) {
3256             key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3257             s3t->filename = key;
3258             s3t->done = 0;
3259             s3t->idle = 0;
3260             s3t->eof = FALSE;
3261             s3t->dlnow = 0;
3262             s3t->ulnow = 0;
3263             s3t->errflags = DEVICE_STATUS_SUCCESS;
3264             if (!self->s3t[thread].curl_buffer.buffer) {
3265                 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3266                 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3267             }
3268             s3t->curl_buffer.buffer_pos = 0;
3269             self->next_block_to_read++;
3270             g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3271         }
3272     }
3273     g_mutex_unlock(self->thread_idle_mutex);
3274
3275     return *size_req;
3276
3277 }
3278
3279 static void
3280 s3_thread_read_block(
3281     gpointer thread_data,
3282     gpointer data)
3283 {
3284     S3_by_thread *s3t = (S3_by_thread *)thread_data;
3285     Device *pself = (Device *)data;
3286     S3Device *self = S3_DEVICE(pself);
3287     gboolean result;
3288
3289     result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
3290         S3_BUFFER_WRITE_FUNCS,
3291         (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t);
3292
3293     g_mutex_lock(self->thread_idle_mutex);
3294     if (!result) {
3295         guint response_code;
3296         s3_error_code_t s3_error_code;
3297         s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
3298         /* if it's an expected error (not found), just return -1 */
3299         if (response_code == 404 &&
3300             (s3_error_code == S3_ERROR_None ||
3301              s3_error_code == S3_ERROR_Unknown ||
3302              s3_error_code == S3_ERROR_NoSuchKey ||
3303              s3_error_code == S3_ERROR_NoSuchEntity)) {
3304             s3t->eof = TRUE;
3305         } else {
3306
3307             /* otherwise, log it and return FALSE */
3308             s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
3309             s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
3310                                           s3_strerror(s3t->s3));
3311         }
3312     } else {
3313         self->dltotal += s3t->curl_buffer.buffer_len;
3314     }
3315     s3t->dlnow = 0;
3316     s3t->ulnow = 0;
3317     s3t->done = 1;
3318     g_cond_broadcast(self->thread_idle_cond);
3319     g_mutex_unlock(self->thread_idle_mutex);
3320
3321     return;
3322 }
3323
3324 static gboolean
3325 check_at_peom(S3Device *self, guint64 size)
3326 {
3327     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3328         guint64 newtotal = self->volume_bytes + size;
3329         if(newtotal > self->volume_limit) {
3330             return TRUE;
3331         }
3332     }
3333     return FALSE;
3334 }
3335
3336 static gboolean
3337 check_at_leom(S3Device *self, guint64 size)
3338 {
3339     guint64 block_size = DEVICE(self)->block_size;
3340     guint64 eom_warning_buffer = block_size *
3341                 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
3342
3343     if(!self->leom)
3344         return FALSE;
3345
3346     if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3347         guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
3348         if(newtotal > self->volume_limit) {
3349            return TRUE;
3350         }
3351     }
3352     return FALSE;
3353 }
3354
3355 static void
3356 reset_thread(
3357     S3Device *self)
3358 {
3359     int thread;
3360     int nb_done = 0;
3361
3362     if (self->thread_idle_mutex) {
3363         g_mutex_lock(self->thread_idle_mutex);
3364         while(nb_done != self->nb_threads) {
3365             nb_done = 0;
3366             for (thread = 0; thread < self->nb_threads; thread++)  {
3367                 if (self->s3t[thread].done == 1)
3368                     nb_done++;
3369             }
3370             if (nb_done != self->nb_threads) {
3371                 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3372             }
3373         }
3374         g_mutex_unlock(self->thread_idle_mutex);
3375     }
3376 }