2 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22 * data. It stores data in keys named with a user-specified prefix, inside a
23 * user-specified bucket. Data is stored in the form of numbered (large)
29 #include <sys/types.h>
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
43 # ifdef HAVE_CRYPTO_HMAC_H
44 # include <crypto/hmac.h>
53 * Type checking and casting macros
55 #define TYPE_S3_DEVICE (s3_device_get_type())
56 #define S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
57 #define S3_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
58 #define S3_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
59 #define IS_S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
61 #define S3_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType s3_device_get_type (void);
65 * Main object structure
67 typedef struct _S3MetadataFile S3MetadataFile;
69 typedef struct _S3_by_thread S3_by_thread;
70 struct _S3_by_thread {
71 S3Handle * volatile s3;
72 CurlBuffer volatile curl_buffer;
73 guint volatile buffer_len;
77 char volatile * volatile filename;
78 DeviceStatusFlags volatile errflags; /* device_status */
79 char volatile * volatile errmsg; /* device error message */
82 typedef struct _S3Device S3Device;
86 /* The "easy" curl handle we use to access Amazon S3 */
89 /* S3 access information */
93 /* The S3 access information. */
98 /* The Openstack swift information. */
99 char *swift_account_id;
100 char *swift_access_key;
102 char *bucket_location;
106 char *server_side_encryption;
110 /* a cache for unsuccessful reads (where we get the file but the caller
111 * doesn't have space for it or doesn't want it), where we expect the
112 * next call will request the same file.
118 /* Produce verbose output? */
123 gboolean openstack_swift_api;
126 guint64 max_send_speed;
127 guint64 max_recv_speed;
130 guint64 volume_bytes;
131 guint64 volume_limit;
132 gboolean enforce_volume_limit;
133 gboolean use_subdomain;
136 int nb_threads_backup;
137 int nb_threads_recovery;
138 GThreadPool *thread_pool_delete;
139 GThreadPool *thread_pool_write;
140 GThreadPool *thread_pool_read;
141 GCond *thread_idle_cond;
142 GMutex *thread_idle_mutex;
143 int next_block_to_read;
150 typedef struct _S3DeviceClass S3DeviceClass;
151 struct _S3DeviceClass {
152 DeviceClass __parent__;
157 * Constants and static data
160 #define S3_DEVICE_NAME "s3"
162 /* Maximum key length as specified in the S3 documentation
163 * (*excluding* null terminator) */
164 #define S3_MAX_KEY_LENGTH 1024
166 /* Note: for compatibility, min can only be decreased and max increased */
167 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
168 #define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
169 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
170 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
172 /* This goes in lieu of file number for metadata. */
173 #define SPECIAL_INFIX "special-"
175 /* pointer to the class of our parent */
176 static DeviceClass *parent_class = NULL;
179 * device-specific properties
182 /* Authentication information for Amazon S3. Both of these are strings. */
183 static DevicePropertyBase device_property_s3_access_key;
184 static DevicePropertyBase device_property_s3_secret_key;
185 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
186 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
188 /* Authentication information for Openstack Swift. Both of these are strings. */
189 static DevicePropertyBase device_property_swift_account_id;
190 static DevicePropertyBase device_property_swift_access_key;
191 #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
192 #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
195 static DevicePropertyBase device_property_s3_host;
196 static DevicePropertyBase device_property_s3_service_path;
197 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
198 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
200 /* Same, but for S3 with DevPay. */
201 static DevicePropertyBase device_property_s3_user_token;
202 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
204 /* Location constraint for new buckets created on Amazon S3. */
205 static DevicePropertyBase device_property_s3_bucket_location;
206 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
209 static DevicePropertyBase device_property_s3_storage_class;
210 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
213 static DevicePropertyBase device_property_s3_server_side_encryption;
214 #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
216 /* Path to certificate authority certificate */
217 static DevicePropertyBase device_property_ssl_ca_info;
218 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
220 /* Whether to use openstack protocol. */
221 static DevicePropertyBase device_property_openstack_swift_api;
222 #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
224 /* Whether to use SSL with Amazon S3. */
225 static DevicePropertyBase device_property_s3_ssl;
226 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
228 /* Speed limits for sending and receiving */
229 static DevicePropertyBase device_property_max_send_speed;
230 static DevicePropertyBase device_property_max_recv_speed;
231 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
232 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
234 /* Whether to use subdomain */
235 static DevicePropertyBase device_property_s3_subdomain;
236 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
238 /* Number of threads to use */
239 static DevicePropertyBase device_property_nb_threads_backup;
240 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
241 static DevicePropertyBase device_property_nb_threads_recovery;
242 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
248 void s3_device_register(void);
251 * utility functions */
253 /* Given file and block numbers, return an S3 key.
255 * @param self: the S3Device object
256 * @param file: the file number
257 * @param block: the block within that file
258 * @returns: a newly allocated string containing an S3 key.
261 file_and_block_to_key(S3Device *self,
265 /* Given the name of a special file (such as 'tapestart'), generate
266 * the S3 key to use for that file.
268 * @param self: the S3Device object
269 * @param special_name: name of the special file
270 * @param file: a file number to include; omitted if -1
271 * @returns: a newly allocated string containing an S3 key.
274 special_file_to_key(S3Device *self,
277 /* Write an amanda header file to S3.
279 * @param self: the S3Device object
280 * @param label: the volume label
281 * @param timestamp: the volume timestamp
284 write_amanda_header(S3Device *self,
288 /* "Fast forward" this device to the end by looking up the largest file number
289 * present and setting the current file number one greater.
291 * @param self: the S3Device object
294 seek_to_end(S3Device *self);
296 /* Find the number of the last file that contains any data (even just a header).
298 * @param self: the S3Device object
299 * @returns: the last file, or -1 in event of an error
302 find_last_file(S3Device *self);
304 /* Delete all blocks in the given file, including the filestart block
306 * @param self: the S3Device object
307 * @param file: the file to delete
310 delete_file(S3Device *self,
314 /* Delete all files in the given device
316 * @param self: the S3Device object
319 delete_all_files(S3Device *self);
321 /* Set up self->s3t as best as possible.
323 * The return value is TRUE iff self->s3t is useable.
325 * @param self: the S3Device object
326 * @returns: TRUE if the handle is set up
329 setup_handle(S3Device * self);
332 s3_wait_thread_delete(S3Device *self);
338 s3_device_init(S3Device * o);
341 s3_device_class_init(S3DeviceClass * c);
344 s3_device_finalize(GObject * o);
347 s3_device_factory(char * device_name, char * device_type, char * device_node);
350 * Property{Get,Set}Fns */
352 static gboolean s3_device_set_access_key_fn(Device *self,
353 DevicePropertyBase *base, GValue *val,
354 PropertySurety surety, PropertySource source);
356 static gboolean s3_device_set_secret_key_fn(Device *self,
357 DevicePropertyBase *base, GValue *val,
358 PropertySurety surety, PropertySource source);
360 static gboolean s3_device_set_swift_account_id_fn(Device *self,
361 DevicePropertyBase *base, GValue *val,
362 PropertySurety surety, PropertySource source);
364 static gboolean s3_device_set_swift_access_key_fn(Device *self,
365 DevicePropertyBase *base, GValue *val,
366 PropertySurety surety, PropertySource source);
368 static gboolean s3_device_set_user_token_fn(Device *self,
369 DevicePropertyBase *base, GValue *val,
370 PropertySurety surety, PropertySource source);
372 static gboolean s3_device_set_bucket_location_fn(Device *self,
373 DevicePropertyBase *base, GValue *val,
374 PropertySurety surety, PropertySource source);
376 static gboolean s3_device_set_storage_class_fn(Device *self,
377 DevicePropertyBase *base, GValue *val,
378 PropertySurety surety, PropertySource source);
380 static gboolean s3_device_set_server_side_encryption_fn(Device *self,
381 DevicePropertyBase *base, GValue *val,
382 PropertySurety surety, PropertySource source);
384 static gboolean s3_device_set_ca_info_fn(Device *self,
385 DevicePropertyBase *base, GValue *val,
386 PropertySurety surety, PropertySource source);
388 static gboolean s3_device_set_verbose_fn(Device *self,
389 DevicePropertyBase *base, GValue *val,
390 PropertySurety surety, PropertySource source);
392 static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
393 DevicePropertyBase *base, GValue *val,
394 PropertySurety surety, PropertySource source);
396 static gboolean s3_device_set_ssl_fn(Device *self,
397 DevicePropertyBase *base, GValue *val,
398 PropertySurety surety, PropertySource source);
400 static gboolean s3_device_set_max_send_speed_fn(Device *self,
401 DevicePropertyBase *base, GValue *val,
402 PropertySurety surety, PropertySource source);
404 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
405 DevicePropertyBase *base, GValue *val,
406 PropertySurety surety, PropertySource source);
408 static gboolean s3_device_set_nb_threads_backup(Device *self,
409 DevicePropertyBase *base, GValue *val,
410 PropertySurety surety, PropertySource source);
412 static gboolean s3_device_set_nb_threads_recovery(Device *self,
413 DevicePropertyBase *base, GValue *val,
414 PropertySurety surety, PropertySource source);
416 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
417 DevicePropertyBase *base, GValue *val,
418 PropertySurety surety, PropertySource source);
420 static gboolean property_set_leom_fn(Device *p_self,
421 DevicePropertyBase *base, GValue *val,
422 PropertySurety surety, PropertySource source);
424 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
425 DevicePropertyBase *base, GValue *val,
426 PropertySurety surety, PropertySource source);
428 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
429 DevicePropertyBase *base, GValue *val,
430 PropertySurety surety, PropertySource source);
432 static gboolean s3_device_set_host_fn(Device *p_self,
433 DevicePropertyBase *base, GValue *val,
434 PropertySurety surety, PropertySource source);
436 static gboolean s3_device_set_service_path_fn(Device *p_self,
437 DevicePropertyBase *base, GValue *val,
438 PropertySurety surety, PropertySource source);
440 static void s3_thread_read_block(gpointer thread_data,
442 static void s3_thread_write_block(gpointer thread_data,
444 static gboolean make_bucket(Device * pself);
447 /* Wait until all threads are done */
448 static void reset_thread(S3Device *self);
451 * virtual functions */
454 s3_device_open_device(Device *pself, char *device_name,
455 char * device_type, char * device_node);
457 static DeviceStatusFlags s3_device_read_label(Device * self);
460 s3_device_start(Device * self,
461 DeviceAccessMode mode,
466 s3_device_finish(Device * self);
469 s3_device_start_file(Device * self,
470 dumpfile_t * jobInfo);
473 s3_device_write_block(Device * self,
478 s3_device_finish_file(Device * self);
481 s3_device_seek_file(Device *pself,
485 s3_device_seek_block(Device *pself,
489 s3_device_read_block(Device * pself,
494 s3_device_recycle_file(Device *pself,
498 s3_device_erase(Device *pself);
501 check_at_leom(S3Device *self,
505 check_at_peom(S3Device *self,
513 file_and_block_to_key(S3Device *self,
517 char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
518 self->prefix, file, (long long unsigned int)block);
519 g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
524 special_file_to_key(S3Device *self,
529 return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
531 return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
535 write_amanda_header(S3Device *self,
539 CurlBuffer amanda_header = {NULL, 0, 0, 0};
542 dumpfile_t * dumpinfo = NULL;
543 Device *d_self = DEVICE(self);
546 /* build the header */
547 header_size = 0; /* no minimum size */
548 dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
549 amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
551 if (amanda_header.buffer == NULL) {
552 device_set_error(d_self,
553 stralloc(_("Amanda tapestart header won't fit in a single block!")),
554 DEVICE_STATUS_DEVICE_ERROR);
555 dumpfile_free(dumpinfo);
556 g_free(amanda_header.buffer);
560 if(check_at_leom(self, header_size))
561 d_self->is_eom = TRUE;
563 if(check_at_peom(self, header_size)) {
564 d_self->is_eom = TRUE;
565 device_set_error(d_self,
566 stralloc(_("No space left on device")),
567 DEVICE_STATUS_DEVICE_ERROR);
568 g_free(amanda_header.buffer);
572 /* write out the header and flush the uploads. */
573 key = special_file_to_key(self, "tapestart", -1);
574 g_assert(header_size < G_MAXUINT); /* for cast to guint */
575 amanda_header.buffer_len = (guint)header_size;
576 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
577 &amanda_header, NULL, NULL);
578 g_free(amanda_header.buffer);
582 device_set_error(d_self,
583 vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
584 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
585 dumpfile_free(dumpinfo);
587 dumpfile_free(d_self->volume_header);
588 d_self->volume_header = dumpinfo;
589 self->volume_bytes += header_size;
591 d_self->header_block_size = header_size;
596 seek_to_end(S3Device *self) {
599 Device *pself = DEVICE(self);
601 last_file = find_last_file(self);
605 pself->file = last_file;
610 /* Convert an object name into a file number, assuming the given prefix
611 * length. Returns -1 if the object name is invalid, or 0 if the object name
612 * is a "special" key. */
613 static int key_to_file(guint prefix_len, const char * key) {
617 /* skip the prefix */
618 if (strlen(key) <= prefix_len)
623 if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
627 /* check that key starts with 'f' */
632 /* check that key is of the form "%08x-" */
633 for (i = 0; i < 8; i++) {
634 if (!(key[i] >= '0' && key[i] <= '9') &&
635 !(key[i] >= 'a' && key[i] <= 'f') &&
636 !(key[i] >= 'A' && key[i] <= 'F')) break;
638 if (key[i] != '-') return -1;
639 if (i < 8) return -1;
641 /* convert the file number */
643 file = strtoul(key, NULL, 16);
645 g_warning(_("unparseable file number '%s'"), key);
652 /* Find the number of the last file that contains any data (even just a header).
653 * Returns -1 in event of an error
656 find_last_file(S3Device *self) {
659 unsigned int prefix_len = strlen(self->prefix);
661 Device *d_self = DEVICE(self);
663 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
664 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
666 device_set_error(d_self,
667 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
668 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
672 for (; keys; keys = g_slist_remove(keys, keys->data)) {
673 int file = key_to_file(prefix_len, keys->data);
675 /* and if it's the last, keep it */
676 if (file > last_file)
683 /* Find the number of the file following the requested one, if any.
684 * Returns 0 if there is no such file or -1 in event of an error
687 find_next_file(S3Device *self, int last_file) {
690 unsigned int prefix_len = strlen(self->prefix);
692 Device *d_self = DEVICE(self);
694 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
695 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
698 device_set_error(d_self,
699 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
700 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
704 for (; keys; keys = g_slist_remove(keys, keys->data)) {
707 file = key_to_file(prefix_len, (char*)keys->data);
710 /* Set this in case we don't find a next file; this is not a
711 * hard error, so if we can find a next file we'll return that
716 if (file < next_file && file > last_file) {
725 delete_file(S3Device *self,
732 guint64 total_size = 0;
733 Device *d_self = DEVICE(self);
736 my_prefix = g_strdup_printf("%sf", self->prefix);
738 my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
741 result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
744 device_set_error(d_self,
745 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
746 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
750 g_mutex_lock(self->thread_idle_mutex);
754 self->keys = g_slist_concat(self->keys, keys);
758 for (thread = 0; thread < self->nb_threads; thread++) {
759 if (self->s3t[thread].idle == 1) {
760 /* Check if the thread is in error */
761 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
762 device_set_error(d_self,
763 (char *)self->s3t[thread].errmsg,
764 self->s3t[thread].errflags);
765 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
766 self->s3t[thread].errmsg = NULL;
767 g_mutex_unlock(self->thread_idle_mutex);
768 s3_wait_thread_delete(self);
771 self->s3t[thread].idle = 0;
772 self->s3t[thread].done = 0;
773 g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
777 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
778 g_mutex_unlock(self->thread_idle_mutex);
780 self->volume_bytes = total_size;
782 s3_wait_thread_delete(self);
788 s3_thread_delete_block(
789 gpointer thread_data,
792 static int count = 0;
793 S3_by_thread *s3t = (S3_by_thread *)thread_data;
794 Device *pself = (Device *)data;
795 S3Device *self = S3_DEVICE(pself);
799 g_mutex_lock(self->thread_idle_mutex);
800 while (result && self->keys) {
801 filename = self->keys->data;
802 self->keys = g_slist_remove(self->keys, self->keys->data);
805 g_debug("Deleting %s ...", filename);
808 g_mutex_unlock(self->thread_idle_mutex);
809 result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename);
811 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
812 s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
813 filename, s3_strerror(s3t->s3));
816 g_mutex_lock(self->thread_idle_mutex);
820 g_cond_broadcast(self->thread_idle_cond);
821 g_mutex_unlock(self->thread_idle_mutex);
825 s3_wait_thread_delete(S3Device *self)
827 Device *d_self = (Device *)self;
831 g_mutex_lock(self->thread_idle_mutex);
832 while (idle_thread != self->nb_threads) {
834 for (thread = 0; thread < self->nb_threads; thread++) {
835 if (self->s3t[thread].idle == 1) {
838 /* Check if the thread is in error */
839 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
840 device_set_error(d_self, (char *)self->s3t[thread].errmsg,
841 self->s3t[thread].errflags);
842 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
843 self->s3t[thread].errmsg = NULL;
846 if (idle_thread != self->nb_threads) {
847 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
850 g_mutex_unlock(self->thread_idle_mutex);
855 delete_all_files(S3Device *self)
857 return delete_file(self, -1);
865 s3_device_register(void)
867 static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
870 /* set up our properties */
871 device_property_fill_and_register(&device_property_s3_secret_key,
872 G_TYPE_STRING, "s3_secret_key",
873 "Secret access key to authenticate with Amazon S3");
874 device_property_fill_and_register(&device_property_s3_access_key,
875 G_TYPE_STRING, "s3_access_key",
876 "Access key ID to authenticate with Amazon S3");
877 device_property_fill_and_register(&device_property_swift_account_id,
878 G_TYPE_STRING, "swift_account_id",
879 "Account ID to authenticate with openstack swift");
880 device_property_fill_and_register(&device_property_swift_access_key,
881 G_TYPE_STRING, "swift_access_key",
882 "Access key to authenticate with openstack swift");
883 device_property_fill_and_register(&device_property_s3_host,
884 G_TYPE_STRING, "s3_host",
885 "hostname:port of the server");
886 device_property_fill_and_register(&device_property_s3_service_path,
887 G_TYPE_STRING, "s3_service_path",
888 "path to add in the url");
889 device_property_fill_and_register(&device_property_s3_user_token,
890 G_TYPE_STRING, "s3_user_token",
891 "User token for authentication Amazon devpay requests");
892 device_property_fill_and_register(&device_property_s3_bucket_location,
893 G_TYPE_STRING, "s3_bucket_location",
894 "Location constraint for buckets on Amazon S3");
895 device_property_fill_and_register(&device_property_s3_storage_class,
896 G_TYPE_STRING, "s3_storage_class",
897 "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
898 device_property_fill_and_register(&device_property_s3_server_side_encryption,
899 G_TYPE_STRING, "s3_server_side_encryption",
900 "Serve side encryption as specified by Amazon (AES256)");
901 device_property_fill_and_register(&device_property_ssl_ca_info,
902 G_TYPE_STRING, "ssl_ca_info",
903 "Path to certificate authority certificate");
904 device_property_fill_and_register(&device_property_openstack_swift_api,
905 G_TYPE_BOOLEAN, "openstack_swift_api",
906 "Whether to use openstack protocol");
907 device_property_fill_and_register(&device_property_s3_ssl,
908 G_TYPE_BOOLEAN, "s3_ssl",
909 "Whether to use SSL with Amazon S3");
910 device_property_fill_and_register(&device_property_s3_subdomain,
911 G_TYPE_BOOLEAN, "s3_subdomain",
912 "Whether to use subdomain");
913 device_property_fill_and_register(&device_property_max_send_speed,
914 G_TYPE_UINT64, "max_send_speed",
915 "Maximum average upload speed (bytes/sec)");
916 device_property_fill_and_register(&device_property_max_recv_speed,
917 G_TYPE_UINT64, "max_recv_speed",
918 "Maximum average download speed (bytes/sec)");
919 device_property_fill_and_register(&device_property_nb_threads_backup,
920 G_TYPE_UINT64, "nb_threads_backup",
921 "Number of writer thread");
922 device_property_fill_and_register(&device_property_nb_threads_recovery,
923 G_TYPE_UINT64, "nb_threads_recovery",
924 "Number of reader thread");
926 /* register the device itself */
927 register_device(s3_device_factory, device_prefix_list);
931 s3_device_get_type(void)
933 static GType type = 0;
935 if G_UNLIKELY(type == 0) {
936 static const GTypeInfo info = {
937 sizeof (S3DeviceClass),
938 (GBaseInitFunc) NULL,
939 (GBaseFinalizeFunc) NULL,
940 (GClassInitFunc) s3_device_class_init,
941 (GClassFinalizeFunc) NULL,
942 NULL /* class_data */,
945 (GInstanceInitFunc) s3_device_init,
949 type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
957 s3_device_init(S3Device * self)
959 Device * dself = DEVICE(self);
962 self->volume_bytes = 0;
963 self->volume_limit = 0;
965 self->enforce_volume_limit = FALSE;
966 self->use_subdomain = FALSE;
967 self->nb_threads = 1;
968 self->nb_threads_backup = 1;
969 self->nb_threads_recovery = 1;
970 self->thread_pool_delete = NULL;
971 self->thread_pool_write = NULL;
972 self->thread_pool_read = NULL;
973 self->thread_idle_cond = NULL;
974 self->thread_idle_mutex = NULL;
976 /* Register property values
977 * Note: Some aren't added until s3_device_open_device()
979 bzero(&response, sizeof(response));
981 g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
982 g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
983 device_set_simple_property(dself, PROPERTY_CONCURRENCY,
984 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
985 g_value_unset(&response);
987 g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
988 g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
989 device_set_simple_property(dself, PROPERTY_STREAMING,
990 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
991 g_value_unset(&response);
993 g_value_init(&response, G_TYPE_BOOLEAN);
994 g_value_set_boolean(&response, TRUE);
995 device_set_simple_property(dself, PROPERTY_APPENDABLE,
996 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
997 g_value_unset(&response);
999 g_value_init(&response, G_TYPE_BOOLEAN);
1000 g_value_set_boolean(&response, TRUE);
1001 device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
1002 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1003 g_value_unset(&response);
1005 g_value_init(&response, G_TYPE_BOOLEAN);
1006 g_value_set_boolean(&response, TRUE);
1007 device_set_simple_property(dself, PROPERTY_FULL_DELETION,
1008 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1009 g_value_unset(&response);
1011 g_value_init(&response, G_TYPE_BOOLEAN);
1012 g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
1013 device_set_simple_property(dself, PROPERTY_LEOM,
1014 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1015 g_value_unset(&response);
1017 g_value_init(&response, G_TYPE_BOOLEAN);
1018 g_value_set_boolean(&response, FALSE);
1019 device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1020 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1021 g_value_unset(&response);
1023 g_value_init(&response, G_TYPE_BOOLEAN);
1024 g_value_set_boolean(&response, FALSE);
1025 device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
1026 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1027 g_value_unset(&response);
1029 g_value_init(&response, G_TYPE_BOOLEAN);
1030 g_value_set_boolean(&response, FALSE);
1031 device_set_simple_property(dself, PROPERTY_COMPRESSION,
1032 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1033 g_value_unset(&response);
1035 g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
1036 g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
1037 device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
1038 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
1039 g_value_unset(&response);
1044 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
1046 GObjectClass *g_object_class = (GObjectClass*) c;
1047 DeviceClass *device_class = (DeviceClass *)c;
1049 parent_class = g_type_class_ref (TYPE_DEVICE);
1051 device_class->open_device = s3_device_open_device;
1052 device_class->read_label = s3_device_read_label;
1053 device_class->start = s3_device_start;
1054 device_class->finish = s3_device_finish;
1056 device_class->start_file = s3_device_start_file;
1057 device_class->write_block = s3_device_write_block;
1058 device_class->finish_file = s3_device_finish_file;
1060 device_class->seek_file = s3_device_seek_file;
1061 device_class->seek_block = s3_device_seek_block;
1062 device_class->read_block = s3_device_read_block;
1063 device_class->recycle_file = s3_device_recycle_file;
1065 device_class->erase = s3_device_erase;
1067 g_object_class->finalize = s3_device_finalize;
1069 device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
1070 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1071 device_simple_property_get_fn,
1072 s3_device_set_access_key_fn);
1074 device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
1075 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1076 device_simple_property_get_fn,
1077 s3_device_set_secret_key_fn);
1079 device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
1080 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1081 device_simple_property_get_fn,
1082 s3_device_set_swift_account_id_fn);
1084 device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
1085 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1086 device_simple_property_get_fn,
1087 s3_device_set_swift_access_key_fn);
1089 device_class_register_property(device_class, PROPERTY_S3_HOST,
1090 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1091 device_simple_property_get_fn,
1092 s3_device_set_host_fn);
1094 device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
1095 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1096 device_simple_property_get_fn,
1097 s3_device_set_service_path_fn);
1099 device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
1100 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1101 device_simple_property_get_fn,
1102 s3_device_set_user_token_fn);
1104 device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
1105 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1106 device_simple_property_get_fn,
1107 s3_device_set_bucket_location_fn);
1109 device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
1110 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1111 device_simple_property_get_fn,
1112 s3_device_set_storage_class_fn);
1114 device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
1115 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1116 device_simple_property_get_fn,
1117 s3_device_set_server_side_encryption_fn);
1119 device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
1120 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1121 device_simple_property_get_fn,
1122 s3_device_set_ca_info_fn);
1124 device_class_register_property(device_class, PROPERTY_VERBOSE,
1125 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1126 device_simple_property_get_fn,
1127 s3_device_set_verbose_fn);
1129 device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
1130 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1131 device_simple_property_get_fn,
1132 s3_device_set_openstack_swift_api_fn);
1134 device_class_register_property(device_class, PROPERTY_S3_SSL,
1135 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1136 device_simple_property_get_fn,
1137 s3_device_set_ssl_fn);
1139 device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1140 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1141 device_simple_property_get_fn,
1142 s3_device_set_max_send_speed_fn);
1144 device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1145 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1146 device_simple_property_get_fn,
1147 s3_device_set_max_recv_speed_fn);
1149 device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1150 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1151 device_simple_property_get_fn,
1152 s3_device_set_nb_threads_backup);
1154 device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1155 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1156 device_simple_property_get_fn,
1157 s3_device_set_nb_threads_recovery);
1159 device_class_register_property(device_class, PROPERTY_COMPRESSION,
1160 PROPERTY_ACCESS_GET_MASK,
1161 device_simple_property_get_fn,
1164 device_class_register_property(device_class, PROPERTY_LEOM,
1165 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1166 device_simple_property_get_fn,
1167 property_set_leom_fn);
1169 device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1170 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1171 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1172 device_simple_property_get_fn,
1173 s3_device_set_max_volume_usage_fn);
1175 device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1176 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1177 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1178 device_simple_property_get_fn,
1179 s3_device_set_enforce_max_volume_usage_fn);
1181 device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1182 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1183 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1184 device_simple_property_get_fn,
1185 s3_device_set_use_subdomain_fn);
1189 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1190 GValue *val, PropertySurety surety, PropertySource source)
1192 S3Device *self = S3_DEVICE(p_self);
1194 amfree(self->access_key);
1195 self->access_key = g_value_dup_string(val);
1196 device_clear_volume_details(p_self);
1198 return device_simple_property_set_fn(p_self, base, val, surety, source);
1202 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1203 GValue *val, PropertySurety surety, PropertySource source)
1205 S3Device *self = S3_DEVICE(p_self);
1207 amfree(self->secret_key);
1208 self->secret_key = g_value_dup_string(val);
1209 device_clear_volume_details(p_self);
1211 return device_simple_property_set_fn(p_self, base, val, surety, source);
1215 s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
1216 GValue *val, PropertySurety surety, PropertySource source)
1218 S3Device *self = S3_DEVICE(p_self);
1220 amfree(self->swift_account_id);
1221 self->swift_account_id = g_value_dup_string(val);
1222 device_clear_volume_details(p_self);
1224 return device_simple_property_set_fn(p_self, base, val, surety, source);
1228 s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
1229 GValue *val, PropertySurety surety, PropertySource source)
1231 S3Device *self = S3_DEVICE(p_self);
1233 amfree(self->swift_access_key);
1234 self->swift_access_key = g_value_dup_string(val);
1235 device_clear_volume_details(p_self);
1237 return device_simple_property_set_fn(p_self, base, val, surety, source);
1241 s3_device_set_host_fn(Device *p_self,
1242 DevicePropertyBase *base, GValue *val,
1243 PropertySurety surety, PropertySource source)
1245 S3Device *self = S3_DEVICE(p_self);
1248 self->host = g_value_dup_string(val);
1249 device_clear_volume_details(p_self);
1251 return device_simple_property_set_fn(p_self, base, val, surety, source);
1255 s3_device_set_service_path_fn(Device *p_self,
1256 DevicePropertyBase *base, GValue *val,
1257 PropertySurety surety, PropertySource source)
1259 S3Device *self = S3_DEVICE(p_self);
1261 amfree(self->service_path);
1262 self->service_path = g_value_dup_string(val);
1263 device_clear_volume_details(p_self);
1265 return device_simple_property_set_fn(p_self, base, val, surety, source);
1269 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1270 GValue *val, PropertySurety surety, PropertySource source)
1272 S3Device *self = S3_DEVICE(p_self);
1274 amfree(self->user_token);
1275 self->user_token = g_value_dup_string(val);
1276 device_clear_volume_details(p_self);
1278 return device_simple_property_set_fn(p_self, base, val, surety, source);
1282 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1283 GValue *val, PropertySurety surety, PropertySource source)
1285 S3Device *self = S3_DEVICE(p_self);
1286 char *str_val = g_value_dup_string(val);
1288 if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1289 device_set_error(p_self, stralloc(_(
1290 "Location constraint given for Amazon S3 bucket, "
1291 "but libcurl is too old support wildcard certificates.")),
1292 DEVICE_STATUS_DEVICE_ERROR);
1296 if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1297 device_set_error(p_self, g_strdup_printf(_(
1298 "Location constraint given for Amazon S3 bucket, "
1299 "but the bucket name (%s) is not usable as a subdomain."),
1301 DEVICE_STATUS_DEVICE_ERROR);
1305 amfree(self->bucket_location);
1306 self->bucket_location = str_val;
1307 device_clear_volume_details(p_self);
1309 return device_simple_property_set_fn(p_self, base, val, surety, source);
1316 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1317 GValue *val, PropertySurety surety, PropertySource source)
1319 S3Device *self = S3_DEVICE(p_self);
1320 char *str_val = g_value_dup_string(val);
1322 amfree(self->storage_class);
1323 self->storage_class = str_val;
1324 device_clear_volume_details(p_self);
1326 return device_simple_property_set_fn(p_self, base, val, surety, source);
1330 s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
1331 GValue *val, PropertySurety surety, PropertySource source)
1333 S3Device *self = S3_DEVICE(p_self);
1334 char *str_val = g_value_dup_string(val);
1336 amfree(self->server_side_encryption);
1337 self->server_side_encryption = str_val;
1338 device_clear_volume_details(p_self);
1340 return device_simple_property_set_fn(p_self, base, val, surety, source);
1344 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1345 GValue *val, PropertySurety surety, PropertySource source)
1347 S3Device *self = S3_DEVICE(p_self);
1349 amfree(self->ca_info);
1350 self->ca_info = g_value_dup_string(val);
1351 device_clear_volume_details(p_self);
1353 return device_simple_property_set_fn(p_self, base, val, surety, source);
1357 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1358 GValue *val, PropertySurety surety, PropertySource source)
1360 S3Device *self = S3_DEVICE(p_self);
1363 self->verbose = g_value_get_boolean(val);
1364 /* Our S3 handle may not yet have been instantiated; if so, it will
1365 * get the proper verbose setting when it is created */
1367 for (thread = 0; thread < self->nb_threads; thread++) {
1368 if (self->s3t[thread].s3)
1369 s3_verbose(self->s3t[thread].s3, self->verbose);
1373 return device_simple_property_set_fn(p_self, base, val, surety, source);
1377 s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
1378 GValue *val, PropertySurety surety, PropertySource source)
1380 S3Device *self = S3_DEVICE(p_self);
1382 self->openstack_swift_api = g_value_get_boolean(val);
1384 return device_simple_property_set_fn(p_self, base, val, surety, source);
1388 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1389 GValue *val, PropertySurety surety, PropertySource source)
1391 S3Device *self = S3_DEVICE(p_self);
1395 new_val = g_value_get_boolean(val);
1396 /* Our S3 handle may not yet have been instantiated; if so, it will
1397 * get the proper use_ssl setting when it is created */
1399 for (thread = 0; thread < self->nb_threads; thread++) {
1400 if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1401 device_set_error(p_self, g_strdup_printf(_(
1402 "Error setting S3 SSL/TLS use "
1403 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1404 DEVICE_STATUS_DEVICE_ERROR);
1409 self->use_ssl = new_val;
1411 return device_simple_property_set_fn(p_self, base, val, surety, source);
1415 s3_device_set_max_send_speed_fn(Device *p_self,
1416 DevicePropertyBase *base, GValue *val,
1417 PropertySurety surety, PropertySource source)
1419 S3Device *self = S3_DEVICE(p_self);
1423 new_val = g_value_get_uint64(val);
1425 for (thread = 0; thread < self->nb_threads; thread++) {
1426 if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1427 device_set_error(p_self,
1428 g_strdup("Could not set S3 maximum send speed"),
1429 DEVICE_STATUS_DEVICE_ERROR);
1434 self->max_send_speed = new_val;
1436 return device_simple_property_set_fn(p_self, base, val, surety, source);
1440 s3_device_set_max_recv_speed_fn(Device *p_self,
1441 DevicePropertyBase *base, GValue *val,
1442 PropertySurety surety, PropertySource source)
1444 S3Device *self = S3_DEVICE(p_self);
1448 new_val = g_value_get_uint64(val);
1450 for (thread = 0; thread < self->nb_threads; thread++) {
1451 if (self->s3t[thread].s3 &&
1452 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1453 device_set_error(p_self,
1454 g_strdup("Could not set S3 maximum recv speed"),
1455 DEVICE_STATUS_DEVICE_ERROR);
1460 self->max_recv_speed = new_val;
1462 return device_simple_property_set_fn(p_self, base, val, surety, source);
1466 s3_device_set_nb_threads_backup(Device *p_self,
1467 DevicePropertyBase *base, GValue *val,
1468 PropertySurety surety, PropertySource source)
1470 S3Device *self = S3_DEVICE(p_self);
1473 new_val = g_value_get_uint64(val);
1474 self->nb_threads_backup = new_val;
1475 if (self->nb_threads_backup > self->nb_threads) {
1476 self->nb_threads = self->nb_threads_backup;
1479 return device_simple_property_set_fn(p_self, base, val, surety, source);
1483 s3_device_set_nb_threads_recovery(Device *p_self,
1484 DevicePropertyBase *base, GValue *val,
1485 PropertySurety surety, PropertySource source)
1487 S3Device *self = S3_DEVICE(p_self);
1490 new_val = g_value_get_uint64(val);
1491 self->nb_threads_recovery = new_val;
1492 if (self->nb_threads_recovery > self->nb_threads) {
1493 self->nb_threads = self->nb_threads_recovery;
1496 return device_simple_property_set_fn(p_self, base, val, surety, source);
1500 s3_device_set_max_volume_usage_fn(Device *p_self,
1501 DevicePropertyBase *base, GValue *val,
1502 PropertySurety surety, PropertySource source)
1504 S3Device *self = S3_DEVICE(p_self);
1506 self->volume_limit = g_value_get_uint64(val);
1508 return device_simple_property_set_fn(p_self, base, val, surety, source);
1513 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1514 DevicePropertyBase *base, GValue *val,
1515 PropertySurety surety, PropertySource source)
1517 S3Device *self = S3_DEVICE(p_self);
1519 self->enforce_volume_limit = g_value_get_boolean(val);
1521 return device_simple_property_set_fn(p_self, base, val, surety, source);
1526 s3_device_set_use_subdomain_fn(Device *p_self,
1527 DevicePropertyBase *base, GValue *val,
1528 PropertySurety surety, PropertySource source)
1530 S3Device *self = S3_DEVICE(p_self);
1532 self->use_subdomain = g_value_get_boolean(val);
1534 return device_simple_property_set_fn(p_self, base, val, surety, source);
1538 property_set_leom_fn(Device *p_self,
1539 DevicePropertyBase *base, GValue *val,
1540 PropertySurety surety, PropertySource source)
1542 S3Device *self = S3_DEVICE(p_self);
1544 self->leom = g_value_get_boolean(val);
1546 return device_simple_property_set_fn(p_self, base, val, surety, source);
1549 s3_device_factory(char * device_name, char * device_type, char * device_node)
1552 g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
1553 rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
1555 device_open_device(rval, device_name, device_type, device_node);
1560 * Virtual function overrides
1564 s3_device_open_device(Device *pself, char *device_name,
1565 char * device_type, char * device_node)
1567 S3Device *self = S3_DEVICE(pself);
1571 pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
1572 pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
1573 pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
1575 /* Device name may be bucket/prefix, to support multiple volumes in a
1577 name_colon = strchr(device_node, '/');
1578 if (name_colon == NULL) {
1579 self->bucket = g_strdup(device_node);
1580 self->prefix = g_strdup("");
1582 self->bucket = g_strndup(device_node, name_colon - device_node);
1583 self->prefix = g_strdup(name_colon + 1);
1586 if (self->bucket == NULL || self->bucket[0] == '\0') {
1587 device_set_error(pself,
1588 vstrallocf(_("Empty bucket name in device %s"), device_name),
1589 DEVICE_STATUS_DEVICE_ERROR);
1590 amfree(self->bucket);
1591 amfree(self->prefix);
1595 g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
1597 /* default values */
1598 self->verbose = FALSE;
1599 self->openstack_swift_api = FALSE;
1601 /* use SSL if available */
1602 self->use_ssl = s3_curl_supports_ssl();
1603 bzero(&tmp_value, sizeof(GValue));
1604 g_value_init(&tmp_value, G_TYPE_BOOLEAN);
1605 g_value_set_boolean(&tmp_value, self->use_ssl);
1606 device_set_simple_property(pself, device_property_s3_ssl.ID,
1607 &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
1609 if (parent_class->open_device) {
1610 parent_class->open_device(pself, device_name, device_type, device_node);
1614 static void s3_device_finalize(GObject * obj_self) {
1615 S3Device *self = S3_DEVICE (obj_self);
1618 if(G_OBJECT_CLASS(parent_class)->finalize)
1619 (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
1621 if (self->thread_pool_delete) {
1622 g_thread_pool_free(self->thread_pool_delete, 1, 1);
1623 self->thread_pool_delete = NULL;
1625 if (self->thread_pool_write) {
1626 g_thread_pool_free(self->thread_pool_write, 1, 1);
1627 self->thread_pool_write = NULL;
1629 if (self->thread_pool_read) {
1630 g_thread_pool_free(self->thread_pool_read, 1, 1);
1631 self->thread_pool_read = NULL;
1633 if (self->thread_idle_mutex) {
1634 g_mutex_free(self->thread_idle_mutex);
1635 self->thread_idle_mutex = NULL;
1637 if (self->thread_idle_cond) {
1638 g_cond_free(self->thread_idle_cond);
1639 self->thread_idle_cond = NULL;
1642 for (thread = 0; thread < self->nb_threads; thread++) {
1643 if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
1647 if(self->bucket) g_free(self->bucket);
1648 if(self->prefix) g_free(self->prefix);
1649 if(self->access_key) g_free(self->access_key);
1650 if(self->secret_key) g_free(self->secret_key);
1651 if(self->swift_account_id) g_free(self->swift_account_id);
1652 if(self->swift_access_key) g_free(self->swift_access_key);
1653 if(self->host) g_free(self->host);
1654 if(self->service_path) g_free(self->service_path);
1655 if(self->user_token) g_free(self->user_token);
1656 if(self->bucket_location) g_free(self->bucket_location);
1657 if(self->storage_class) g_free(self->storage_class);
1658 if(self->server_side_encryption) g_free(self->server_side_encryption);
1659 if(self->ca_info) g_free(self->ca_info);
1662 static gboolean setup_handle(S3Device * self) {
1663 Device *d_self = DEVICE(self);
1665 guint response_code;
1666 s3_error_code_t s3_error_code;
1669 if (self->s3t == NULL) {
1670 self->s3t = g_new(S3_by_thread, self->nb_threads);
1671 if (self->s3t == NULL) {
1672 device_set_error(d_self,
1673 stralloc(_("Can't allocate S3Handle array")),
1674 DEVICE_STATUS_DEVICE_ERROR);
1678 if (!self->openstack_swift_api) {
1679 if (self->access_key == NULL || self->access_key[0] == '\0') {
1680 device_set_error(d_self,
1681 g_strdup(_("No Amazon access key specified")),
1682 DEVICE_STATUS_DEVICE_ERROR);
1686 if (self->secret_key == NULL || self->secret_key[0] == '\0') {
1687 device_set_error(d_self,
1688 g_strdup(_("No Amazon secret key specified")),
1689 DEVICE_STATUS_DEVICE_ERROR);
1693 if (self->swift_account_id == NULL ||
1694 self->swift_account_id[0] == '\0') {
1695 device_set_error(d_self,
1696 g_strdup(_("No Swift account id specified")),
1697 DEVICE_STATUS_DEVICE_ERROR);
1700 if (self->swift_access_key == NULL ||
1701 self->swift_access_key[0] == '\0') {
1702 device_set_error(d_self,
1703 g_strdup(_("No Swift access key specified")),
1704 DEVICE_STATUS_DEVICE_ERROR);
1709 if (!self->use_ssl && self->ca_info) {
1710 amfree(self->ca_info);
1713 self->thread_idle_cond = g_cond_new();
1714 self->thread_idle_mutex = g_mutex_new();
1716 for (thread = 0; thread < self->nb_threads; thread++) {
1717 self->s3t[thread].idle = 1;
1718 self->s3t[thread].done = 1;
1719 self->s3t[thread].eof = FALSE;
1720 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1721 self->s3t[thread].errmsg = NULL;
1722 self->s3t[thread].filename = NULL;
1723 self->s3t[thread].curl_buffer.buffer = NULL;
1724 self->s3t[thread].curl_buffer.buffer_len = 0;
1725 self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
1726 self->swift_account_id,
1727 self->swift_access_key,
1728 self->host, self->service_path,
1729 self->use_subdomain,
1730 self->user_token, self->bucket_location,
1731 self->storage_class, self->ca_info,
1732 self->server_side_encryption,
1733 self->openstack_swift_api);
1734 if (self->s3t[thread].s3 == NULL) {
1735 device_set_error(d_self,
1736 stralloc(_("Internal error creating S3 handle")),
1737 DEVICE_STATUS_DEVICE_ERROR);
1738 self->nb_threads = thread+1;
1740 } else if (self->openstack_swift_api) {
1741 s3_error(self->s3t[0].s3, NULL, &response_code,
1742 &s3_error_code, NULL, &curl_code, NULL);
1743 if (response_code != 200) {
1744 device_set_error(d_self,
1745 g_strdup_printf(_("Internal error creating S3 handle: %s"),
1746 s3_strerror(self->s3t[0].s3)),
1747 DEVICE_STATUS_DEVICE_ERROR);
1748 self->nb_threads = thread+1;
1754 g_debug("Create %d threads", self->nb_threads);
1755 self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
1756 self, self->nb_threads, 0,
1758 self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
1759 self->nb_threads, 0, NULL);
1760 self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
1761 self->nb_threads, 0, NULL);
1764 for (thread = 0; thread < self->nb_threads; thread++) {
1765 s3_verbose(self->s3t[thread].s3, self->verbose);
1767 if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
1768 device_set_error(d_self, g_strdup_printf(_(
1769 "Error setting S3 SSL/TLS use "
1770 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1771 DEVICE_STATUS_DEVICE_ERROR);
1775 if (self->max_send_speed &&
1776 !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
1777 device_set_error(d_self,
1778 g_strdup("Could not set S3 maximum send speed"),
1779 DEVICE_STATUS_DEVICE_ERROR);
1783 if (self->max_recv_speed &&
1784 !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
1785 device_set_error(d_self,
1786 g_strdup("Could not set S3 maximum recv speed"),
1787 DEVICE_STATUS_DEVICE_ERROR);
/* Bucket-creation helper; other functions in this file invoke it as
 * make_bucket(pself) and treat a false result as failure.
 *
 * Visible steps, all performed on the first S3 handle (self->s3t[0].s3):
 *   1. Ask s3_is_bucket_exists() whether self->bucket is already there.
 *   2. Fetch the error details of that probe; when there is neither an HTTP
 *      response code nor an S3 error code and curl reports "couldn't
 *      connect" or "couldn't resolve host", set a device error prefixed
 *      "While connecting to S3 bucket".
 *   3. Try s3_make_bucket(); when that fails, set a device error prefixed
 *      "While creating new S3 bucket" unless the failure is HTTP 409 with
 *      S3_ERROR_BucketAlreadyExists or S3_ERROR_BucketAlreadyOwnedByYou.
 *
 * NOTE(review): the signature, the declaration of curl_code and the return
 * statements are not among the lines shown here -- confirm the exact return
 * value on each path against the full file. */
1799 S3Device *self = S3_DEVICE(pself);
1800 guint response_code;
1801 s3_error_code_t s3_error_code;
1804 if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) {
1808 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
1810 if (response_code == 0 && s3_error_code == 0 &&
1811 (curl_code == CURLE_COULDNT_CONNECT ||
1812 curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
1813 device_set_error(pself,
1814 g_strdup_printf(_("While connecting to S3 bucket: %s"),
1815 s3_strerror(self->s3t[0].s3)),
1816 DEVICE_STATUS_DEVICE_ERROR);
1820 if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) {
1821 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1823 /* if it isn't an expected error (bucket already exists),
1825 if (response_code != 409 ||
1826 (s3_error_code != S3_ERROR_BucketAlreadyExists &&
1827 s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
1828 device_set_error(pself,
1829 g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
1830 DEVICE_STATUS_DEVICE_ERROR);
/* Read the volume label by fetching and parsing the "tapestart" object.
 *
 * The cached volume_label, volume_time and volume_header are discarded
 * first.  The function then returns pself->status early when the device is
 * already in error, when setup_handle() fails, or when make_bucket() fails.
 *
 * The header object is read through the first S3 handle into `buf`, whose
 * size cap is S3_DEVICE_MAX_BLOCK_SIZE.  Outcomes:
 *   - read failed with HTTP 404 and one of the listed "not found" S3 codes:
 *     error "Amanda header not found -- unlabeled volume?" with the
 *     DEVICE_ERROR, VOLUME_ERROR and VOLUME_UNLABELED flags;
 *   - read failed otherwise: error "While trying to read tapestart header"
 *     with DEVICE_ERROR and VOLUME_ERROR;
 *   - zero-length object: error "Empty header file" (VOLUME_ERROR);
 *   - parsed header is not of type F_TAPESTART: error "Invalid amanda
 *     header" (VOLUME_ERROR); volume_header stays set to the parsed header;
 *   - otherwise volume_label and volume_time are copied from the header and
 *     the device status is reset to DEVICE_STATUS_SUCCESS.
 *
 * Returns pself->status on every path shown.
 *
 * NOTE(review): the declaration of `key` is not among the lines shown, and
 * no release of `key` or of buf.buffer is visible on the error paths --
 * confirm against the full file. */
1837 static DeviceStatusFlags
1838 s3_device_read_label(Device *pself) {
1839 S3Device *self = S3_DEVICE(pself);
1841 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
1842 dumpfile_t *amanda_header;
1843 /* note that this may be called from s3_device_start, when
1844 * self->access_mode is not ACCESS_NULL */
1846 amfree(pself->volume_label);
1847 amfree(pself->volume_time);
1848 dumpfile_free(pself->volume_header);
1849 pself->volume_header = NULL;
1851 if (device_in_error(self)) return pself->status;
1853 if (!setup_handle(self)) {
1854 /* setup_handle already set our error message */
1855 return pself->status;
1859 key = special_file_to_key(self, "tapestart", -1);
1861 if (!make_bucket(pself)) {
1862 return pself->status;
1865 if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
1866 guint response_code;
1867 s3_error_code_t s3_error_code;
1868 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1870 /* if it's an expected error (not found), just return FALSE */
1871 if (response_code == 404 &&
1872 (s3_error_code == S3_ERROR_None ||
1873 s3_error_code == S3_ERROR_Unknown ||
1874 s3_error_code == S3_ERROR_NoSuchKey ||
1875 s3_error_code == S3_ERROR_NoSuchEntity ||
1876 s3_error_code == S3_ERROR_NoSuchBucket)) {
1877 g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
1878 device_set_error(pself,
1879 stralloc(_("Amanda header not found -- unlabeled volume?")),
1880 DEVICE_STATUS_DEVICE_ERROR
1881 | DEVICE_STATUS_VOLUME_ERROR
1882 | DEVICE_STATUS_VOLUME_UNLABELED);
1883 return pself->status;
1886 /* otherwise, log it and return */
1887 device_set_error(pself,
1888 vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
1889 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1890 return pself->status;
1893 /* handle an empty file gracefully */
1894 if (buf.buffer_len == 0) {
1895 device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
1896 return pself->status;
1899 pself->header_block_size = buf.buffer_len;
1900 g_assert(buf.buffer != NULL);
1901 amanda_header = g_new(dumpfile_t, 1);
1902 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
1903 pself->volume_header = amanda_header;
1906 if (amanda_header->type != F_TAPESTART) {
1907 device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
1908 return pself->status;
1911 pself->volume_label = g_strdup(amanda_header->name);
1912 pself->volume_time = g_strdup(amanda_header->datestamp);
1913 /* pself->volume_header is already set */
1915 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1917 return pself->status;
/* Start using the device in the given access mode.
 *
 * Parameters: `mode` is stored in pself->access_mode; `label` and
 * `timestamp` are handed to write_amanda_header() and become the new
 * volume_label / volume_time on the path that writes a fresh header.
 *
 * Common prologue: bail out when the device is already in error, make sure
 * the S3 handles exist (setup_handle), record the mode, clear in_file, and
 * make sure the bucket exists (make_bucket).
 *
 * Mode-specific work visible below:
 *   - one path only ensures the label has been read (s3_device_read_label);
 *   - one path calls delete_all_files(), writes a new Amanda header, stores
 *     label/timestamp and resets the device status to success;
 *   - one path ensures the label has been read, lists the keys in the
 *     bucket to obtain their total size, stores it in self->volume_bytes
 *     and returns the result of seek_to_end();
 *   - one path is marked g_assert_not_reached().
 *
 * NOTE(review): the mode-dispatch lines, several declarations and the
 * return statements are not among the lines shown -- confirm which mode
 * maps to which path.  No release of the `keys` list filled by
 * s3_list_keys() is visible either; confirm whether it is freed. */
1921 s3_device_start (Device * pself, DeviceAccessMode mode,
1922 char * label, char * timestamp) {
1925 guint64 total_size = 0;
1928 self = S3_DEVICE(pself);
1930 if (device_in_error(self)) return FALSE;
1932 if (!setup_handle(self)) {
1933 /* setup_handle already set our error message */
1938 pself->access_mode = mode;
1939 pself->in_file = FALSE;
1941 /* try creating the bucket, in case it doesn't exist */
1942 if (!make_bucket(pself)) {
1946 /* take care of any dirty work for this mode */
1949 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1950 /* s3_device_read_label already set our error message */
1956 delete_all_files(self);
1958 /* write a new amanda header */
1959 if (!write_amanda_header(self, label, timestamp)) {
1963 pself->volume_label = newstralloc(pself->volume_label, label);
1964 pself->volume_time = newstralloc(pself->volume_time, timestamp);
1966 /* unset the VOLUME_UNLABELED flag, if it was set */
1967 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1971 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1972 /* s3_device_read_label already set our error message */
1975 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
1977 device_set_error(pself,
1978 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
1979 DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
1982 self->volume_bytes = total_size;
1985 return seek_to_end(self);
1989 g_assert_not_reached();
/* Body of the device "finish" operation (presumably s3_device_finish; the
 * signature is not among the lines shown -- confirm).
 *
 * Resets pself->access_mode to ACCESS_NULL and returns FALSE when the
 * device is in an error state.
 *
 * NOTE(review): the success return and any statement between the cast and
 * the access_mode reset are not visible here. */
1999 S3Device *self = S3_DEVICE(pself);
2003 /* we're not in a file anymore */
2004 pself->access_mode = ACCESS_NULL;
2006 if (device_in_error(pself)) return FALSE;
2011 /* functions for writing */
/* Begin a new file on the volume by uploading its Amanda file header as a
 * separate "filestart" object.
 *
 * Steps visible below:
 *   - return FALSE when the device is already in error; clear is_eom;
 *   - set jobInfo->blocksize to 0 (the header lives in its own object);
 *   - build the header with device_build_amanda_header(); a NULL result is
 *     reported as "Amanda file header won't fit in a single block!";
 *   - flag is_eom when check_at_leom() says so; when check_at_peom() says
 *     so, also set "No space left on device" and free the header buffer;
 *   - advance pself->file (1 for the first file), set in_file, and upload
 *     the header through the first S3 handle under the key produced by
 *     special_file_to_key(self, "filestart", pself->file);
 *   - on upload failure set "While writing filestart header" with
 *     DEVICE_ERROR and VOLUME_ERROR;
 *   - add the header size to self->volume_bytes and mark every per-thread
 *     slot idle.
 *
 * NOTE(review): several declarations, the condition guarding the upload
 * error, and the return statements are not among the lines shown; no
 * release of `key` is visible either -- confirm against the full file. */
2015 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
2016 S3Device *self = S3_DEVICE(pself);
2017 CurlBuffer amanda_header = {NULL, 0, 0, 0};
2023 if (device_in_error(self)) return FALSE;
2026 pself->is_eom = FALSE;
2028 /* Set the blocksize to zero, since there's no header to skip (it's stored
2029 * in a distinct file, rather than block zero) */
2030 jobInfo->blocksize = 0;
2032 /* Build the amanda header. */
2033 header_size = 0; /* no minimum size */
2034 amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
2036 if (amanda_header.buffer == NULL) {
2037 device_set_error(pself,
2038 stralloc(_("Amanda file header won't fit in a single block!")),
2039 DEVICE_STATUS_DEVICE_ERROR);
2042 amanda_header.buffer_len = header_size;
2044 if(check_at_leom(self, header_size))
2045 pself->is_eom = TRUE;
2047 if(check_at_peom(self, header_size)) {
2048 pself->is_eom = TRUE;
2049 device_set_error(pself,
2050 stralloc(_("No space left on device")),
2051 DEVICE_STATUS_DEVICE_ERROR);
2052 g_free(amanda_header.buffer);
2055 /* set the file and block numbers correctly */
2056 pself->file = (pself->file > 0)? pself->file+1 : 1;
2058 pself->in_file = TRUE;
2059 /* write it out as a special block (not the 0th) */
2060 key = special_file_to_key(self, "filestart", pself->file);
2061 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
2062 &amanda_header, NULL, NULL);
2063 g_free(amanda_header.buffer);
2066 device_set_error(pself,
2067 vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
2068 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
2072 self->volume_bytes += header_size;
2073 for (thread = 0; thread < self->nb_threads; thread++) {
2074 self->s3t[thread].idle = 1;
/* Queue one data block for upload on a worker thread.
 *
 * Parameters: `data` points to `size` bytes; the bytes are copied into a
 * per-thread buffer, so the caller's memory is not referenced after return.
 *
 * Steps visible below:
 *   - assert the arguments, return FALSE when the device is in error;
 *   - flag is_eom at LEOM; at PEOM additionally set "No space left on
 *     device";
 *   - build the object key for (pself->file, pself->block);
 *   - holding thread_idle_mutex, scan the first nb_threads_backup slots of
 *     self->s3t for an idle one; an idle slot carrying a stored error has
 *     that error transferred to the device (ownership of errmsg moves to
 *     device_set_error), the slot's error is cleared and the mutex is
 *     released; the first clean idle slot is remembered in first_idle;
 *     g_cond_wait() on thread_idle_cond blocks for a slot to change state;
 *   - mark the chosen slot busy, discard its buffer when it is smaller than
 *     `size`, allocate a `size`-byte buffer when it has none, copy the
 *     data in, record the key in the slot and push the slot to
 *     thread_pool_write;
 *   - add `size` to self->volume_bytes.
 *
 * NOTE(review): some declarations, loop bookkeeping and the return
 * statements are not among the lines shown.  On the stored-error path no
 * release of `filename` is visible -- confirm whether it leaks there. */
2081 s3_device_write_block (Device * pself, guint size, gpointer data) {
2083 S3Device * self = S3_DEVICE(pself);
2084 int idle_thread = 0;
2086 int first_idle = -1;
2088 g_assert (self != NULL);
2089 g_assert (data != NULL);
2090 if (device_in_error(self)) return FALSE;
2092 if(check_at_leom(self, size))
2093 pself->is_eom = TRUE;
2095 if(check_at_peom(self, size)) {
2096 pself->is_eom = TRUE;
2097 device_set_error(pself,
2098 stralloc(_("No space left on device")),
2099 DEVICE_STATUS_DEVICE_ERROR);
2103 filename = file_and_block_to_key(self, pself->file, pself->block);
2105 g_mutex_lock(self->thread_idle_mutex);
2106 while (!idle_thread) {
2108 for (thread = 0; thread < self->nb_threads_backup; thread++) {
2109 if (self->s3t[thread].idle == 1) {
2111 /* Check if the thread is in error */
2112 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2113 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2114 self->s3t[thread].errflags);
2115 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2116 self->s3t[thread].errmsg = NULL;
2117 g_mutex_unlock(self->thread_idle_mutex);
2120 if (first_idle == -1) {
2121 first_idle = thread;
2127 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2130 thread = first_idle;
2132 self->s3t[thread].idle = 0;
2133 self->s3t[thread].done = 0;
2134 if (self->s3t[thread].curl_buffer.buffer &&
2135 self->s3t[thread].curl_buffer.buffer_len < size) {
2136 g_free((char *)self->s3t[thread].curl_buffer.buffer);
2137 self->s3t[thread].curl_buffer.buffer = NULL;
2138 self->s3t[thread].curl_buffer.buffer_len = 0;
2139 self->s3t[thread].buffer_len = 0;
2141 if (self->s3t[thread].curl_buffer.buffer == NULL) {
2142 self->s3t[thread].curl_buffer.buffer = g_malloc(size);
2143 self->s3t[thread].curl_buffer.buffer_len = size;
2144 self->s3t[thread].buffer_len = size;
2146 memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
2147 self->s3t[thread].curl_buffer.buffer_pos = 0;
2148 self->s3t[thread].curl_buffer.buffer_len = size;
2149 self->s3t[thread].curl_buffer.max_buffer_size = 0;
2150 self->s3t[thread].filename = filename;
2151 g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
2152 g_mutex_unlock(self->thread_idle_mutex);
2155 self->volume_bytes += size;
/* Worker function of thread_pool_write: uploads one queued block.
 *
 * `thread_data` is the S3_by_thread slot that s3_device_write_block()
 * pushed to the pool; `data` is the pool's user data, i.e. the device.
 *
 * The slot's curl_buffer is uploaded with s3_upload() under the key stored
 * in s3t->filename, which is then freed and cleared.  An error message and
 * the DEVICE_ERROR | VOLUME_ERROR flags are stored in the slot (presumably
 * only when the upload failed -- the guarding condition is not among the
 * lines shown).  Finally, holding thread_idle_mutex, the slot's
 * curl_buffer.buffer_len is restored from s3t->buffer_len (the allocated
 * size) and thread_idle_cond is broadcast to wake waiters.
 *
 * NOTE(review): the lines between taking the mutex and restoring
 * buffer_len are not visible; presumably they mark the slot idle/done --
 * confirm. */
2160 s3_thread_write_block(
2161 gpointer thread_data,
2164 S3_by_thread *s3t = (S3_by_thread *)thread_data;
2165 Device *pself = (Device *)data;
2166 S3Device *self = S3_DEVICE(pself);
2169 result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
2170 S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2171 g_free((void *)s3t->filename);
2172 s3t->filename = NULL;
2174 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
2175 s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
2177 g_mutex_lock(self->thread_idle_mutex);
2180 s3t->curl_buffer.buffer_len = s3t->buffer_len;
2181 g_cond_broadcast(self->thread_idle_cond);
2182 g_mutex_unlock(self->thread_idle_mutex);
/* Finish the current file: wait for all outstanding uploads, then leave
 * the in-file state.
 *
 * Holding thread_idle_mutex, the function loops until the count of idle
 * slots equals self->nb_threads, waiting on thread_idle_cond between
 * passes.  While scanning, any error stored in a slot is transferred to
 * the device with device_set_error() (which takes over errmsg) and cleared
 * from the slot.
 *
 * Returns FALSE when the device ends up in error; otherwise in_file is
 * cleared.
 *
 * NOTE(review): the counter bookkeeping inside the loop and the final
 * return are not among the lines shown -- confirm against the full file. */
2186 s3_device_finish_file (Device * pself) {
2187 S3Device *self = S3_DEVICE(pself);
2189 /* Check all threads are done */
2190 int idle_thread = 0;
2193 g_mutex_lock(self->thread_idle_mutex);
2194 while (idle_thread != self->nb_threads) {
2196 for (thread = 0; thread < self->nb_threads; thread++) {
2197 if (self->s3t[thread].idle == 1) {
2200 /* check thread status */
2201 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
2202 device_set_error(pself, (char *)self->s3t[thread].errmsg,
2203 self->s3t[thread].errflags);
2204 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
2205 self->s3t[thread].errmsg = NULL;
2208 if (idle_thread != self->nb_threads) {
2209 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2212 g_mutex_unlock(self->thread_idle_mutex);
2214 if (device_in_error(pself)) return FALSE;
2216 /* we're not in a file anymore */
2217 pself->in_file = FALSE;
/* Delete the objects belonging to file number `file`.
 *
 * Returns FALSE immediately when the device is already in error.
 * Otherwise calls delete_file(), then s3_wait_thread_delete() (presumably
 * blocking until the delete workers finish -- confirm), and returns TRUE
 * only when the device is not in error afterwards. */
2223 s3_device_recycle_file(Device *pself, guint file) {
2224 S3Device *self = S3_DEVICE(pself);
2225 if (device_in_error(self)) return FALSE;
2228 delete_file(self, file);
2229 s3_wait_thread_delete(self);
2230 return !device_in_error(self);
2231 /* delete_file already set our error message if necessary */
/* Erase the volume: remove its header, its data and, if possible, the
 * bucket itself.
 *
 * Steps visible below, all on the first S3 handle:
 *   - make sure the handles exist (setup_handle sets the error on failure);
 *   - delete the "tapestart" object; on failure fetch the error message
 *     and set a DEVICE_ERROR;
 *   - drop the cached volume_header;
 *   - call delete_all_files();
 *   - mark the device "Unlabeled volume" (DEVICE_STATUS_VOLUME_UNLABELED);
 *   - try s3_delete_bucket(); a failure is ignored when it is HTTP 409 with
 *     S3_ERROR_BucketNotEmpty or HTTP 404 with S3_ERROR_NoSuchBucket, and
 *     otherwise sets a DEVICE_ERROR;
 *   - reset self->volume_bytes to 0.
 *
 * NOTE(review): the declaration of `key`, the message arguments passed to
 * device_set_error(), and the return statements are not among the lines
 * shown; no release of `key` is visible -- confirm against the full file. */
2235 s3_device_erase(Device *pself) {
2236 S3Device *self = S3_DEVICE(pself);
2238 const char *errmsg = NULL;
2239 guint response_code;
2240 s3_error_code_t s3_error_code;
2242 if (!setup_handle(self)) {
2243 /* error set by setup_handle */
2248 key = special_file_to_key(self, "tapestart", -1);
2249 if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
2250 s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
2251 device_set_error(pself,
2253 DEVICE_STATUS_DEVICE_ERROR);
2258 dumpfile_free(pself->volume_header);
2259 pself->volume_header = NULL;
2261 if (!delete_all_files(self))
2264 device_set_error(pself, g_strdup("Unlabeled volume"),
2265 DEVICE_STATUS_VOLUME_UNLABELED);
2267 if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
2268 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2271 * ignore the error if the bucket isn't empty (there may be data from elsewhere)
2272 * or the bucket not existing (already deleted perhaps?)
2275 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
2276 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
2278 device_set_error(pself,
2280 DEVICE_STATUS_DEVICE_ERROR);
2284 self->volume_bytes = 0;
2288 /* functions for reading */
/*
 * Position the device at the start of a file and return its header.
 *
 * Reads the filestart special object for pself->file from S3 and parses it
 * into a newly allocated dumpfile_t, which is returned to the caller.
 * Returns NULL if the device is already in error.
 *
 * If the header object is missing (HTTP 404 with error code None, NoSuchKey
 * or NoSuchEntity), find_next_file() is consulted: a positive result
 * re-enters this function for that file, while zero triggers a probe of the
 * header of file pself->file - 1 to choose between returning a tape-end
 * header and reporting an attempt to read past the tape-end file.
 *
 * NOTE(review): the assignment of `file` to pself->file is not visible in
 * this view - presumably it happens before the first read; confirm.
 */
2291 s3_device_seek_file(Device *pself, guint file) {
2292 S3Device *self = S3_DEVICE(pself);
2295 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2296 dumpfile_t *amanda_header;
2297 const char *errmsg = NULL;
2300 if (device_in_error(self)) return NULL;
/* Reset the read-position state before looking up the new file. */
2305 pself->is_eof = FALSE;
2306 pself->in_file = FALSE;
2308 self->next_block_to_read = 0;
/* Fetch the header object of the requested file into buf. */
2311 key = special_file_to_key(self, "filestart", pself->file);
2312 result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2317 guint response_code;
2318 s3_error_code_t s3_error_code;
2319 s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2321 /* if it's an expected error (not found), check what to do. */
2322 if (response_code == 404 &&
2323 (s3_error_code == S3_ERROR_None ||
2324 s3_error_code == S3_ERROR_NoSuchKey ||
2325 s3_error_code == S3_ERROR_NoSuchEntity)) {
/* The header object is missing: ask which file comes next. */
2327 next_file = find_next_file(self, pself->file);
2328 if (next_file > 0) {
2329 /* Note short-circuit of dispatcher. */
2330 return s3_device_seek_file(pself, next_file);
2331 } else if (next_file == 0) {
2332 /* No next file. Check if we are one past the end. */
2333 key = special_file_to_key(self, "filestart", pself->file - 1);
2334 result = s3_read(self->s3t[0].s3, self->bucket, key,
2335 S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2338 /* pself->file, etc. are already correct */
2339 return make_tapeend_header();
2341 device_set_error(pself,
2342 stralloc(_("Attempt to read past tape-end file")),
2343 DEVICE_STATUS_SUCCESS);
2348 /* An unexpected error occurred finding out if we are the last file. */
2349 device_set_error(pself,
2351 DEVICE_STATUS_DEVICE_ERROR);
2356 /* and make a dumpfile_t out of it */
2357 g_assert(buf.buffer != NULL);
2358 amanda_header = g_new(dumpfile_t, 1);
2359 fh_init(amanda_header);
2360 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
/* Validate the parsed header type; the rejection path sets a volume error
 * and frees the header.
 * NOTE(review): only two case labels are visible in this view; other
 * accepted header types may be listed on lines not shown. */
2363 switch (amanda_header->type) {
2365 case F_CONT_DUMPFILE:
2366 case F_SPLIT_DUMPFILE:
2370 device_set_error(pself,
2371 stralloc(_("Invalid amanda header while reading file header")),
2372 DEVICE_STATUS_VOLUME_ERROR);
2373 g_free(amanda_header);
/* The file is now open for reading: mark every thread slot idle and clear
 * its end-of-file flag before handing the header to the caller. */
2377 pself->in_file = TRUE;
2378 for (thread = 0; thread < self->nb_threads; thread++) {
2379 self->s3t[thread].idle = 1;
2380 self->s3t[thread].eof = FALSE;
2382 return amanda_header;
/*
 * Move the read position within the current file to block `block`.
 *
 * Updates both the device block counter (pself->block) and
 * self->next_block_to_read, which s3_device_read_block uses to build the key
 * of the next block to fetch. Returns FALSE if the device is in error.
 */
2386 s3_device_seek_block(Device *pself, guint64 block) {
2387 S3Device * self = S3_DEVICE(pself);
2388 if (device_in_error(pself)) return FALSE;
2391 pself->block = block;
2392 self->next_block_to_read = block;
/*
 * Read block pself->block of the current file into `data`.
 *
 * size_req is in/out. A fetched block is copied only when *size_req is at
 * least the number of bytes fetched, and *size_req is then set to that byte
 * count. When `data` is too small, *size_req is set to the buffer_len of the
 * thread buffer that holds the block.
 * Returns -1 if the device is already in error; the other return values are
 * on lines not visible in this view.
 *
 * Requests are handed to self->thread_pool_read. Each thread slot records
 * the key it was asked to fetch in s3t->filename, so the wanted block is
 * located by comparing keys.
 */
2397 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
2398 S3Device * self = S3_DEVICE(pself);
2403 g_assert (self != NULL);
2404 if (device_in_error(self)) return -1;
2406 g_mutex_lock(self->thread_idle_mutex);
2407 /* start a read ahead for each thread */
2408 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2409 S3_by_thread *s3t = &self->s3t[thread];
2411 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2412 s3t->filename = key;
2416 s3t->errflags = DEVICE_STATUS_SUCCESS;
/* Drop an existing buffer that is smaller than the requested size so that
 * the allocation that follows replaces it. */
2417 if (self->s3t[thread].curl_buffer.buffer &&
2418 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
2419 g_free(self->s3t[thread].curl_buffer.buffer);
2420 self->s3t[thread].curl_buffer.buffer = NULL;
2421 self->s3t[thread].curl_buffer.buffer_len = 0;
2422 self->s3t[thread].buffer_len = 0;
2424 if (!self->s3t[thread].curl_buffer.buffer) {
2425 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2426 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2427 self->s3t[thread].buffer_len = *size_req;
/* Rewind the buffer, advance the read-ahead cursor and queue the request. */
2429 s3t->curl_buffer.buffer_pos = 0;
2430 s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
2431 self->next_block_to_read++;
2432 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
/* Key of the block the caller actually wants. */
2437 key = file_and_block_to_key(self, pself->file, pself->block);
2438 g_assert(key != NULL);
2440 /* find which thread read the key */
2441 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2443 s3t = &self->s3t[thread];
2446 strcmp(key, (char *)s3t->filename) == 0) {
/* NOTE(review): the condition selecting this end-of-file branch is not
 * visible in this view; it presumably tests s3t->eof - confirm. */
2450 pself->is_eof = TRUE;
2451 pself->in_file = FALSE;
2452 device_set_error(pself, stralloc(_("EOF")),
2453 DEVICE_STATUS_SUCCESS);
2454 g_mutex_unlock(self->thread_idle_mutex);
2456 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
2457 /* return the error */
2458 device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
2460 g_mutex_unlock(self->thread_idle_mutex);
2463 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
2464 /* return the buffer */
2465 g_mutex_unlock(self->thread_idle_mutex);
2466 memcpy(data, s3t->curl_buffer.buffer,
2467 s3t->curl_buffer.buffer_pos);
2468 *size_req = s3t->curl_buffer.buffer_pos;
2471 g_free((char *)s3t->filename);
2474 g_mutex_lock(self->thread_idle_mutex);
2476 } else { /* buffer not large enough */
2477 *size_req = s3t->curl_buffer.buffer_len;
2479 g_mutex_unlock(self->thread_idle_mutex);
/* Block until a worker signals thread_idle_cond. */
2485 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2489 /* start a read ahead for the thread */
2490 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2491 S3_by_thread *s3t = &self->s3t[thread];
2493 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2494 s3t->filename = key;
2498 s3t->errflags = DEVICE_STATUS_SUCCESS;
/* NOTE(review): unlike the first read-ahead loop, this allocation path sets
 * neither s3t->buffer_len nor curl_buffer.max_buffer_size - confirm that
 * this is intentional. */
2499 if (!self->s3t[thread].curl_buffer.buffer) {
2500 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2501 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2503 s3t->curl_buffer.buffer_pos = 0;
2504 self->next_block_to_read++;
2505 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2508 g_mutex_unlock(self->thread_idle_mutex);
/*
 * Worker routine that fetches one block from S3 (presumably the function run
 * by self->thread_pool_read - confirm where the pool is created).
 *
 * thread_data is the S3_by_thread slot describing the request: the key is
 * s3t->filename and the destination is s3t->curl_buffer. data is the owning
 * Device. The outcome is recorded under self->thread_idle_mutex, and waiters
 * on self->thread_idle_cond are woken before the mutex is released.
 */
2515 s3_thread_read_block(
2516 gpointer thread_data,
2519 S3_by_thread *s3t = (S3_by_thread *)thread_data;
2520 Device *pself = (Device *)data;
2521 S3Device *self = S3_DEVICE(pself);
/* The S3 read itself runs before the mutex is taken. */
2524 result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
2525 s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2527 g_mutex_lock(self->thread_idle_mutex);
2529 guint response_code;
2530 s3_error_code_t s3_error_code;
2531 s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2532 /* if it's an expected error (not found), just return -1 */
2533 if (response_code == 404 &&
2534 (s3_error_code == S3_ERROR_None ||
2535 s3_error_code == S3_ERROR_Unknown ||
2536 s3_error_code == S3_ERROR_NoSuchKey ||
2537 s3_error_code == S3_ERROR_NoSuchEntity)) {
2541 /* otherwise, record the error on the thread slot; s3_device_read_block reports it */
2542 s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
2543 s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
2544 s3_strerror(s3t->s3));
/* Wake every thread waiting on the condition, then release the mutex. */
2548 g_cond_broadcast(self->thread_idle_cond);
2549 g_mutex_unlock(self->thread_idle_mutex);
/*
 * End-of-medium test against the configured volume limit (PEOM, presumably
 * physical end of medium).
 *
 * Only applies when enforce_volume_limit is set and volume_limit is positive:
 * the bytes already on the volume plus `size` are compared with volume_limit.
 * NOTE(review): the return statements are not visible in this view;
 * presumably TRUE when the new total would exceed the limit - confirm.
 */
2555 check_at_peom(S3Device *self, guint64 size)
2557 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2558 guint64 newtotal = self->volume_bytes + size;
2559 if(newtotal > self->volume_limit) {
/*
 * Early-warning end-of-medium test (LEOM, presumably logical end of medium).
 *
 * Same comparison as check_at_peom, but a warning margin is added to the
 * projected total first. The margin is the device block size multiplied by
 * EOM_EARLY_WARNING_ZONE_BLOCKS plus the number of threads.
 * NOTE(review): the return statements are not visible in this view;
 * presumably TRUE when the padded total would exceed the limit - confirm.
 */
2567 check_at_leom(S3Device *self, guint64 size)
2569 guint64 block_size = DEVICE(self)->block_size;
2570 guint64 eom_warning_buffer = block_size *
2571 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
2576 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2577 guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
2578 if(newtotal > self->volume_limit) {
2592 g_mutex_lock(self->thread_idle_mutex);
2593 while(nb_done != self->nb_threads) {
2595 for (thread = 0; thread < self->nb_threads; thread++) {
2596 if (self->s3t[thread].done == 1)
2599 if (nb_done != self->nb_threads) {
2600 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2603 g_mutex_unlock(self->thread_idle_mutex);