2 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22 * data. It stores data in keys named with a user-specified prefix, inside a
23 * user-specified bucket. Data is stored in the form of numbered (large)
29 #include <sys/types.h>
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
43 # ifdef HAVE_CRYPTO_HMAC_H
44 # include <crypto/hmac.h>
53 * Type checking and casting macros
55 #define TYPE_S3_DEVICE (s3_device_get_type())
56 #define S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
57 #define S3_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
58 #define S3_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
59 #define IS_S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
61 #define S3_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType s3_device_get_type (void);
65 * Main object structure
67 typedef struct _S3MetadataFile S3MetadataFile;
68 typedef struct _S3Device S3Device;
70 typedef struct _S3_by_thread S3_by_thread;
71 struct _S3_by_thread {
72 S3Handle * volatile s3;
73 CurlBuffer volatile curl_buffer;
74 guint volatile buffer_len;
78 char volatile * volatile filename;
79 DeviceStatusFlags volatile errflags; /* device_status */
80 char volatile * volatile errmsg; /* device error message */
88 /* The "easy" curl handle we use to access Amazon S3 */
91 /* S3 access information */
95 /* The S3 access information. */
100 /* The Openstack swift information. */
101 char *swift_account_id;
102 char *swift_access_key;
109 char *bucket_location;
113 char *server_side_encryption;
118 /* a cache for unsuccessful reads (where we get the file but the caller
119 * doesn't have space for it or doesn't want it), where we expect the
120 * next call will request the same file.
126 /* Produce verbose output? */
129 /* create the bucket? */
130 gboolean create_bucket;
137 guint64 max_send_speed;
138 guint64 max_recv_speed;
141 guint64 volume_bytes;
142 guint64 volume_limit;
143 gboolean enforce_volume_limit;
144 gboolean use_subdomain;
145 gboolean use_s3_multi_delete;
148 int nb_threads_backup;
149 int nb_threads_recovery;
150 GThreadPool *thread_pool_delete;
151 GThreadPool *thread_pool_write;
152 GThreadPool *thread_pool_read;
153 GCond *thread_idle_cond;
154 GMutex *thread_idle_mutex;
155 int next_block_to_read;
167 gboolean reuse_connection;
173 typedef struct _S3DeviceClass S3DeviceClass;
174 struct _S3DeviceClass {
175 DeviceClass __parent__;
180 * Constants and static data
183 #define S3_DEVICE_NAME "s3"
185 /* Maximum key length as specified in the S3 documentation
186 * (*excluding* null terminator) */
187 #define S3_MAX_KEY_LENGTH 1024
189 /* Note: for compatability, min can only be decreased and max increased */
190 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
191 #define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
192 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
193 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
195 /* This goes in lieu of file number for metadata. */
196 #define SPECIAL_INFIX "special-"
198 /* pointer to the class of our parent */
199 static DeviceClass *parent_class = NULL;
202 * device-specific properties
205 /* Authentication information for Amazon S3. Both of these are strings. */
206 static DevicePropertyBase device_property_s3_access_key;
207 static DevicePropertyBase device_property_s3_secret_key;
208 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
209 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
211 /* Authentication information for Openstack Swift. Both of these are strings. */
212 static DevicePropertyBase device_property_swift_account_id;
213 static DevicePropertyBase device_property_swift_access_key;
214 #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
215 #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
217 /* Authentication information for Openstack Swift. Both of these are strings. */
218 static DevicePropertyBase device_property_username;
219 static DevicePropertyBase device_property_password;
220 static DevicePropertyBase device_property_tenant_id;
221 static DevicePropertyBase device_property_tenant_name;
222 #define PROPERTY_USERNAME (device_property_username.ID)
223 #define PROPERTY_PASSWORD (device_property_password.ID)
224 #define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
225 #define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
228 static DevicePropertyBase device_property_s3_host;
229 static DevicePropertyBase device_property_s3_service_path;
230 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
231 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
233 /* Same, but for S3 with DevPay. */
234 static DevicePropertyBase device_property_s3_user_token;
235 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
237 /* Location constraint for new buckets created on Amazon S3. */
238 static DevicePropertyBase device_property_s3_bucket_location;
239 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
242 static DevicePropertyBase device_property_s3_storage_class;
243 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
245 /* Server side encryption */
246 static DevicePropertyBase device_property_s3_server_side_encryption;
247 #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
250 static DevicePropertyBase device_property_proxy;
251 #define PROPERTY_PROXY (device_property_proxy.ID)
253 /* Path to certificate authority certificate */
254 static DevicePropertyBase device_property_ssl_ca_info;
255 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
257 /* Which strotage api to use. */
258 static DevicePropertyBase device_property_storage_api;
259 #define PROPERTY_STORAGE_API (device_property_storage_api.ID)
261 /* Whether to use openstack protocol. */
263 static DevicePropertyBase device_property_openstack_swift_api;
264 #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
266 /* Whether to use SSL with Amazon S3. */
267 static DevicePropertyBase device_property_s3_ssl;
268 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
270 /* Whether to re-use connection. */
271 static DevicePropertyBase device_property_reuse_connection;
272 #define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID)
274 /* Speed limits for sending and receiving */
275 static DevicePropertyBase device_property_max_send_speed;
276 static DevicePropertyBase device_property_max_recv_speed;
277 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
278 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
280 /* Whether to use subdomain */
281 static DevicePropertyBase device_property_s3_subdomain;
282 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
284 /* Number of threads to use */
285 static DevicePropertyBase device_property_nb_threads_backup;
286 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
287 static DevicePropertyBase device_property_nb_threads_recovery;
288 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
290 /* If the s3 server have the multi-delete functionality */
291 static DevicePropertyBase device_property_s3_multi_delete;
292 #define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
294 /* The client_id for OAUTH2 */
295 static DevicePropertyBase device_property_client_id;
296 #define PROPERTY_CLIENT_ID (device_property_client_id.ID)
298 /* The client_secret for OAUTH2 */
299 static DevicePropertyBase device_property_client_secret;
300 #define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
302 /* The refresh token for OAUTH2 */
303 static DevicePropertyBase device_property_refresh_token;
304 #define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
307 static DevicePropertyBase device_property_project_id;
308 #define PROPERTY_PROJECT_ID (device_property_project_id.ID)
311 static DevicePropertyBase device_property_create_bucket;
312 #define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
318 void s3_device_register(void);
321 * utility functions */
323 /* Given file and block numbers, return an S3 key.
325 * @param self: the S3Device object
326 * @param file: the file number
327 * @param block: the block within that file
328 * @returns: a newly allocated string containing an S3 key.
331 file_and_block_to_key(S3Device *self,
335 /* Given the name of a special file (such as 'tapestart'), generate
336 * the S3 key to use for that file.
338 * @param self: the S3Device object
339 * @param special_name: name of the special file
340 * @param file: a file number to include; omitted if -1
341 * @returns: a newly alocated string containing an S3 key.
344 special_file_to_key(S3Device *self,
347 /* Write an amanda header file to S3.
349 * @param self: the S3Device object
350 * @param label: the volume label
351 * @param timestamp: the volume timestamp
354 write_amanda_header(S3Device *self,
358 /* "Fast forward" this device to the end by looking up the largest file number
359 * present and setting the current file number one greater.
361 * @param self: the S3Device object
364 seek_to_end(S3Device *self);
366 /* Find the number of the last file that contains any data (even just a header).
368 * @param self: the S3Device object
369 * @returns: the last file, or -1 in event of an error
372 find_last_file(S3Device *self);
374 /* Delete all blocks in the given file, including the filestart block
376 * @param self: the S3Device object
377 * @param file: the file to delete
380 delete_file(S3Device *self,
384 /* Delete all files in the given device
386 * @param self: the S3Device object
389 delete_all_files(S3Device *self);
391 /* Set up self->s3t as best as possible.
393 * The return value is TRUE iff self->s3t is useable.
395 * @param self: the S3Device object
396 * @returns: TRUE if the handle is set up
399 setup_handle(S3Device * self);
402 s3_wait_thread_delete(S3Device *self);
408 s3_device_init(S3Device * o);
411 s3_device_class_init(S3DeviceClass * c);
414 s3_device_finalize(GObject * o);
417 s3_device_factory(char * device_name, char * device_type, char * device_node);
420 * Property{Get,Set}Fns */
422 static gboolean s3_device_set_access_key_fn(Device *self,
423 DevicePropertyBase *base, GValue *val,
424 PropertySurety surety, PropertySource source);
426 static gboolean s3_device_set_secret_key_fn(Device *self,
427 DevicePropertyBase *base, GValue *val,
428 PropertySurety surety, PropertySource source);
430 static gboolean s3_device_set_swift_account_id_fn(Device *self,
431 DevicePropertyBase *base, GValue *val,
432 PropertySurety surety, PropertySource source);
434 static gboolean s3_device_set_swift_access_key_fn(Device *self,
435 DevicePropertyBase *base, GValue *val,
436 PropertySurety surety, PropertySource source);
438 static gboolean s3_device_set_username(Device *self,
439 DevicePropertyBase *base, GValue *val,
440 PropertySurety surety, PropertySource source);
442 static gboolean s3_device_set_password(Device *self,
443 DevicePropertyBase *base, GValue *val,
444 PropertySurety surety, PropertySource source);
446 static gboolean s3_device_set_tenant_id(Device *self,
447 DevicePropertyBase *base, GValue *val,
448 PropertySurety surety, PropertySource source);
450 static gboolean s3_device_set_tenant_name(Device *self,
451 DevicePropertyBase *base, GValue *val,
452 PropertySurety surety, PropertySource source);
454 static gboolean s3_device_set_user_token_fn(Device *self,
455 DevicePropertyBase *base, GValue *val,
456 PropertySurety surety, PropertySource source);
458 static gboolean s3_device_set_bucket_location_fn(Device *self,
459 DevicePropertyBase *base, GValue *val,
460 PropertySurety surety, PropertySource source);
462 static gboolean s3_device_set_storage_class_fn(Device *self,
463 DevicePropertyBase *base, GValue *val,
464 PropertySurety surety, PropertySource source);
466 static gboolean s3_device_set_server_side_encryption_fn(Device *self,
467 DevicePropertyBase *base, GValue *val,
468 PropertySurety surety, PropertySource source);
470 static gboolean s3_device_set_proxy_fn(Device *self,
471 DevicePropertyBase *base, GValue *val,
472 PropertySurety surety, PropertySource source);
474 static gboolean s3_device_set_ca_info_fn(Device *self,
475 DevicePropertyBase *base, GValue *val,
476 PropertySurety surety, PropertySource source);
478 static gboolean s3_device_set_verbose_fn(Device *self,
479 DevicePropertyBase *base, GValue *val,
480 PropertySurety surety, PropertySource source);
482 static gboolean s3_device_set_create_bucket_fn(Device *self,
483 DevicePropertyBase *base, GValue *val,
484 PropertySurety surety, PropertySource source);
486 static gboolean s3_device_set_storage_api(Device *self,
487 DevicePropertyBase *base, GValue *val,
488 PropertySurety surety, PropertySource source);
490 static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
491 DevicePropertyBase *base, GValue *val,
492 PropertySurety surety, PropertySource source);
494 static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
495 DevicePropertyBase *base, GValue *val,
496 PropertySurety surety, PropertySource source);
498 static gboolean s3_device_set_ssl_fn(Device *self,
499 DevicePropertyBase *base, GValue *val,
500 PropertySurety surety, PropertySource source);
502 static gboolean s3_device_set_reuse_connection_fn(Device *self,
503 DevicePropertyBase *base, GValue *val,
504 PropertySurety surety, PropertySource source);
506 static gboolean s3_device_set_max_send_speed_fn(Device *self,
507 DevicePropertyBase *base, GValue *val,
508 PropertySurety surety, PropertySource source);
510 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
511 DevicePropertyBase *base, GValue *val,
512 PropertySurety surety, PropertySource source);
514 static gboolean s3_device_set_nb_threads_backup(Device *self,
515 DevicePropertyBase *base, GValue *val,
516 PropertySurety surety, PropertySource source);
518 static gboolean s3_device_set_nb_threads_recovery(Device *self,
519 DevicePropertyBase *base, GValue *val,
520 PropertySurety surety, PropertySource source);
522 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
523 DevicePropertyBase *base, GValue *val,
524 PropertySurety surety, PropertySource source);
526 static gboolean property_set_leom_fn(Device *p_self,
527 DevicePropertyBase *base, GValue *val,
528 PropertySurety surety, PropertySource source);
530 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
531 DevicePropertyBase *base, GValue *val,
532 PropertySurety surety, PropertySource source);
534 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
535 DevicePropertyBase *base, GValue *val,
536 PropertySurety surety, PropertySource source);
538 static gboolean s3_device_set_host_fn(Device *p_self,
539 DevicePropertyBase *base, GValue *val,
540 PropertySurety surety, PropertySource source);
542 static gboolean s3_device_set_service_path_fn(Device *p_self,
543 DevicePropertyBase *base, GValue *val,
544 PropertySurety surety, PropertySource source);
546 static gboolean s3_device_set_client_id_fn(Device *p_self,
547 DevicePropertyBase *base, GValue *val,
548 PropertySurety surety, PropertySource source);
550 static gboolean s3_device_set_client_secret_fn(Device *p_self,
551 DevicePropertyBase *base, GValue *val,
552 PropertySurety surety, PropertySource source);
554 static gboolean s3_device_set_refresh_token_fn(Device *p_self,
555 DevicePropertyBase *base, GValue *val,
556 PropertySurety surety, PropertySource source);
558 static gboolean s3_device_set_project_id_fn(Device *p_self,
559 DevicePropertyBase *base, GValue *val,
560 PropertySurety surety, PropertySource source);
562 static void s3_thread_read_block(gpointer thread_data,
564 static void s3_thread_write_block(gpointer thread_data,
566 static gboolean make_bucket(Device * pself);
569 /* Wait that all threads are done */
570 static void reset_thread(S3Device *self);
573 * virtual functions */
576 s3_device_open_device(Device *pself, char *device_name,
577 char * device_type, char * device_node);
579 static DeviceStatusFlags s3_device_read_label(Device * self);
582 s3_device_start(Device * self,
583 DeviceAccessMode mode,
588 s3_device_finish(Device * self);
591 s3_device_get_bytes_read(Device * self);
594 s3_device_get_bytes_written(Device * self);
597 s3_device_start_file(Device * self,
598 dumpfile_t * jobInfo);
601 s3_device_write_block(Device * self,
606 s3_device_finish_file(Device * self);
609 s3_device_seek_file(Device *pself,
613 s3_device_seek_block(Device *pself,
617 s3_device_read_block(Device * pself,
622 s3_device_recycle_file(Device *pself,
626 s3_device_erase(Device *pself);
629 check_at_leom(S3Device *self,
633 check_at_peom(S3Device *self,
641 file_and_block_to_key(S3Device *self,
645 char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
646 self->prefix, file, (long long unsigned int)block);
647 g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
652 special_file_to_key(S3Device *self,
657 return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
659 return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
663 write_amanda_header(S3Device *self,
667 CurlBuffer amanda_header = {NULL, 0, 0, 0};
670 dumpfile_t * dumpinfo = NULL;
671 Device *d_self = DEVICE(self);
674 /* build the header */
675 header_size = 0; /* no minimum size */
676 dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
677 amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
679 if (amanda_header.buffer == NULL) {
680 device_set_error(d_self,
681 stralloc(_("Amanda tapestart header won't fit in a single block!")),
682 DEVICE_STATUS_DEVICE_ERROR);
683 dumpfile_free(dumpinfo);
684 g_free(amanda_header.buffer);
688 if(check_at_leom(self, header_size))
689 d_self->is_eom = TRUE;
691 if(check_at_peom(self, header_size)) {
692 d_self->is_eom = TRUE;
693 device_set_error(d_self,
694 stralloc(_("No space left on device")),
695 DEVICE_STATUS_DEVICE_ERROR);
696 g_free(amanda_header.buffer);
700 /* write out the header and flush the uploads. */
701 key = special_file_to_key(self, "tapestart", -1);
702 g_assert(header_size < G_MAXUINT); /* for cast to guint */
703 amanda_header.buffer_len = (guint)header_size;
704 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
705 &amanda_header, NULL, NULL);
706 g_free(amanda_header.buffer);
710 device_set_error(d_self,
711 vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
712 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
713 dumpfile_free(dumpinfo);
715 dumpfile_free(d_self->volume_header);
716 d_self->volume_header = dumpinfo;
717 self->volume_bytes += header_size;
719 d_self->header_block_size = header_size;
724 seek_to_end(S3Device *self) {
727 Device *pself = DEVICE(self);
729 last_file = find_last_file(self);
733 pself->file = last_file;
738 /* Convert an object name into a file number, assuming the given prefix
739 * length. Returns -1 if the object name is invalid, or 0 if the object name
740 * is a "special" key. */
741 static int key_to_file(guint prefix_len, const char * key) {
745 /* skip the prefix */
746 if (strlen(key) <= prefix_len)
751 if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
755 /* check that key starts with 'f' */
760 /* check that key is of the form "%08x-" */
761 for (i = 0; i < 8; i++) {
762 if (!(key[i] >= '0' && key[i] <= '9') &&
763 !(key[i] >= 'a' && key[i] <= 'f') &&
764 !(key[i] >= 'A' && key[i] <= 'F')) break;
766 if (key[i] != '-') return -1;
767 if (i < 8) return -1;
769 /* convert the file number */
771 file = strtoul(key, NULL, 16);
773 g_warning(_("unparseable file number '%s'"), key);
780 /* Find the number of the last file that contains any data (even just a header).
781 * Returns -1 in event of an error
784 find_last_file(S3Device *self) {
787 unsigned int prefix_len = strlen(self->prefix);
789 Device *d_self = DEVICE(self);
791 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
792 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
794 device_set_error(d_self,
795 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
796 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
800 for (; keys; keys = g_slist_remove(keys, keys->data)) {
801 int file = key_to_file(prefix_len, keys->data);
803 /* and if it's the last, keep it */
804 if (file > last_file)
811 /* Find the number of the file following the requested one, if any.
812 * Returns 0 if there is no such file or -1 in event of an error
815 find_next_file(S3Device *self, int last_file) {
818 unsigned int prefix_len = strlen(self->prefix);
820 Device *d_self = DEVICE(self);
822 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
823 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
826 device_set_error(d_self,
827 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
828 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
832 for (; keys; keys = g_slist_remove(keys, keys->data)) {
835 file = key_to_file(prefix_len, (char*)keys->data);
838 /* Set this in case we don't find a next file; this is not a
839 * hard error, so if we can find a next file we'll return that
844 if (file < next_file && file > last_file) {
853 delete_file(S3Device *self,
860 guint64 total_size = 0;
861 Device *d_self = DEVICE(self);
865 my_prefix = g_strdup_printf("%sf", self->prefix);
867 my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
870 result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL,
873 device_set_error(d_self,
874 g_strdup_printf(_("While listing S3 keys: %s"),
875 s3_strerror(self->s3t[0].s3)),
876 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
880 g_mutex_lock(self->thread_idle_mutex);
884 self->keys = g_slist_concat(self->keys, keys);
888 for (thread = 0; thread < self->nb_threads; thread++) {
889 if (self->s3t[thread].idle == 1) {
890 /* Check if the thread is in error */
891 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
892 device_set_error(d_self,
893 (char *)self->s3t[thread].errmsg,
894 self->s3t[thread].errflags);
895 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
896 self->s3t[thread].errmsg = NULL;
897 g_mutex_unlock(self->thread_idle_mutex);
898 s3_wait_thread_delete(self);
901 self->s3t[thread].idle = 0;
902 self->s3t[thread].done = 0;
903 g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
907 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
908 g_mutex_unlock(self->thread_idle_mutex);
910 self->volume_bytes = total_size;
912 s3_wait_thread_delete(self);
918 s3_thread_delete_block(
919 gpointer thread_data,
922 static int count = 0;
923 S3_by_thread *s3t = (S3_by_thread *)thread_data;
924 Device *pself = (Device *)data;
925 S3Device *self = S3_DEVICE(pself);
929 g_mutex_lock(self->thread_idle_mutex);
930 while (result && self->keys) {
931 if (self->use_s3_multi_delete) {
932 char **filenames = g_new(char *, 1001);
933 char **f = filenames;
935 while (self->keys && n<1000) {
936 *f++ = self->keys->data;
937 self->keys = g_slist_remove(self->keys, self->keys->data);
941 g_mutex_unlock(self->thread_idle_mutex);
942 result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
943 (const char **)filenames);
948 g_debug("Deleting multiple keys not implemented");
949 } else { /* result == 0 */
950 g_debug("Deleteing multiple keys failed: %s",
951 s3_strerror(s3t->s3));
954 self->use_s3_multi_delete = 0;
955 /* re-add all filenames */
957 g_mutex_lock(self->thread_idle_mutex);
959 self->keys = g_slist_prepend(self->keys, *f++);
961 g_mutex_unlock(self->thread_idle_mutex);
964 g_mutex_lock(self->thread_idle_mutex);
973 filename = self->keys->data;
974 self->keys = g_slist_remove(self->keys, self->keys->data);
977 g_debug("Deleting %s ...", filename);
980 g_mutex_unlock(self->thread_idle_mutex);
981 result = s3_delete(s3t->s3, (const char *)self->bucket,
982 (const char *)filename);
984 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
985 s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
986 filename, s3_strerror(s3t->s3));
990 g_mutex_lock(self->thread_idle_mutex);
994 g_cond_broadcast(self->thread_idle_cond);
995 g_mutex_unlock(self->thread_idle_mutex);
999 s3_wait_thread_delete(S3Device *self)
1001 Device *d_self = (Device *)self;
1002 int idle_thread = 0;
1005 g_mutex_lock(self->thread_idle_mutex);
1006 while (idle_thread != self->nb_threads) {
1008 for (thread = 0; thread < self->nb_threads; thread++) {
1009 if (self->s3t[thread].idle == 1) {
1012 /* Check if the thread is in error */
1013 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1014 device_set_error(d_self, (char *)self->s3t[thread].errmsg,
1015 self->s3t[thread].errflags);
1016 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1017 self->s3t[thread].errmsg = NULL;
1020 if (idle_thread != self->nb_threads) {
1021 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1024 g_mutex_unlock(self->thread_idle_mutex);
1029 delete_all_files(S3Device *self)
1031 return delete_file(self, -1);
1039 s3_device_register(void)
1041 static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
1042 g_assert(s3_init());
1044 /* set up our properties */
1045 device_property_fill_and_register(&device_property_s3_secret_key,
1046 G_TYPE_STRING, "s3_secret_key",
1047 "Secret access key to authenticate with Amazon S3");
1048 device_property_fill_and_register(&device_property_s3_access_key,
1049 G_TYPE_STRING, "s3_access_key",
1050 "Access key ID to authenticate with Amazon S3");
1051 device_property_fill_and_register(&device_property_swift_account_id,
1052 G_TYPE_STRING, "swift_account_id",
1053 "Account ID to authenticate with openstack swift");
1054 device_property_fill_and_register(&device_property_swift_access_key,
1055 G_TYPE_STRING, "swift_access_key",
1056 "Access key to authenticate with openstack swift");
1057 device_property_fill_and_register(&device_property_username,
1058 G_TYPE_STRING, "username",
1059 "Username to authenticate with");
1060 device_property_fill_and_register(&device_property_password,
1061 G_TYPE_STRING, "password",
1062 "password to authenticate with");
1063 device_property_fill_and_register(&device_property_tenant_id,
1064 G_TYPE_STRING, "tenant_id",
1065 "tenant_id to authenticate with");
1066 device_property_fill_and_register(&device_property_tenant_name,
1067 G_TYPE_STRING, "tenant_name",
1068 "tenant_name to authenticate with");
1069 device_property_fill_and_register(&device_property_s3_host,
1070 G_TYPE_STRING, "s3_host",
1071 "hostname:port of the server");
1072 device_property_fill_and_register(&device_property_s3_service_path,
1073 G_TYPE_STRING, "s3_service_path",
1074 "path to add in the url");
1075 device_property_fill_and_register(&device_property_s3_user_token,
1076 G_TYPE_STRING, "s3_user_token",
1077 "User token for authentication Amazon devpay requests");
1078 device_property_fill_and_register(&device_property_s3_bucket_location,
1079 G_TYPE_STRING, "s3_bucket_location",
1080 "Location constraint for buckets on Amazon S3");
1081 device_property_fill_and_register(&device_property_s3_storage_class,
1082 G_TYPE_STRING, "s3_storage_class",
1083 "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
1084 device_property_fill_and_register(&device_property_s3_server_side_encryption,
1085 G_TYPE_STRING, "s3_server_side_encryption",
1086 "Serve side encryption as specified by Amazon (AES256)");
1087 device_property_fill_and_register(&device_property_proxy,
1088 G_TYPE_STRING, "proxy",
1090 device_property_fill_and_register(&device_property_ssl_ca_info,
1091 G_TYPE_STRING, "ssl_ca_info",
1092 "Path to certificate authority certificate");
1093 device_property_fill_and_register(&device_property_storage_api,
1094 G_TYPE_STRING, "storage_api",
1095 "Which cloud API to use.");
1096 device_property_fill_and_register(&device_property_openstack_swift_api,
1097 G_TYPE_STRING, "openstack_swift_api",
1098 "Whether to use openstack protocol");
1099 device_property_fill_and_register(&device_property_client_id,
1100 G_TYPE_STRING, "client_id",
1101 "client_id for use with oauth2");
1102 device_property_fill_and_register(&device_property_client_secret,
1103 G_TYPE_STRING, "client_secret",
1104 "client_secret for use with oauth2");
1105 device_property_fill_and_register(&device_property_refresh_token,
1106 G_TYPE_STRING, "refresh_token",
1107 "refresh_token for use with oauth2");
1108 device_property_fill_and_register(&device_property_project_id,
1109 G_TYPE_STRING, "project_id",
1110 "project id for use with google");
1111 device_property_fill_and_register(&device_property_s3_ssl,
1112 G_TYPE_BOOLEAN, "s3_ssl",
1113 "Whether to use SSL with Amazon S3");
1114 device_property_fill_and_register(&device_property_reuse_connection,
1115 G_TYPE_BOOLEAN, "reuse_connection",
1116 "Whether to reuse connection");
1117 device_property_fill_and_register(&device_property_create_bucket,
1118 G_TYPE_BOOLEAN, "create_bucket",
1119 "Whether to create/delete bucket");
1120 device_property_fill_and_register(&device_property_s3_subdomain,
1121 G_TYPE_BOOLEAN, "s3_subdomain",
1122 "Whether to use subdomain");
1123 device_property_fill_and_register(&device_property_max_send_speed,
1124 G_TYPE_UINT64, "max_send_speed",
1125 "Maximum average upload speed (bytes/sec)");
1126 device_property_fill_and_register(&device_property_max_recv_speed,
1127 G_TYPE_UINT64, "max_recv_speed",
1128 "Maximum average download speed (bytes/sec)");
1129 device_property_fill_and_register(&device_property_nb_threads_backup,
1130 G_TYPE_UINT64, "nb_threads_backup",
1131 "Number of writer thread");
1132 device_property_fill_and_register(&device_property_nb_threads_recovery,
1133 G_TYPE_UINT64, "nb_threads_recovery",
1134 "Number of reader thread");
1135 device_property_fill_and_register(&device_property_s3_multi_delete,
1136 G_TYPE_BOOLEAN, "s3_multi_delete",
1137 "Whether to use multi-delete");
1139 /* register the device itself */
1140 register_device(s3_device_factory, device_prefix_list);
1144 s3_device_get_type(void)
1146 static GType type = 0;
1148 if G_UNLIKELY(type == 0) {
1149 static const GTypeInfo info = {
1150 sizeof (S3DeviceClass),
1151 (GBaseInitFunc) NULL,
1152 (GBaseFinalizeFunc) NULL,
1153 (GClassInitFunc) s3_device_class_init,
1154 (GClassFinalizeFunc) NULL,
1155 NULL /* class_data */,
1157 0 /* n_preallocs */,
1158 (GInstanceInitFunc) s3_device_init,
1162 type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
1170 s3_device_init(S3Device * self)
1172 Device * dself = DEVICE(self);
1175 self->s3_api = S3_API_S3;
1176 self->volume_bytes = 0;
1177 self->volume_limit = 0;
1179 self->enforce_volume_limit = FALSE;
1180 self->use_subdomain = FALSE;
1181 self->nb_threads = 1;
1182 self->nb_threads_backup = 1;
1183 self->nb_threads_recovery = 1;
1184 self->thread_pool_delete = NULL;
1185 self->thread_pool_write = NULL;
1186 self->thread_pool_read = NULL;
1187 self->thread_idle_cond = NULL;
1188 self->thread_idle_mutex = NULL;
1189 self->use_s3_multi_delete = 1;
1191 /* Register property values
1192 * Note: Some aren't added until s3_device_open_device()
1194 bzero(&response, sizeof(response));
1196 g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
1197 g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
1198 device_set_simple_property(dself, PROPERTY_CONCURRENCY,
1199 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1200 g_value_unset(&response);
1202 g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
1203 g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
1204 device_set_simple_property(dself, PROPERTY_STREAMING,
1205 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1206 g_value_unset(&response);
1208 g_value_init(&response, G_TYPE_BOOLEAN);
1209 g_value_set_boolean(&response, TRUE);
1210 device_set_simple_property(dself, PROPERTY_APPENDABLE,
1211 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1212 g_value_unset(&response);
1214 g_value_init(&response, G_TYPE_BOOLEAN);
1215 g_value_set_boolean(&response, TRUE);
1216 device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
1217 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1218 g_value_unset(&response);
1220 g_value_init(&response, G_TYPE_BOOLEAN);
1221 g_value_set_boolean(&response, TRUE);
1222 device_set_simple_property(dself, PROPERTY_FULL_DELETION,
1223 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1224 g_value_unset(&response);
1226 g_value_init(&response, G_TYPE_BOOLEAN);
1227 g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
1228 device_set_simple_property(dself, PROPERTY_LEOM,
1229 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1230 g_value_unset(&response);
1232 g_value_init(&response, G_TYPE_BOOLEAN);
1233 g_value_set_boolean(&response, FALSE);
1234 device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1235 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1236 g_value_unset(&response);
1238 g_value_init(&response, G_TYPE_BOOLEAN);
1239 g_value_set_boolean(&response, FALSE);
1240 device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
1241 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1242 g_value_unset(&response);
1244 g_value_init(&response, G_TYPE_BOOLEAN);
1245 g_value_set_boolean(&response, FALSE);
1246 device_set_simple_property(dself, PROPERTY_COMPRESSION,
1247 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1248 g_value_unset(&response);
1250 g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
1251 g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
1252 device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
1253 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1254 g_value_unset(&response);
1259 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
1261 GObjectClass *g_object_class = (GObjectClass*) c;
1262 DeviceClass *device_class = (DeviceClass *)c;
1264 parent_class = g_type_class_ref (TYPE_DEVICE);
1266 device_class->open_device = s3_device_open_device;
1267 device_class->read_label = s3_device_read_label;
1268 device_class->start = s3_device_start;
1269 device_class->finish = s3_device_finish;
1270 device_class->get_bytes_read = s3_device_get_bytes_read;
1271 device_class->get_bytes_written = s3_device_get_bytes_written;
1273 device_class->start_file = s3_device_start_file;
1274 device_class->write_block = s3_device_write_block;
1275 device_class->finish_file = s3_device_finish_file;
1277 device_class->seek_file = s3_device_seek_file;
1278 device_class->seek_block = s3_device_seek_block;
1279 device_class->read_block = s3_device_read_block;
1280 device_class->recycle_file = s3_device_recycle_file;
1282 device_class->erase = s3_device_erase;
1284 g_object_class->finalize = s3_device_finalize;
1286 device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
1287 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1288 device_simple_property_get_fn,
1289 s3_device_set_access_key_fn);
1291 device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
1292 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1293 device_simple_property_get_fn,
1294 s3_device_set_secret_key_fn);
1296 device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
1297 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1298 device_simple_property_get_fn,
1299 s3_device_set_swift_account_id_fn);
1301 device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
1302 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1303 device_simple_property_get_fn,
1304 s3_device_set_swift_access_key_fn);
1306 device_class_register_property(device_class, PROPERTY_USERNAME,
1307 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1308 device_simple_property_get_fn,
1309 s3_device_set_username);
1311 device_class_register_property(device_class, PROPERTY_PASSWORD,
1312 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1313 device_simple_property_get_fn,
1314 s3_device_set_password);
1316 device_class_register_property(device_class, PROPERTY_TENANT_ID,
1317 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1318 device_simple_property_get_fn,
1319 s3_device_set_tenant_id);
1321 device_class_register_property(device_class, PROPERTY_TENANT_NAME,
1322 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1323 device_simple_property_get_fn,
1324 s3_device_set_tenant_name);
1326 device_class_register_property(device_class, PROPERTY_S3_HOST,
1327 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1328 device_simple_property_get_fn,
1329 s3_device_set_host_fn);
1331 device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
1332 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1333 device_simple_property_get_fn,
1334 s3_device_set_service_path_fn);
1336 device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
1337 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1338 device_simple_property_get_fn,
1339 s3_device_set_user_token_fn);
1341 device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
1342 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1343 device_simple_property_get_fn,
1344 s3_device_set_bucket_location_fn);
1346 device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
1347 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1348 device_simple_property_get_fn,
1349 s3_device_set_storage_class_fn);
1351 device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
1352 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1353 device_simple_property_get_fn,
1354 s3_device_set_server_side_encryption_fn);
1356 device_class_register_property(device_class, PROPERTY_PROXY,
1357 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1358 device_simple_property_get_fn,
1359 s3_device_set_proxy_fn);
1361 device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
1362 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1363 device_simple_property_get_fn,
1364 s3_device_set_ca_info_fn);
1366 device_class_register_property(device_class, PROPERTY_VERBOSE,
1367 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1368 device_simple_property_get_fn,
1369 s3_device_set_verbose_fn);
1371 device_class_register_property(device_class, PROPERTY_CREATE_BUCKET,
1372 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1373 device_simple_property_get_fn,
1374 s3_device_set_create_bucket_fn);
1376 device_class_register_property(device_class, PROPERTY_STORAGE_API,
1377 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1378 device_simple_property_get_fn,
1379 s3_device_set_storage_api);
1381 device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
1382 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1383 device_simple_property_get_fn,
1384 s3_device_set_openstack_swift_api_fn);
1386 device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE,
1387 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1388 device_simple_property_get_fn,
1389 s3_device_set_s3_multi_delete_fn);
1391 device_class_register_property(device_class, PROPERTY_S3_SSL,
1392 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1393 device_simple_property_get_fn,
1394 s3_device_set_ssl_fn);
1396 device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION,
1397 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1398 device_simple_property_get_fn,
1399 s3_device_set_reuse_connection_fn);
1401 device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1402 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1403 device_simple_property_get_fn,
1404 s3_device_set_max_send_speed_fn);
1406 device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1407 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1408 device_simple_property_get_fn,
1409 s3_device_set_max_recv_speed_fn);
1411 device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1412 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1413 device_simple_property_get_fn,
1414 s3_device_set_nb_threads_backup);
1416 device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1417 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1418 device_simple_property_get_fn,
1419 s3_device_set_nb_threads_recovery);
1421 device_class_register_property(device_class, PROPERTY_COMPRESSION,
1422 PROPERTY_ACCESS_GET_MASK,
1423 device_simple_property_get_fn,
1426 device_class_register_property(device_class, PROPERTY_LEOM,
1427 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1428 device_simple_property_get_fn,
1429 property_set_leom_fn);
1431 device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1432 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1433 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1434 device_simple_property_get_fn,
1435 s3_device_set_max_volume_usage_fn);
1437 device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1438 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1439 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1440 device_simple_property_get_fn,
1441 s3_device_set_enforce_max_volume_usage_fn);
1443 device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1444 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1445 device_simple_property_get_fn,
1446 s3_device_set_use_subdomain_fn);
1448 device_class_register_property(device_class, PROPERTY_CLIENT_ID,
1449 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1450 device_simple_property_get_fn,
1451 s3_device_set_client_id_fn);
1453 device_class_register_property(device_class, PROPERTY_CLIENT_SECRET,
1454 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1455 device_simple_property_get_fn,
1456 s3_device_set_client_secret_fn);
1458 device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN,
1459 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1460 device_simple_property_get_fn,
1461 s3_device_set_refresh_token_fn);
1463 device_class_register_property(device_class, PROPERTY_PROJECT_ID,
1464 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1465 device_simple_property_get_fn,
1466 s3_device_set_project_id_fn);
1470 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1471 GValue *val, PropertySurety surety, PropertySource source)
1473 S3Device *self = S3_DEVICE(p_self);
1475 amfree(self->access_key);
1476 self->access_key = g_value_dup_string(val);
1477 device_clear_volume_details(p_self);
1479 return device_simple_property_set_fn(p_self, base, val, surety, source);
1483 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1484 GValue *val, PropertySurety surety, PropertySource source)
1486 S3Device *self = S3_DEVICE(p_self);
1488 amfree(self->secret_key);
1489 self->secret_key = g_value_dup_string(val);
1490 device_clear_volume_details(p_self);
1492 return device_simple_property_set_fn(p_self, base, val, surety, source);
1496 s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
1497 GValue *val, PropertySurety surety, PropertySource source)
1499 S3Device *self = S3_DEVICE(p_self);
1501 amfree(self->swift_account_id);
1502 self->swift_account_id = g_value_dup_string(val);
1503 device_clear_volume_details(p_self);
1505 return device_simple_property_set_fn(p_self, base, val, surety, source);
1509 s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
1510 GValue *val, PropertySurety surety, PropertySource source)
1512 S3Device *self = S3_DEVICE(p_self);
1514 amfree(self->swift_access_key);
1515 self->swift_access_key = g_value_dup_string(val);
1516 device_clear_volume_details(p_self);
1518 return device_simple_property_set_fn(p_self, base, val, surety, source);
1522 s3_device_set_username(Device *p_self, DevicePropertyBase *base,
1523 GValue *val, PropertySurety surety, PropertySource source)
1525 S3Device *self = S3_DEVICE(p_self);
1527 amfree(self->username);
1528 self->username = g_value_dup_string(val);
1529 device_clear_volume_details(p_self);
1531 return device_simple_property_set_fn(p_self, base, val, surety, source);
1535 s3_device_set_password(Device *p_self, DevicePropertyBase *base,
1536 GValue *val, PropertySurety surety, PropertySource source)
1538 S3Device *self = S3_DEVICE(p_self);
1540 amfree(self->password);
1541 self->password = g_value_dup_string(val);
1542 device_clear_volume_details(p_self);
1544 return device_simple_property_set_fn(p_self, base, val, surety, source);
1548 s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
1549 GValue *val, PropertySurety surety, PropertySource source)
1551 S3Device *self = S3_DEVICE(p_self);
1553 amfree(self->tenant_id);
1554 self->tenant_id = g_value_dup_string(val);
1555 device_clear_volume_details(p_self);
1557 return device_simple_property_set_fn(p_self, base, val, surety, source);
1561 s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
1562 GValue *val, PropertySurety surety, PropertySource source)
1564 S3Device *self = S3_DEVICE(p_self);
1566 amfree(self->tenant_name);
1567 self->tenant_name = g_value_dup_string(val);
1568 device_clear_volume_details(p_self);
1570 return device_simple_property_set_fn(p_self, base, val, surety, source);
1574 s3_device_set_host_fn(Device *p_self,
1575 DevicePropertyBase *base, GValue *val,
1576 PropertySurety surety, PropertySource source)
1578 S3Device *self = S3_DEVICE(p_self);
1581 self->host = g_value_dup_string(val);
1582 device_clear_volume_details(p_self);
1584 return device_simple_property_set_fn(p_self, base, val, surety, source);
1588 s3_device_set_service_path_fn(Device *p_self,
1589 DevicePropertyBase *base, GValue *val,
1590 PropertySurety surety, PropertySource source)
1592 S3Device *self = S3_DEVICE(p_self);
1594 amfree(self->service_path);
1595 self->service_path = g_value_dup_string(val);
1596 device_clear_volume_details(p_self);
1598 return device_simple_property_set_fn(p_self, base, val, surety, source);
1602 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1603 GValue *val, PropertySurety surety, PropertySource source)
1605 S3Device *self = S3_DEVICE(p_self);
1607 amfree(self->user_token);
1608 self->user_token = g_value_dup_string(val);
1609 device_clear_volume_details(p_self);
1611 return device_simple_property_set_fn(p_self, base, val, surety, source);
1615 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1616 GValue *val, PropertySurety surety, PropertySource source)
1618 S3Device *self = S3_DEVICE(p_self);
1619 char *str_val = g_value_dup_string(val);
1621 if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1622 device_set_error(p_self, stralloc(_(
1623 "Location constraint given for Amazon S3 bucket, "
1624 "but libcurl is too old support wildcard certificates.")),
1625 DEVICE_STATUS_DEVICE_ERROR);
1629 if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1630 device_set_error(p_self, g_strdup_printf(_(
1631 "Location constraint given for Amazon S3 bucket, "
1632 "but the bucket name (%s) is not usable as a subdomain."),
1634 DEVICE_STATUS_DEVICE_ERROR);
1638 amfree(self->bucket_location);
1639 self->bucket_location = str_val;
1640 device_clear_volume_details(p_self);
1642 return device_simple_property_set_fn(p_self, base, val, surety, source);
1649 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1650 GValue *val, PropertySurety surety, PropertySource source)
1652 S3Device *self = S3_DEVICE(p_self);
1653 char *str_val = g_value_dup_string(val);
1655 amfree(self->storage_class);
1656 self->storage_class = str_val;
1657 device_clear_volume_details(p_self);
1659 return device_simple_property_set_fn(p_self, base, val, surety, source);
1663 s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
1664 GValue *val, PropertySurety surety, PropertySource source)
1666 S3Device *self = S3_DEVICE(p_self);
1667 char *str_val = g_value_dup_string(val);
1669 amfree(self->server_side_encryption);
1670 self->server_side_encryption = str_val;
1671 device_clear_volume_details(p_self);
1673 return device_simple_property_set_fn(p_self, base, val, surety, source);
1677 s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
1678 GValue *val, PropertySurety surety, PropertySource source)
1680 S3Device *self = S3_DEVICE(p_self);
1681 char *str_val = g_value_dup_string(val);
1683 amfree(self->proxy);
1684 self->proxy = str_val;
1685 device_clear_volume_details(p_self);
1687 return device_simple_property_set_fn(p_self, base, val, surety, source);
1691 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1692 GValue *val, PropertySurety surety, PropertySource source)
1694 S3Device *self = S3_DEVICE(p_self);
1696 amfree(self->ca_info);
1697 self->ca_info = g_value_dup_string(val);
1698 device_clear_volume_details(p_self);
1700 return device_simple_property_set_fn(p_self, base, val, surety, source);
1704 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1705 GValue *val, PropertySurety surety, PropertySource source)
1707 S3Device *self = S3_DEVICE(p_self);
1710 self->verbose = g_value_get_boolean(val);
1711 /* Our S3 handle may not yet have been instantiated; if so, it will
1712 * get the proper verbose setting when it is created */
1714 for (thread = 0; thread < self->nb_threads; thread++) {
1715 if (self->s3t[thread].s3)
1716 s3_verbose(self->s3t[thread].s3, self->verbose);
1720 return device_simple_property_set_fn(p_self, base, val, surety, source);
1724 s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
1725 GValue *val, PropertySurety surety, PropertySource source)
1727 S3Device *self = S3_DEVICE(p_self);
1730 self->create_bucket = g_value_get_boolean(val);
1731 /* Our S3 handle may not yet have been instantiated; if so, it will
1732 * get the proper verbose setting when it is created */
1734 for (thread = 0; thread < self->nb_threads; thread++) {
1735 if (self->s3t[thread].s3)
1736 s3_verbose(self->s3t[thread].s3, self->verbose);
1740 return device_simple_property_set_fn(p_self, base, val, surety, source);
1744 s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
1745 GValue *val, PropertySurety surety, PropertySource source)
1747 S3Device *self = S3_DEVICE(p_self);
1749 const char *storage_api = g_value_get_string(val);
1750 if (g_str_equal(storage_api, "S3")) {
1751 self->s3_api = S3_API_S3;
1752 } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
1753 self->s3_api = S3_API_SWIFT_1;
1754 } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
1755 self->s3_api = S3_API_SWIFT_2;
1756 } else if (g_str_equal(storage_api, "OAUTH2")) {
1757 self->s3_api = S3_API_OAUTH2;
1759 g_debug("Invalid STORAGE_API, using \"S3\".");
1760 self->s3_api = S3_API_S3;
1763 return device_simple_property_set_fn(p_self, base, val, surety, source);
1767 s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
1768 GValue *val, PropertySurety surety, PropertySource source)
1771 const gboolean openstack_swift_api = g_value_get_boolean(val);
1772 if (openstack_swift_api) {
1773 GValue storage_api_val;
1774 g_value_init(&storage_api_val, G_TYPE_STRING);
1775 g_value_set_static_string(&storage_api_val, "SWIFT-1.0");
1776 return s3_device_set_storage_api(p_self, base, &storage_api_val,
1783 s3_device_set_s3_multi_delete_fn(Device *p_self,
1784 DevicePropertyBase *base, GValue *val,
1785 PropertySurety surety, PropertySource source)
1787 S3Device *self = S3_DEVICE(p_self);
1789 self->use_s3_multi_delete = g_value_get_boolean(val);
1791 return device_simple_property_set_fn(p_self, base, val, surety, source);
1795 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1796 GValue *val, PropertySurety surety, PropertySource source)
1798 S3Device *self = S3_DEVICE(p_self);
1802 new_val = g_value_get_boolean(val);
1803 /* Our S3 handle may not yet have been instantiated; if so, it will
1804 * get the proper use_ssl setting when it is created */
1806 for (thread = 0; thread < self->nb_threads; thread++) {
1807 if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1808 device_set_error(p_self, g_strdup_printf(_(
1809 "Error setting S3 SSL/TLS use "
1810 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1811 DEVICE_STATUS_DEVICE_ERROR);
1816 self->use_ssl = new_val;
1818 return device_simple_property_set_fn(p_self, base, val, surety, source);
1822 s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
1823 GValue *val, PropertySurety surety, PropertySource source)
1825 S3Device *self = S3_DEVICE(p_self);
1827 self->reuse_connection = g_value_get_boolean(val);
1829 return device_simple_property_set_fn(p_self, base, val, surety, source);
1833 s3_device_set_max_send_speed_fn(Device *p_self,
1834 DevicePropertyBase *base, GValue *val,
1835 PropertySurety surety, PropertySource source)
1837 S3Device *self = S3_DEVICE(p_self);
1841 new_val = g_value_get_uint64(val);
1843 for (thread = 0; thread < self->nb_threads; thread++) {
1844 if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1845 device_set_error(p_self,
1846 g_strdup("Could not set S3 maximum send speed"),
1847 DEVICE_STATUS_DEVICE_ERROR);
1852 self->max_send_speed = new_val;
1854 return device_simple_property_set_fn(p_self, base, val, surety, source);
1858 s3_device_set_max_recv_speed_fn(Device *p_self,
1859 DevicePropertyBase *base, GValue *val,
1860 PropertySurety surety, PropertySource source)
1862 S3Device *self = S3_DEVICE(p_self);
1866 new_val = g_value_get_uint64(val);
1868 for (thread = 0; thread < self->nb_threads; thread++) {
1869 if (self->s3t[thread].s3 &&
1870 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1871 device_set_error(p_self,
1872 g_strdup("Could not set S3 maximum recv speed"),
1873 DEVICE_STATUS_DEVICE_ERROR);
1878 self->max_recv_speed = new_val;
1880 return device_simple_property_set_fn(p_self, base, val, surety, source);
1884 s3_device_set_nb_threads_backup(Device *p_self,
1885 DevicePropertyBase *base, GValue *val,
1886 PropertySurety surety, PropertySource source)
1888 S3Device *self = S3_DEVICE(p_self);
1891 new_val = g_value_get_uint64(val);
1892 self->nb_threads_backup = new_val;
1893 if (self->nb_threads_backup > self->nb_threads) {
1894 self->nb_threads = self->nb_threads_backup;
1897 return device_simple_property_set_fn(p_self, base, val, surety, source);
1901 s3_device_set_nb_threads_recovery(Device *p_self,
1902 DevicePropertyBase *base, GValue *val,
1903 PropertySurety surety, PropertySource source)
1905 S3Device *self = S3_DEVICE(p_self);
1908 new_val = g_value_get_uint64(val);
1909 self->nb_threads_recovery = new_val;
1910 if (self->nb_threads_recovery > self->nb_threads) {
1911 self->nb_threads = self->nb_threads_recovery;
1914 return device_simple_property_set_fn(p_self, base, val, surety, source);
1918 s3_device_set_max_volume_usage_fn(Device *p_self,
1919 DevicePropertyBase *base, GValue *val,
1920 PropertySurety surety, PropertySource source)
1922 S3Device *self = S3_DEVICE(p_self);
1924 self->volume_limit = g_value_get_uint64(val);
1926 return device_simple_property_set_fn(p_self, base, val, surety, source);
1931 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1932 DevicePropertyBase *base, GValue *val,
1933 PropertySurety surety, PropertySource source)
1935 S3Device *self = S3_DEVICE(p_self);
1937 self->enforce_volume_limit = g_value_get_boolean(val);
1939 return device_simple_property_set_fn(p_self, base, val, surety, source);
1944 s3_device_set_use_subdomain_fn(Device *p_self,
1945 DevicePropertyBase *base, GValue *val,
1946 PropertySurety surety, PropertySource source)
1948 S3Device *self = S3_DEVICE(p_self);
1950 self->use_subdomain = g_value_get_boolean(val);
1952 return device_simple_property_set_fn(p_self, base, val, surety, source);
1956 property_set_leom_fn(Device *p_self,
1957 DevicePropertyBase *base, GValue *val,
1958 PropertySurety surety, PropertySource source)
1960 S3Device *self = S3_DEVICE(p_self);
1962 self->leom = g_value_get_boolean(val);
1964 return device_simple_property_set_fn(p_self, base, val, surety, source);
1968 s3_device_set_client_id_fn(Device *p_self,
1969 DevicePropertyBase *base, GValue *val,
1970 PropertySurety surety, PropertySource source)
1972 S3Device *self = S3_DEVICE(p_self);
1974 amfree(self->client_id);
1975 self->client_id = g_value_dup_string(val);
1977 return device_simple_property_set_fn(p_self, base, val, surety, source);
1981 s3_device_set_client_secret_fn(Device *p_self,
1982 DevicePropertyBase *base, GValue *val,
1983 PropertySurety surety, PropertySource source)
1985 S3Device *self = S3_DEVICE(p_self);
1987 amfree(self->client_secret);
1988 self->client_secret = g_value_dup_string(val);
1990 return device_simple_property_set_fn(p_self, base, val, surety, source);
1994 s3_device_set_refresh_token_fn(Device *p_self,
1995 DevicePropertyBase *base, GValue *val,
1996 PropertySurety surety, PropertySource source)
1998 S3Device *self = S3_DEVICE(p_self);
2000 amfree(self->refresh_token);
2001 self->refresh_token = g_value_dup_string(val);
2003 return device_simple_property_set_fn(p_self, base, val, surety, source);
2007 s3_device_set_project_id_fn(Device *p_self,
2008 DevicePropertyBase *base, GValue *val,
2009 PropertySurety surety, PropertySource source)
2011 S3Device *self = S3_DEVICE(p_self);
2013 amfree(self->project_id);
2014 self->project_id = g_value_dup_string(val);
2016 return device_simple_property_set_fn(p_self, base, val, surety, source);
2020 s3_device_factory(char * device_name, char * device_type, char * device_node)
2023 g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
2024 rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
2026 device_open_device(rval, device_name, device_type, device_node);
2031 * Virtual function overrides
2035 s3_device_open_device(Device *pself, char *device_name,
2036 char * device_type, char * device_node)
2038 S3Device *self = S3_DEVICE(pself);
2042 pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
2043 pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
2044 pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
2046 /* Device name may be bucket/prefix, to support multiple volumes in a
2048 name_colon = strchr(device_node, '/');
2049 if (name_colon == NULL) {
2050 self->bucket = g_strdup(device_node);
2051 self->prefix = g_strdup("");
2053 self->bucket = g_strndup(device_node, name_colon - device_node);
2054 self->prefix = g_strdup(name_colon + 1);
2057 if (self->bucket == NULL || self->bucket[0] == '\0') {
2058 device_set_error(pself,
2059 vstrallocf(_("Empty bucket name in device %s"), device_name),
2060 DEVICE_STATUS_DEVICE_ERROR);
2061 amfree(self->bucket);
2062 amfree(self->prefix);
2066 g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
2068 /* default values */
2069 self->verbose = FALSE;
2070 self->s3_api = S3_API_S3;
2072 /* use SSL if available */
2073 self->use_ssl = s3_curl_supports_ssl();
2074 bzero(&tmp_value, sizeof(GValue));
2075 g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2076 g_value_set_boolean(&tmp_value, self->use_ssl);
2077 device_set_simple_property(pself, device_property_s3_ssl.ID,
2078 &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2080 /* reuse connection */
2081 self->reuse_connection = TRUE;
2082 bzero(&tmp_value, sizeof(GValue));
2083 g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2084 g_value_set_boolean(&tmp_value, self->reuse_connection);
2085 device_set_simple_property(pself, device_property_reuse_connection.ID,
2086 &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2088 /* Set default create_bucket */
2089 self->create_bucket = TRUE;
2090 bzero(&tmp_value, sizeof(GValue));
2091 g_value_init(&tmp_value, G_TYPE_BOOLEAN);
2092 g_value_set_boolean(&tmp_value, self->create_bucket);
2093 device_set_simple_property(pself, device_property_create_bucket.ID,
2094 &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
2096 if (parent_class->open_device) {
2097 parent_class->open_device(pself, device_name, device_type, device_node);
2101 static void s3_device_finalize(GObject * obj_self) {
2102 S3Device *self = S3_DEVICE (obj_self);
2105 if(G_OBJECT_CLASS(parent_class)->finalize)
2106 (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
2108 if (self->thread_pool_delete) {
2109 g_thread_pool_free(self->thread_pool_delete, 1, 1);
2110 self->thread_pool_delete = NULL;
2112 if (self->thread_pool_write) {
2113 g_thread_pool_free(self->thread_pool_write, 1, 1);
2114 self->thread_pool_write = NULL;
2116 if (self->thread_pool_read) {
2117 g_thread_pool_free(self->thread_pool_read, 1, 1);
2118 self->thread_pool_read = NULL;
2120 if (self->thread_idle_mutex) {
2121 g_mutex_free(self->thread_idle_mutex);
2122 self->thread_idle_mutex = NULL;
2124 if (self->thread_idle_cond) {
2125 g_cond_free(self->thread_idle_cond);
2126 self->thread_idle_cond = NULL;
2129 for (thread = 0; thread < self->nb_threads; thread++) {
2130 g_mutex_free(self->s3t[thread].now_mutex);
2131 if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
2132 g_free(self->s3t[thread].curl_buffer.buffer);
2136 if(self->bucket) g_free(self->bucket);
2137 if(self->prefix) g_free(self->prefix);
2138 if(self->access_key) g_free(self->access_key);
2139 if(self->secret_key) g_free(self->secret_key);
2140 if(self->swift_account_id) g_free(self->swift_account_id);
2141 if(self->swift_access_key) g_free(self->swift_access_key);
2142 if(self->username) g_free(self->username);
2143 if(self->password) g_free(self->password);
2144 if(self->tenant_id) g_free(self->tenant_id);
2145 if(self->tenant_name) g_free(self->tenant_name);
2146 if(self->host) g_free(self->host);
2147 if(self->service_path) g_free(self->service_path);
2148 if(self->user_token) g_free(self->user_token);
2149 if(self->bucket_location) g_free(self->bucket_location);
2150 if(self->storage_class) g_free(self->storage_class);
2151 if(self->server_side_encryption) g_free(self->server_side_encryption);
2152 if(self->proxy) g_free(self->proxy);
2153 if(self->ca_info) g_free(self->ca_info);
2156 static gboolean setup_handle(S3Device * self) {
2157 Device *d_self = DEVICE(self);
2159 guint response_code;
2160 s3_error_code_t s3_error_code;
2163 if (self->s3t == NULL) {
2164 if (self->s3_api == S3_API_S3) {
2165 if (self->access_key == NULL || self->access_key[0] == '\0') {
2166 device_set_error(d_self,
2167 g_strdup(_("No Amazon access key specified")),
2168 DEVICE_STATUS_DEVICE_ERROR);
2172 if (self->secret_key == NULL || self->secret_key[0] == '\0') {
2173 device_set_error(d_self,
2174 g_strdup(_("No Amazon secret key specified")),
2175 DEVICE_STATUS_DEVICE_ERROR);
2178 } else if (self->s3_api == S3_API_SWIFT_1) {
2179 if (self->swift_account_id == NULL ||
2180 self->swift_account_id[0] == '\0') {
2181 device_set_error(d_self,
2182 g_strdup(_("No Swift account id specified")),
2183 DEVICE_STATUS_DEVICE_ERROR);
2186 if (self->swift_access_key == NULL ||
2187 self->swift_access_key[0] == '\0') {
2188 device_set_error(d_self,
2189 g_strdup(_("No Swift access key specified")),
2190 DEVICE_STATUS_DEVICE_ERROR);
2193 } else if (self->s3_api == S3_API_SWIFT_2) {
2194 if (!((self->username && self->password && self->tenant_id) ||
2195 (self->username && self->password && self->tenant_name) ||
2196 (self->access_key && self->secret_key && self->tenant_id) ||
2197 (self->access_key && self->secret_key && self->tenant_name))) {
2198 device_set_error(d_self,
2199 g_strdup(_("Missing authorization properties")),
2200 DEVICE_STATUS_DEVICE_ERROR);
2203 } else if (self->s3_api == S3_API_OAUTH2) {
2204 if (self->client_id == NULL ||
2205 self->client_id[0] == '\0') {
2206 device_set_error(d_self,
2207 g_strdup(_("Missing client_id properties")),
2208 DEVICE_STATUS_DEVICE_ERROR);
2211 if (self->client_secret == NULL ||
2212 self->client_secret[0] == '\0') {
2213 device_set_error(d_self,
2214 g_strdup(_("Missing client_secret properties")),
2215 DEVICE_STATUS_DEVICE_ERROR);
2218 if (self->refresh_token == NULL ||
2219 self->refresh_token[0] == '\0') {
2220 device_set_error(d_self,
2221 g_strdup(_("Missing refresh_token properties")),
2222 DEVICE_STATUS_DEVICE_ERROR);
2225 if (self->project_id == NULL ||
2226 self->project_id[0] == '\0') {
2227 device_set_error(d_self,
2228 g_strdup(_("Missing project_id properties")),
2229 DEVICE_STATUS_DEVICE_ERROR);
2234 self->s3t = g_new0(S3_by_thread, self->nb_threads);
2235 if (self->s3t == NULL) {
2236 device_set_error(d_self,
2237 g_strdup(_("Can't allocate S3Handle array")),
2238 DEVICE_STATUS_DEVICE_ERROR);
2242 self->thread_idle_cond = g_cond_new();
2243 self->thread_idle_mutex = g_mutex_new();
2245 for (thread = 0; thread < self->nb_threads; thread++) {
2246 self->s3t[thread].idle = 1;
2247 self->s3t[thread].done = 1;
2248 self->s3t[thread].eof = FALSE;
2249 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2250 self->s3t[thread].errmsg = NULL;
2251 self->s3t[thread].filename = NULL;
2252 self->s3t[thread].curl_buffer.buffer = NULL;
2253 self->s3t[thread].curl_buffer.buffer_len = 0;
2254 self->s3t[thread].now_mutex = g_mutex_new();
2255 self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
2256 self->swift_account_id,
2257 self->swift_access_key,
2258 self->host, self->service_path,
2259 self->use_subdomain,
2260 self->user_token, self->bucket_location,
2261 self->storage_class, self->ca_info,
2262 self->server_side_encryption,
2270 self->client_secret,
2271 self->refresh_token,
2272 self->reuse_connection);
2273 if (self->s3t[thread].s3 == NULL) {
2274 device_set_error(d_self,
2275 stralloc(_("Internal error creating S3 handle")),
2276 DEVICE_STATUS_DEVICE_ERROR);
2277 self->nb_threads = thread+1;
2282 g_debug("Create %d threads", self->nb_threads);
2283 self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
2284 self, self->nb_threads, 0,
2286 self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
2287 self->nb_threads, 0, NULL);
2288 self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
2289 self->nb_threads, 0, NULL);
2291 for (thread = 0; thread < self->nb_threads; thread++) {
2292 s3_verbose(self->s3t[thread].s3, self->verbose);
2294 if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
2295 device_set_error(d_self, g_strdup_printf(_(
2296 "Error setting S3 SSL/TLS use "
2297 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
2298 DEVICE_STATUS_DEVICE_ERROR);
2302 if (self->max_send_speed &&
2303 !s3_set_max_send_speed(self->s3t[thread].s3,
2304 self->max_send_speed)) {
2305 device_set_error(d_self,
2306 g_strdup("Could not set S3 maximum send speed"),
2307 DEVICE_STATUS_DEVICE_ERROR);
2311 if (self->max_recv_speed &&
2312 !s3_set_max_recv_speed(self->s3t[thread].s3,
2313 self->max_recv_speed)) {
2314 device_set_error(d_self,
2315 g_strdup("Could not set S3 maximum recv speed"),
2316 DEVICE_STATUS_DEVICE_ERROR);
2321 for (thread = 0; thread < self->nb_threads; thread++) {
2322 if (!s3_open2(self->s3t[thread].s3)) {
2323 if (self->s3_api == S3_API_SWIFT_1 ||
2324 self->s3_api == S3_API_SWIFT_2) {
2325 s3_error(self->s3t[0].s3, NULL, &response_code,
2326 &s3_error_code, NULL, &curl_code, NULL);
2327 device_set_error(d_self,
2328 g_strdup_printf(_("s3_open2 failed: %s"),
2329 s3_strerror(self->s3t[0].s3)),
2330 DEVICE_STATUS_DEVICE_ERROR);
2331 self->nb_threads = thread+1;
2334 device_set_error(d_self,
2335 g_strdup("s3_open2 failed"),
2336 DEVICE_STATUS_DEVICE_ERROR);
2350 S3Device *self = S3_DEVICE(pself);
2351 guint response_code;
2352 s3_error_code_t s3_error_code;
2355 if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) {
2359 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
2361 if (response_code == 0 && s3_error_code == 0 &&
2362 (curl_code == CURLE_COULDNT_CONNECT ||
2363 curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
2364 device_set_error(pself,
2365 g_strdup_printf(_("While connecting to S3 bucket: %s"),
2366 s3_strerror(self->s3t[0].s3)),
2367 DEVICE_STATUS_DEVICE_ERROR);
2371 if (!self->create_bucket) {
2372 device_set_error(pself,
2373 g_strdup_printf(_("Can't list bucket: %s"),
2374 s3_strerror(self->s3t[0].s3)),
2375 DEVICE_STATUS_DEVICE_ERROR);
2379 if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) {
2380 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2382 /* if it isn't an expected error (bucket already exists),
2384 if (response_code != 409 ||
2385 (s3_error_code != S3_ERROR_BucketAlreadyExists &&
2386 s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
2387 device_set_error(pself,
2388 g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
2389 DEVICE_STATUS_DEVICE_ERROR);
2396 static int progress_func(
2398 double dltotal G_GNUC_UNUSED,
2400 double ultotal G_GNUC_UNUSED,
2403 S3_by_thread *s3t = (S3_by_thread *)thread_data;
2405 g_mutex_lock(s3t->now_mutex);
2408 g_mutex_unlock(s3t->now_mutex);
2413 static DeviceStatusFlags
2414 s3_device_read_label(Device *pself) {
2415 S3Device *self = S3_DEVICE(pself);
2417 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2418 dumpfile_t *amanda_header;
2419 /* note that this may be called from s3_device_start, when
2420 * self->access_mode is not ACCESS_NULL */
2422 amfree(pself->volume_label);
2423 amfree(pself->volume_time);
2424 dumpfile_free(pself->volume_header);
2425 pself->volume_header = NULL;
2427 if (device_in_error(self)) return pself->status;
2429 if (!setup_handle(self)) {
2430 /* setup_handle already set our error message */
2431 return pself->status;
2435 key = special_file_to_key(self, "tapestart", -1);
2437 if (!make_bucket(pself)) {
2438 return pself->status;
2441 if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
2442 guint response_code;
2443 s3_error_code_t s3_error_code;
2444 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2446 /* if it's an expected error (not found), just return FALSE */
2447 if (response_code == 404 &&
2448 (s3_error_code == S3_ERROR_None ||
2449 s3_error_code == S3_ERROR_Unknown ||
2450 s3_error_code == S3_ERROR_NoSuchKey ||
2451 s3_error_code == S3_ERROR_NoSuchEntity ||
2452 s3_error_code == S3_ERROR_NoSuchBucket)) {
2453 g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
2454 device_set_error(pself,
2455 stralloc(_("Amanda header not found -- unlabeled volume?")),
2456 DEVICE_STATUS_DEVICE_ERROR
2457 | DEVICE_STATUS_VOLUME_ERROR
2458 | DEVICE_STATUS_VOLUME_UNLABELED);
2459 return pself->status;
2462 /* otherwise, log it and return */
2463 device_set_error(pself,
2464 vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
2465 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2466 return pself->status;
2469 /* handle an empty file gracefully */
2470 if (buf.buffer_len == 0) {
2471 device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
2472 return pself->status;
2475 pself->header_block_size = buf.buffer_len;
2476 g_assert(buf.buffer != NULL);
2477 amanda_header = g_new(dumpfile_t, 1);
2478 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2479 pself->volume_header = amanda_header;
2482 if (amanda_header->type != F_TAPESTART) {
2483 device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
2484 return pself->status;
2487 pself->volume_label = g_strdup(amanda_header->name);
2488 pself->volume_time = g_strdup(amanda_header->datestamp);
2489 /* pself->volume_header is already set */
2491 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2493 return pself->status;
2497 s3_device_start (Device * pself, DeviceAccessMode mode,
2498 char * label, char * timestamp) {
2501 guint64 total_size = 0;
2504 self = S3_DEVICE(pself);
2506 if (device_in_error(self)) return FALSE;
2508 if (!setup_handle(self)) {
2509 /* setup_handle already set our error message */
2514 pself->access_mode = mode;
2515 g_mutex_lock(pself->device_mutex);
2516 pself->in_file = FALSE;
2517 g_mutex_unlock(pself->device_mutex);
2519 /* try creating the bucket, in case it doesn't exist */
2520 if (!make_bucket(pself)) {
2524 /* take care of any dirty work for this mode */
2527 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2528 /* s3_device_read_label already set our error message */
2534 if (!delete_all_files(self)) {
2538 /* write a new amanda header */
2539 if (!write_amanda_header(self, label, timestamp)) {
2543 pself->volume_label = newstralloc(pself->volume_label, label);
2544 pself->volume_time = newstralloc(pself->volume_time, timestamp);
2546 /* unset the VOLUME_UNLABELED flag, if it was set */
2547 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
2551 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
2552 /* s3_device_read_label already set our error message */
2555 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
2557 device_set_error(pself,
2558 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
2559 DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
2562 self->volume_bytes = total_size;
2565 return seek_to_end(self);
2569 g_assert_not_reached();
2579 S3Device *self = S3_DEVICE(pself);
2583 /* we're not in a file anymore */
2584 pself->access_mode = ACCESS_NULL;
2586 if (device_in_error(pself)) return FALSE;
2591 /* functions for writing */
2595 s3_device_get_bytes_read(
2598 S3Device *self = S3_DEVICE(pself);
2602 g_mutex_unlock(pself->device_mutex);
2603 /* Add per thread */
2604 g_mutex_lock(self->thread_idle_mutex);
2605 dltotal = self->dltotal;
2606 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2607 g_mutex_lock(self->s3t[thread].now_mutex);
2608 dltotal += self->s3t[thread].dlnow;
2609 g_mutex_unlock(self->s3t[thread].now_mutex);
2611 g_mutex_unlock(self->thread_idle_mutex);
2612 g_mutex_lock(pself->device_mutex);
2619 s3_device_get_bytes_written(
2622 S3Device *self = S3_DEVICE(pself);
2626 g_mutex_unlock(pself->device_mutex);
2627 /* Add per thread */
2628 g_mutex_lock(self->thread_idle_mutex);
2629 ultotal = self->ultotal;
2630 for (thread = 0; thread < self->nb_threads_backup; thread++) {
2631 g_mutex_lock(self->s3t[thread].now_mutex);
2632 ultotal += self->s3t[thread].ulnow;
2633 g_mutex_unlock(self->s3t[thread].now_mutex);
2635 g_mutex_unlock(self->thread_idle_mutex);
2636 g_mutex_lock(pself->device_mutex);
2643 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
2644 S3Device *self = S3_DEVICE(pself);
2645 CurlBuffer amanda_header = {NULL, 0, 0, 0};
2651 if (device_in_error(self)) return FALSE;
2654 pself->is_eom = FALSE;
2656 /* Set the blocksize to zero, since there's no header to skip (it's stored
2657 * in a distinct file, rather than block zero) */
2658 jobInfo->blocksize = 0;
2660 /* Build the amanda header. */
2661 header_size = 0; /* no minimum size */
2662 amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
2664 if (amanda_header.buffer == NULL) {
2665 device_set_error(pself,
2666 stralloc(_("Amanda file header won't fit in a single block!")),
2667 DEVICE_STATUS_DEVICE_ERROR);
2670 amanda_header.buffer_len = header_size;
2672 if(check_at_leom(self, header_size))
2673 pself->is_eom = TRUE;
2675 if(check_at_peom(self, header_size)) {
2676 pself->is_eom = TRUE;
2677 device_set_error(pself,
2678 stralloc(_("No space left on device")),
2679 DEVICE_STATUS_DEVICE_ERROR);
2680 g_free(amanda_header.buffer);
2684 for (thread = 0; thread < self->nb_threads; thread++) {
2685 self->s3t[thread].idle = 1;
2686 self->s3t[thread].ulnow = 0;
2689 /* set the file and block numbers correctly */
2690 pself->file = (pself->file > 0)? pself->file+1 : 1;
2692 g_mutex_lock(pself->device_mutex);
2693 pself->in_file = TRUE;
2694 pself->bytes_written = 0;
2695 g_mutex_unlock(pself->device_mutex);
2696 g_mutex_lock(self->thread_idle_mutex);
2698 g_mutex_unlock(self->thread_idle_mutex);
2699 /* write it out as a special block (not the 0th) */
2700 key = special_file_to_key(self, "filestart", pself->file);
2701 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
2702 &amanda_header, NULL, NULL);
2703 g_free(amanda_header.buffer);
2706 device_set_error(pself,
2707 vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
2708 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2712 self->volume_bytes += header_size;
2718 s3_device_write_block (Device * pself, guint size, gpointer data) {
2720 S3Device * self = S3_DEVICE(pself);
2721 int idle_thread = 0;
2723 int first_idle = -1;
2725 g_assert (self != NULL);
2726 g_assert (data != NULL);
2727 if (device_in_error(self)) return FALSE;
2729 if(check_at_leom(self, size))
2730 pself->is_eom = TRUE;
2732 if(check_at_peom(self, size)) {
2733 pself->is_eom = TRUE;
2734 device_set_error(pself,
2735 stralloc(_("No space left on device")),
2736 DEVICE_STATUS_DEVICE_ERROR);
2740 filename = file_and_block_to_key(self, pself->file, pself->block);
2742 g_mutex_lock(self->thread_idle_mutex);
2743 while (!idle_thread) {
2745 for (thread = 0; thread < self->nb_threads_backup; thread++) {
2746 if (self->s3t[thread].idle == 1) {
2748 /* Check if the thread is in error */
2749 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2750 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2751 self->s3t[thread].errflags);
2752 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2753 self->s3t[thread].errmsg = NULL;
2754 g_mutex_unlock(self->thread_idle_mutex);
2757 if (first_idle == -1) {
2758 first_idle = thread;
2764 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2767 thread = first_idle;
2769 self->s3t[thread].idle = 0;
2770 self->s3t[thread].done = 0;
2771 if (self->s3t[thread].curl_buffer.buffer &&
2772 self->s3t[thread].curl_buffer.buffer_len < size) {
2773 g_free((char *)self->s3t[thread].curl_buffer.buffer);
2774 self->s3t[thread].curl_buffer.buffer = NULL;
2775 self->s3t[thread].curl_buffer.buffer_len = 0;
2776 self->s3t[thread].buffer_len = 0;
2778 if (self->s3t[thread].curl_buffer.buffer == NULL) {
2779 self->s3t[thread].curl_buffer.buffer = g_malloc(size);
2780 self->s3t[thread].curl_buffer.buffer_len = size;
2781 self->s3t[thread].buffer_len = size;
2783 memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
2784 self->s3t[thread].curl_buffer.buffer_pos = 0;
2785 self->s3t[thread].curl_buffer.buffer_len = size;
2786 self->s3t[thread].curl_buffer.max_buffer_size = 0;
2787 self->s3t[thread].filename = filename;
2788 g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
2789 g_mutex_unlock(self->thread_idle_mutex);
2792 self->volume_bytes += size;
2797 s3_thread_write_block(
2798 gpointer thread_data,
2801 S3_by_thread *s3t = (S3_by_thread *)thread_data;
2802 Device *pself = (Device *)data;
2803 S3Device *self = S3_DEVICE(pself);
2806 result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
2807 S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer,
2808 progress_func, s3t);
2809 g_free((void *)s3t->filename);
2810 s3t->filename = NULL;
2812 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
2813 s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
2815 g_mutex_lock(self->thread_idle_mutex);
2819 self->ultotal += s3t->curl_buffer.buffer_len;
2820 s3t->curl_buffer.buffer_len = s3t->buffer_len;
2822 g_cond_broadcast(self->thread_idle_cond);
2823 g_mutex_unlock(self->thread_idle_mutex);
2827 s3_device_finish_file (Device * pself) {
2828 S3Device *self = S3_DEVICE(pself);
2830 /* Check all threads are done */
2831 int idle_thread = 0;
2834 g_mutex_lock(self->thread_idle_mutex);
2835 while (idle_thread != self->nb_threads) {
2837 for (thread = 0; thread < self->nb_threads; thread++) {
2838 if (self->s3t[thread].idle == 1) {
2841 /* check thread status */
2842 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2843 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2844 self->s3t[thread].errflags);
2845 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2846 self->s3t[thread].errmsg = NULL;
2849 if (idle_thread != self->nb_threads) {
2850 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2854 g_mutex_unlock(self->thread_idle_mutex);
2856 if (device_in_error(pself)) return FALSE;
2858 /* we're not in a file anymore */
2859 g_mutex_lock(pself->device_mutex);
2860 pself->in_file = FALSE;
2861 pself->bytes_written = 0;;
2862 g_mutex_unlock(pself->device_mutex);
2868 s3_device_recycle_file(Device *pself, guint file) {
2869 S3Device *self = S3_DEVICE(pself);
2870 if (device_in_error(self)) return FALSE;
2873 delete_file(self, file);
2874 s3_wait_thread_delete(self);
2875 return !device_in_error(self);
2876 /* delete_file already set our error message if necessary */
2880 s3_device_erase(Device *pself) {
2881 S3Device *self = S3_DEVICE(pself);
2883 const char *errmsg = NULL;
2884 guint response_code;
2885 s3_error_code_t s3_error_code;
2887 if (!setup_handle(self)) {
2888 /* error set by setup_handle */
2893 key = special_file_to_key(self, "tapestart", -1);
2894 if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
2895 s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
2896 device_set_error(pself,
2898 DEVICE_STATUS_DEVICE_ERROR);
2903 dumpfile_free(pself->volume_header);
2904 pself->volume_header = NULL;
2906 if (!delete_all_files(self))
2909 device_set_error(pself, g_strdup("Unlabeled volume"),
2910 DEVICE_STATUS_VOLUME_UNLABELED);
2912 if (self->create_bucket &&
2913 !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
2914 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2917 * ignore the error if the bucket isn't empty (there may be data from elsewhere)
2918 * or the bucket not existing (already deleted perhaps?)
2921 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
2922 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
2924 device_set_error(pself,
2926 DEVICE_STATUS_DEVICE_ERROR);
2930 self->volume_bytes = 0;
2934 /* functions for reading */
2937 s3_device_seek_file(Device *pself, guint file) {
2938 S3Device *self = S3_DEVICE(pself);
2941 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2942 dumpfile_t *amanda_header;
2943 const char *errmsg = NULL;
2946 if (device_in_error(self)) return NULL;
2951 pself->is_eof = FALSE;
2953 g_mutex_lock(pself->device_mutex);
2954 pself->in_file = FALSE;
2955 pself->bytes_read = 0;
2956 g_mutex_unlock(pself->device_mutex);
2957 self->next_block_to_read = 0;
2958 g_mutex_lock(self->thread_idle_mutex);
2960 g_mutex_unlock(self->thread_idle_mutex);
2963 key = special_file_to_key(self, "filestart", pself->file);
2964 result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2969 guint response_code;
2970 s3_error_code_t s3_error_code;
2971 s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2973 /* if it's an expected error (not found), check what to do. */
2974 if (response_code == 404 &&
2975 (s3_error_code == S3_ERROR_None ||
2976 s3_error_code == S3_ERROR_NoSuchKey ||
2977 s3_error_code == S3_ERROR_NoSuchEntity)) {
2979 next_file = find_next_file(self, pself->file);
2980 if (next_file > 0) {
2981 /* Note short-circut of dispatcher. */
2982 return s3_device_seek_file(pself, next_file);
2983 } else if (next_file == 0) {
2984 /* No next file. Check if we are one past the end. */
2985 key = special_file_to_key(self, "filestart", pself->file - 1);
2986 result = s3_read(self->s3t[0].s3, self->bucket, key,
2987 S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2990 /* pself->file, etc. are already correct */
2991 return make_tapeend_header();
2993 device_set_error(pself,
2994 stralloc(_("Attempt to read past tape-end file")),
2995 DEVICE_STATUS_SUCCESS);
3000 /* An unexpected error occured finding out if we are the last file. */
3001 device_set_error(pself,
3003 DEVICE_STATUS_DEVICE_ERROR);
3008 /* and make a dumpfile_t out of it */
3009 g_assert(buf.buffer != NULL);
3010 amanda_header = g_new(dumpfile_t, 1);
3011 fh_init(amanda_header);
3012 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
3015 switch (amanda_header->type) {
3017 case F_CONT_DUMPFILE:
3018 case F_SPLIT_DUMPFILE:
3022 device_set_error(pself,
3023 stralloc(_("Invalid amanda header while reading file header")),
3024 DEVICE_STATUS_VOLUME_ERROR);
3025 g_free(amanda_header);
3029 for (thread = 0; thread < self->nb_threads; thread++) {
3030 self->s3t[thread].idle = 1;
3031 self->s3t[thread].eof = FALSE;
3032 self->s3t[thread].ulnow = 0;
3034 g_mutex_lock(pself->device_mutex);
3035 pself->in_file = TRUE;
3036 g_mutex_unlock(pself->device_mutex);
3037 return amanda_header;
3041 s3_device_seek_block(Device *pself, guint64 block) {
3042 S3Device * self = S3_DEVICE(pself);
3043 if (device_in_error(pself)) return FALSE;
3046 pself->block = block;
3047 self->next_block_to_read = block;
3052 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
3053 S3Device * self = S3_DEVICE(pself);
3058 g_assert (self != NULL);
3059 if (device_in_error(self)) return -1;
3061 g_mutex_lock(self->thread_idle_mutex);
3062 /* start a read ahead for each thread */
3063 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3064 S3_by_thread *s3t = &self->s3t[thread];
3066 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3067 s3t->filename = key;
3073 s3t->errflags = DEVICE_STATUS_SUCCESS;
3074 if (self->s3t[thread].curl_buffer.buffer &&
3075 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
3076 g_free(self->s3t[thread].curl_buffer.buffer);
3077 self->s3t[thread].curl_buffer.buffer = NULL;
3078 self->s3t[thread].curl_buffer.buffer_len = 0;
3079 self->s3t[thread].buffer_len = 0;
3081 if (!self->s3t[thread].curl_buffer.buffer) {
3082 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3083 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3084 self->s3t[thread].buffer_len = *size_req;
3086 s3t->curl_buffer.buffer_pos = 0;
3087 s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
3088 self->next_block_to_read++;
3089 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3094 key = file_and_block_to_key(self, pself->file, pself->block);
3095 g_assert(key != NULL);
3097 /* find which thread read the key */
3098 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3100 s3t = &self->s3t[thread];
3103 strcmp(key, (char *)s3t->filename) == 0) {
3107 pself->is_eof = TRUE;
3108 g_mutex_lock(pself->device_mutex);
3109 pself->in_file = FALSE;
3110 g_mutex_unlock(pself->device_mutex);
3111 device_set_error(pself, stralloc(_("EOF")),
3112 DEVICE_STATUS_SUCCESS);
3113 g_mutex_unlock(self->thread_idle_mutex);
3115 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
3116 /* return the error */
3117 device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
3119 g_mutex_unlock(self->thread_idle_mutex);
3122 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
3123 /* return the buffer */
3124 g_mutex_unlock(self->thread_idle_mutex);
3125 memcpy(data, s3t->curl_buffer.buffer,
3126 s3t->curl_buffer.buffer_pos);
3127 *size_req = s3t->curl_buffer.buffer_pos;
3130 g_free((char *)s3t->filename);
3133 g_mutex_lock(self->thread_idle_mutex);
3135 } else { /* buffer not enough large */
3136 *size_req = s3t->curl_buffer.buffer_len;
3138 g_mutex_unlock(self->thread_idle_mutex);
3144 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3148 /* start a read ahead for the thread */
3149 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
3150 S3_by_thread *s3t = &self->s3t[thread];
3152 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
3153 s3t->filename = key;
3159 s3t->errflags = DEVICE_STATUS_SUCCESS;
3160 if (!self->s3t[thread].curl_buffer.buffer) {
3161 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
3162 self->s3t[thread].curl_buffer.buffer_len = *size_req;
3164 s3t->curl_buffer.buffer_pos = 0;
3165 self->next_block_to_read++;
3166 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
3169 g_mutex_unlock(self->thread_idle_mutex);
3176 s3_thread_read_block(
3177 gpointer thread_data,
3180 S3_by_thread *s3t = (S3_by_thread *)thread_data;
3181 Device *pself = (Device *)data;
3182 S3Device *self = S3_DEVICE(pself);
3185 result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
3186 S3_BUFFER_WRITE_FUNCS,
3187 (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t);
3189 g_mutex_lock(self->thread_idle_mutex);
3191 guint response_code;
3192 s3_error_code_t s3_error_code;
3193 s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
3194 /* if it's an expected error (not found), just return -1 */
3195 if (response_code == 404 &&
3196 (s3_error_code == S3_ERROR_None ||
3197 s3_error_code == S3_ERROR_Unknown ||
3198 s3_error_code == S3_ERROR_NoSuchKey ||
3199 s3_error_code == S3_ERROR_NoSuchEntity)) {
3203 /* otherwise, log it and return FALSE */
3204 s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
3205 s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
3206 s3_strerror(s3t->s3));
3209 self->dltotal += s3t->curl_buffer.buffer_len;
3214 g_cond_broadcast(self->thread_idle_cond);
3215 g_mutex_unlock(self->thread_idle_mutex);
3221 check_at_peom(S3Device *self, guint64 size)
3223 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3224 guint64 newtotal = self->volume_bytes + size;
3225 if(newtotal > self->volume_limit) {
3233 check_at_leom(S3Device *self, guint64 size)
3235 guint64 block_size = DEVICE(self)->block_size;
3236 guint64 eom_warning_buffer = block_size *
3237 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
3242 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
3243 guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
3244 if(newtotal > self->volume_limit) {
3258 if (self->thread_idle_mutex) {
3259 g_mutex_lock(self->thread_idle_mutex);
3260 while(nb_done != self->nb_threads) {
3262 for (thread = 0; thread < self->nb_threads; thread++) {
3263 if (self->s3t[thread].done == 1)
3266 if (nb_done != self->nb_threads) {
3267 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
3270 g_mutex_unlock(self->thread_idle_mutex);