2 * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
4 * This program is free software; you can redistribute it and/or modify it
5 * under the terms of the GNU General Public License version 2 as published
6 * by the Free Software Foundation.
8 * This program is distributed in the hope that it will be useful, but
9 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 * You should have received a copy of the GNU General Public License along
14 * with this program; if not, write to the Free Software Foundation, Inc.,
15 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21 /* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
22 * data. It stores data in keys named with a user-specified prefix, inside a
23 * user-specified bucket. Data is stored in the form of numbered (large)
29 #include <sys/types.h>
39 #include <curl/curl.h>
40 #ifdef HAVE_OPENSSL_HMAC_H
41 # include <openssl/hmac.h>
43 # ifdef HAVE_CRYPTO_HMAC_H
44 # include <crypto/hmac.h>
53 * Type checking and casting macros
/* GObject type boilerplate for S3Device.  Every macro below obtains the GType
 * by calling s3_device_get_type() and passes it to the matching GLib helper. */
/* The GType of S3Device. */
55 #define TYPE_S3_DEVICE (s3_device_get_type())
/* Casts an instance pointer to S3Device * via G_TYPE_CHECK_INSTANCE_CAST. */
56 #define S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device)
/* Same cast, but yielding a pointer to a const S3Device. */
57 #define S3_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const)
/* Casts a class pointer to S3DeviceClass * via G_TYPE_CHECK_CLASS_CAST. */
58 #define S3_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass)
/* Tests whether obj is an instance of the S3Device type. */
59 #define IS_S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ())
/* Retrieves the S3DeviceClass structure belonging to an instance. */
61 #define S3_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass)
62 static GType s3_device_get_type (void);
65 * Main object structure
67 typedef struct _S3MetadataFile S3MetadataFile;
69 typedef struct _S3_by_thread S3_by_thread;
70 struct _S3_by_thread {
71 S3Handle * volatile s3;
72 CurlBuffer volatile curl_buffer;
73 guint volatile buffer_len;
77 char volatile * volatile filename;
78 DeviceStatusFlags volatile errflags; /* device_status */
79 char volatile * volatile errmsg; /* device error message */
82 typedef struct _S3Device S3Device;
86 /* The "easy" curl handle we use to access Amazon S3 */
89 /* S3 access information */
93 /* The S3 access information. */
98 char *bucket_location;
105 /* a cache for unsuccessful reads (where we get the file but the caller
106 * doesn't have space for it or doesn't want it), where we expect the
107 * next call will request the same file.
113 /* Produce verbose output? */
120 guint64 max_send_speed;
121 guint64 max_recv_speed;
124 guint64 volume_bytes;
125 guint64 volume_limit;
126 gboolean enforce_volume_limit;
127 gboolean use_subdomain;
130 int nb_threads_backup;
131 int nb_threads_recovery;
132 GThreadPool *thread_pool_write;
133 GThreadPool *thread_pool_read;
134 GCond *thread_idle_cond;
135 GMutex *thread_idle_mutex;
136 int next_block_to_read;
142 typedef struct _S3DeviceClass S3DeviceClass;
143 struct _S3DeviceClass {
144 DeviceClass __parent__;
149 * Constants and static data
152 #define S3_DEVICE_NAME "s3"
154 /* Maximum key length as specified in the S3 documentation
155 * (*excluding* null terminator) */
156 #define S3_MAX_KEY_LENGTH 1024
158 /* Note: for compatibility, min can only be decreased and max increased */
159 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
160 #define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024)
161 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
162 #define EOM_EARLY_WARNING_ZONE_BLOCKS 4
164 /* This goes in lieu of file number for metadata. */
165 #define SPECIAL_INFIX "special-"
167 /* pointer to the class of our parent */
168 static DeviceClass *parent_class = NULL;
171 * device-specific properties
174 /* Authentication information for Amazon S3. Both of these are strings. */
175 static DevicePropertyBase device_property_s3_access_key;
176 static DevicePropertyBase device_property_s3_secret_key;
177 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
178 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
/* Host ("hostname:port") of the server to contact, and a path to add to the
 * request URL.  Both are registered as string properties in
 * s3_device_register(). */
181 static DevicePropertyBase device_property_s3_host;
182 static DevicePropertyBase device_property_s3_service_path;
183 #define PROPERTY_S3_HOST (device_property_s3_host.ID)
184 #define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
186 /* User token for authenticating S3 requests made with Amazon DevPay; also a string. */
187 static DevicePropertyBase device_property_s3_user_token;
188 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
190 /* Location constraint for new buckets created on Amazon S3. */
191 static DevicePropertyBase device_property_s3_bucket_location;
192 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
/* Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY);
 * registered as a string property in s3_device_register(). */
195 static DevicePropertyBase device_property_s3_storage_class;
196 #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
198 /* Path to certificate authority certificate */
199 static DevicePropertyBase device_property_ssl_ca_info;
200 #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
202 /* Whether to use SSL with Amazon S3. */
203 static DevicePropertyBase device_property_s3_ssl;
204 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
206 /* Speed limits for sending and receiving */
207 static DevicePropertyBase device_property_max_send_speed;
208 static DevicePropertyBase device_property_max_recv_speed;
209 #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
210 #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
212 /* Whether to use subdomain */
213 static DevicePropertyBase device_property_s3_subdomain;
214 #define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
216 /* Number of threads to use */
217 static DevicePropertyBase device_property_nb_threads_backup;
218 #define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
219 static DevicePropertyBase device_property_nb_threads_recovery;
220 #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
226 void s3_device_register(void);
229 * utility functions */
231 /* Given file and block numbers, return an S3 key.
233 * @param self: the S3Device object
234 * @param file: the file number
235 * @param block: the block within that file
236 * @returns: a newly allocated string containing an S3 key.
239 file_and_block_to_key(S3Device *self,
243 /* Given the name of a special file (such as 'tapestart'), generate
244 * the S3 key to use for that file.
246 * @param self: the S3Device object
247 * @param special_name: name of the special file
248 * @param file: a file number to include; omitted if -1
249 * @returns: a newly allocated string containing an S3 key.
252 special_file_to_key(S3Device *self,
255 /* Write an amanda header file to S3.
257 * @param self: the S3Device object
258 * @param label: the volume label
259 * @param timestamp: the volume timestamp
262 write_amanda_header(S3Device *self,
266 /* "Fast forward" this device to the end by looking up the largest file number
267 * present and setting the current file number one greater.
269 * @param self: the S3Device object
272 seek_to_end(S3Device *self);
274 /* Find the number of the last file that contains any data (even just a header).
276 * @param self: the S3Device object
277 * @returns: the last file, or -1 in event of an error
280 find_last_file(S3Device *self);
282 /* Delete all blocks in the given file, including the filestart block
284 * @param self: the S3Device object
285 * @param file: the file to delete
288 delete_file(S3Device *self,
292 /* Delete all files in the given device
294 * @param self: the S3Device object
297 delete_all_files(S3Device *self);
299 /* Set up self->s3t as best as possible.
301 * The return value is TRUE iff self->s3t is useable.
303 * @param self: the S3Device object
304 * @returns: TRUE if the handle is set up
307 setup_handle(S3Device * self);
313 s3_device_init(S3Device * o);
316 s3_device_class_init(S3DeviceClass * c);
319 s3_device_finalize(GObject * o);
322 s3_device_factory(char * device_name, char * device_type, char * device_node);
325 * Property{Get,Set}Fns */
327 static gboolean s3_device_set_access_key_fn(Device *self,
328 DevicePropertyBase *base, GValue *val,
329 PropertySurety surety, PropertySource source);
331 static gboolean s3_device_set_secret_key_fn(Device *self,
332 DevicePropertyBase *base, GValue *val,
333 PropertySurety surety, PropertySource source);
335 static gboolean s3_device_set_user_token_fn(Device *self,
336 DevicePropertyBase *base, GValue *val,
337 PropertySurety surety, PropertySource source);
339 static gboolean s3_device_set_bucket_location_fn(Device *self,
340 DevicePropertyBase *base, GValue *val,
341 PropertySurety surety, PropertySource source);
343 static gboolean s3_device_set_storage_class_fn(Device *self,
344 DevicePropertyBase *base, GValue *val,
345 PropertySurety surety, PropertySource source);
347 static gboolean s3_device_set_ca_info_fn(Device *self,
348 DevicePropertyBase *base, GValue *val,
349 PropertySurety surety, PropertySource source);
351 static gboolean s3_device_set_verbose_fn(Device *self,
352 DevicePropertyBase *base, GValue *val,
353 PropertySurety surety, PropertySource source);
355 static gboolean s3_device_set_ssl_fn(Device *self,
356 DevicePropertyBase *base, GValue *val,
357 PropertySurety surety, PropertySource source);
359 static gboolean s3_device_set_max_send_speed_fn(Device *self,
360 DevicePropertyBase *base, GValue *val,
361 PropertySurety surety, PropertySource source);
363 static gboolean s3_device_set_max_recv_speed_fn(Device *self,
364 DevicePropertyBase *base, GValue *val,
365 PropertySurety surety, PropertySource source);
367 static gboolean s3_device_set_nb_threads_backup(Device *self,
368 DevicePropertyBase *base, GValue *val,
369 PropertySurety surety, PropertySource source);
371 static gboolean s3_device_set_nb_threads_recovery(Device *self,
372 DevicePropertyBase *base, GValue *val,
373 PropertySurety surety, PropertySource source);
375 static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
376 DevicePropertyBase *base, GValue *val,
377 PropertySurety surety, PropertySource source);
379 static gboolean property_set_leom_fn(Device *p_self,
380 DevicePropertyBase *base, GValue *val,
381 PropertySurety surety, PropertySource source);
383 static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
384 DevicePropertyBase *base, GValue *val,
385 PropertySurety surety, PropertySource source);
387 static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
388 DevicePropertyBase *base, GValue *val,
389 PropertySurety surety, PropertySource source);
391 static gboolean s3_device_set_host_fn(Device *p_self,
392 DevicePropertyBase *base, GValue *val,
393 PropertySurety surety, PropertySource source);
395 static gboolean s3_device_set_service_path_fn(Device *p_self,
396 DevicePropertyBase *base, GValue *val,
397 PropertySurety surety, PropertySource source);
399 static void s3_thread_read_block(gpointer thread_data,
401 static void s3_thread_write_block(gpointer thread_data,
404 /* Wait until all threads are done */
405 static void reset_thread(S3Device *self);
408 * virtual functions */
411 s3_device_open_device(Device *pself, char *device_name,
412 char * device_type, char * device_node);
414 static DeviceStatusFlags s3_device_read_label(Device * self);
417 s3_device_start(Device * self,
418 DeviceAccessMode mode,
423 s3_device_finish(Device * self);
426 s3_device_start_file(Device * self,
427 dumpfile_t * jobInfo);
430 s3_device_write_block(Device * self,
435 s3_device_finish_file(Device * self);
438 s3_device_seek_file(Device *pself,
442 s3_device_seek_block(Device *pself,
446 s3_device_read_block(Device * pself,
451 s3_device_recycle_file(Device *pself,
455 s3_device_erase(Device *pself);
458 check_at_leom(S3Device *self,
462 check_at_peom(S3Device *self,
470 file_and_block_to_key(S3Device *self,
474 char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
475 self->prefix, file, (long long unsigned int)block);
476 g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH);
481 special_file_to_key(S3Device *self,
486 return g_strdup_printf("%s" SPECIAL_INFIX "%s", self->prefix, special_name);
488 return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name);
/* Build the Amanda "tapestart" header for the given label and timestamp and
 * upload it to the bucket under the special "tapestart" key, using the first
 * per-thread S3 handle (self->s3t[0].s3).
 *
 * On the visible success path the new dumpfile_t replaces
 * d_self->volume_header (the previous one is released with dumpfile_free())
 * and header_size is added to self->volume_bytes.  Failures are reported
 * through device_set_error().
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (declarations, returns, closing braces); the notes below describe only what
 * the visible lines show. */
492 write_amanda_header(S3Device *self,
496 CurlBuffer amanda_header = {NULL, 0, 0, 0};
499 dumpfile_t * dumpinfo = NULL;
500 Device *d_self = DEVICE(self);
503 /* build the header */
504 header_size = 0; /* no minimum size */
505 dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
506 amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
/* A NULL buffer is reported as "header won't fit in a single block". */
508 if (amanda_header.buffer == NULL) {
509 device_set_error(d_self,
510 stralloc(_("Amanda tapestart header won't fit in a single block!")),
511 DEVICE_STATUS_DEVICE_ERROR);
512 dumpfile_free(dumpinfo);
/* buffer is NULL in this branch, so this g_free() is a no-op */
513 g_free(amanda_header.buffer);
/* Reaching the logical end-of-medium only flags EOM; the write still proceeds. */
517 if(check_at_leom(self, header_size))
518 d_self->is_eom = TRUE;
/* Reaching the physical end-of-medium is an error: no space for the header. */
520 if(check_at_peom(self, header_size)) {
521 d_self->is_eom = TRUE;
522 device_set_error(d_self,
523 stralloc(_("No space left on device")),
524 DEVICE_STATUS_DEVICE_ERROR);
/* NOTE(review): the other two error branches in this function call
 * dumpfile_free(dumpinfo); no such call is visible in this branch, so
 * dumpinfo may be leaked here -- verify against the full source. */
525 g_free(amanda_header.buffer);
529 /* write out the header and flush the uploads. */
530 key = special_file_to_key(self, "tapestart", -1);
531 g_assert(header_size < G_MAXUINT); /* for cast to guint */
532 amanda_header.buffer_len = (guint)header_size;
533 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
534 &amanda_header, NULL, NULL);
/* the header bytes are no longer needed once the upload call has returned */
535 g_free(amanda_header.buffer);
539 device_set_error(d_self,
540 vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
541 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
542 dumpfile_free(dumpinfo);
/* Ownership of dumpinfo moves to the device as its new volume header. */
544 dumpfile_free(d_self->volume_header);
545 d_self->volume_header = dumpinfo;
546 self->volume_bytes += header_size;
553 seek_to_end(S3Device *self) {
556 Device *pself = DEVICE(self);
558 last_file = find_last_file(self);
562 pself->file = last_file;
567 /* Convert an object name into a file number, assuming the given prefix
568 * length. Returns -1 if the object name is invalid, or 0 if the object name
569 * is a "special" key. */
570 static int key_to_file(guint prefix_len, const char * key) {
574 /* skip the prefix */
575 if (strlen(key) <= prefix_len)
580 if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
584 /* check that key starts with 'f' */
589 /* check that key is of the form "%08x-" */
590 for (i = 0; i < 8; i++) {
591 if (!(key[i] >= '0' && key[i] <= '9') &&
592 !(key[i] >= 'a' && key[i] <= 'f') &&
593 !(key[i] >= 'A' && key[i] <= 'F')) break;
595 if (key[i] != '-') return -1;
596 if (i < 8) return -1;
598 /* convert the file number */
600 file = strtoul(key, NULL, 16);
602 g_warning(_("unparseable file number '%s'"), key);
609 /* Find the number of the last file that contains any data (even just a header).
610 * Returns -1 in event of an error
613 find_last_file(S3Device *self) {
616 unsigned int prefix_len = strlen(self->prefix);
618 Device *d_self = DEVICE(self);
620 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
621 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
623 device_set_error(d_self,
624 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
625 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
629 for (; keys; keys = g_slist_remove(keys, keys->data)) {
630 int file = key_to_file(prefix_len, keys->data);
632 /* and if it's the last, keep it */
633 if (file > last_file)
640 /* Find the number of the file following the requested one, if any.
641 * Returns 0 if there is no such file or -1 in event of an error
644 find_next_file(S3Device *self, int last_file) {
647 unsigned int prefix_len = strlen(self->prefix);
649 Device *d_self = DEVICE(self);
651 /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
652 result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
655 device_set_error(d_self,
656 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
657 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
661 for (; keys; keys = g_slist_remove(keys, keys->data)) {
664 file = key_to_file(prefix_len, (char*)keys->data);
667 /* Set this in case we don't find a next file; this is not a
668 * hard error, so if we can find a next file we'll return that
673 if (file < next_file && file > last_file) {
/* Delete every key whose name starts with "<prefix>f<file as %08x>-", i.e.
 * all objects stored for the given file number.  Keys are listed with
 * s3_list_keys() and removed one at a time with s3_delete(), both through the
 * first per-thread handle (self->s3t[0].s3).  Listing and deletion failures
 * are reported through device_set_error().
 *
 * NOTE(review): several lines of this function are not visible in this view
 * (remaining parameters, declarations, returns, closing braces). */
682 delete_file(S3Device *self,
687 guint64 total_size = 0;
/* NOTE(review): my_prefix is heap-allocated by g_strdup_printf(); no matching
 * g_free() is visible in this view -- verify it is released on every path. */
688 char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
689 Device *d_self = DEVICE(self);
691 result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
694 device_set_error(d_self,
695 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
696 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
700 /* this will likely be a *lot* of keys */
/* Each iteration pops the head node of the list.
 * NOTE(review): g_slist_remove() frees only the list node; whether the key
 * string in keys->data is released is not visible here -- verify. */
701 for (; keys; keys = g_slist_remove(keys, keys->data)) {
702 if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data);
703 if (!s3_delete(self->s3t[0].s3, self->bucket, keys->data)) {
704 device_set_error(d_self,
705 vstrallocf(_("While deleting key '%s': %s"),
706 (char*)keys->data, s3_strerror(self->s3t[0].s3)),
707 DEVICE_STATUS_DEVICE_ERROR);
/* NOTE(review): this assigns total_size to volume_bytes rather than
 * subtracting it; total_size is presumably filled in by the s3_list_keys()
 * call above -- confirm this is the intended accounting after a delete. */
712 self->volume_bytes = total_size;
718 delete_all_files(S3Device *self)
723 * Note: this has to be allowed to retry for a while because the bucket
724 * may have been created and not yet appeared
726 last_file = find_last_file(self);
729 s3_error_code_t s3_error_code;
730 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
733 * if the bucket doesn't exist, it doesn't contain any files,
734 * so the operation is a success
736 if ((response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket)) {
737 /* find_last_file set an error; clear it */
738 device_set_error(DEVICE(self), NULL, DEVICE_STATUS_SUCCESS);
741 /* find_last_file already set the error */
746 for (file = 1; file <= last_file; file++) {
747 if (!delete_file(self, file))
748 /* delete_file already set our error message */
760 s3_device_register(void)
762 static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
765 /* set up our properties */
766 device_property_fill_and_register(&device_property_s3_secret_key,
767 G_TYPE_STRING, "s3_secret_key",
768 "Secret access key to authenticate with Amazon S3");
769 device_property_fill_and_register(&device_property_s3_access_key,
770 G_TYPE_STRING, "s3_access_key",
771 "Access key ID to authenticate with Amazon S3");
772 device_property_fill_and_register(&device_property_s3_host,
773 G_TYPE_STRING, "s3_host",
774 "hostname:port of the server");
775 device_property_fill_and_register(&device_property_s3_service_path,
776 G_TYPE_STRING, "s3_service_path",
777 "path to add in the url");
778 device_property_fill_and_register(&device_property_s3_user_token,
779 G_TYPE_STRING, "s3_user_token",
780 "User token for authentication Amazon devpay requests");
781 device_property_fill_and_register(&device_property_s3_bucket_location,
782 G_TYPE_STRING, "s3_bucket_location",
783 "Location constraint for buckets on Amazon S3");
784 device_property_fill_and_register(&device_property_s3_storage_class,
785 G_TYPE_STRING, "s3_storage_class",
786 "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
787 device_property_fill_and_register(&device_property_ssl_ca_info,
788 G_TYPE_STRING, "ssl_ca_info",
789 "Path to certificate authority certificate");
790 device_property_fill_and_register(&device_property_s3_ssl,
791 G_TYPE_BOOLEAN, "s3_ssl",
792 "Whether to use SSL with Amazon S3");
793 device_property_fill_and_register(&device_property_s3_subdomain,
794 G_TYPE_BOOLEAN, "s3_subdomain",
795 "Whether to use subdomain");
796 device_property_fill_and_register(&device_property_max_send_speed,
797 G_TYPE_UINT64, "max_send_speed",
798 "Maximum average upload speed (bytes/sec)");
799 device_property_fill_and_register(&device_property_max_recv_speed,
800 G_TYPE_UINT64, "max_recv_speed",
801 "Maximum average download speed (bytes/sec)");
802 device_property_fill_and_register(&device_property_nb_threads_backup,
803 G_TYPE_UINT64, "nb_threads_backup",
804 "Number of writer thread");
805 device_property_fill_and_register(&device_property_nb_threads_recovery,
806 G_TYPE_UINT64, "nb_threads_recovery",
807 "Number of reader thread");
809 /* register the device itself */
810 register_device(s3_device_factory, device_prefix_list);
814 s3_device_get_type(void)
816 static GType type = 0;
818 if G_UNLIKELY(type == 0) {
819 static const GTypeInfo info = {
820 sizeof (S3DeviceClass),
821 (GBaseInitFunc) NULL,
822 (GBaseFinalizeFunc) NULL,
823 (GClassInitFunc) s3_device_class_init,
824 (GClassFinalizeFunc) NULL,
825 NULL /* class_data */,
828 (GInstanceInitFunc) s3_device_init,
832 type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
840 s3_device_init(S3Device * self)
842 Device * dself = DEVICE(self);
845 self->volume_bytes = 0;
846 self->volume_limit = 0;
848 self->enforce_volume_limit = FALSE;
849 self->use_subdomain = FALSE;
850 self->nb_threads = 1;
851 self->nb_threads_backup = 1;
852 self->nb_threads_recovery = 1;
853 self->thread_pool_write = NULL;
854 self->thread_pool_read = NULL;
855 self->thread_idle_cond = NULL;
856 self->thread_idle_mutex = NULL;
858 /* Register property values
859 * Note: Some aren't added until s3_device_open_device()
861 bzero(&response, sizeof(response));
863 g_value_init(&response, CONCURRENCY_PARADIGM_TYPE);
864 g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ);
865 device_set_simple_property(dself, PROPERTY_CONCURRENCY,
866 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
867 g_value_unset(&response);
869 g_value_init(&response, STREAMING_REQUIREMENT_TYPE);
870 g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE);
871 device_set_simple_property(dself, PROPERTY_STREAMING,
872 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
873 g_value_unset(&response);
875 g_value_init(&response, G_TYPE_BOOLEAN);
876 g_value_set_boolean(&response, TRUE);
877 device_set_simple_property(dself, PROPERTY_APPENDABLE,
878 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
879 g_value_unset(&response);
881 g_value_init(&response, G_TYPE_BOOLEAN);
882 g_value_set_boolean(&response, TRUE);
883 device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION,
884 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
885 g_value_unset(&response);
887 g_value_init(&response, G_TYPE_BOOLEAN);
888 g_value_set_boolean(&response, TRUE);
889 device_set_simple_property(dself, PROPERTY_FULL_DELETION,
890 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
891 g_value_unset(&response);
893 g_value_init(&response, G_TYPE_BOOLEAN);
894 g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
895 device_set_simple_property(dself, PROPERTY_LEOM,
896 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
897 g_value_unset(&response);
899 g_value_init(&response, G_TYPE_BOOLEAN);
900 g_value_set_boolean(&response, FALSE);
901 device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
902 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
903 g_value_unset(&response);
905 g_value_init(&response, G_TYPE_BOOLEAN);
906 g_value_set_boolean(&response, FALSE);
907 device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
908 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
909 g_value_unset(&response);
911 g_value_init(&response, G_TYPE_BOOLEAN);
912 g_value_set_boolean(&response, FALSE);
913 device_set_simple_property(dself, PROPERTY_COMPRESSION,
914 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
915 g_value_unset(&response);
917 g_value_init(&response, MEDIA_ACCESS_MODE_TYPE);
918 g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE);
919 device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE,
920 &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
921 g_value_unset(&response);
926 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
928 GObjectClass *g_object_class = (GObjectClass*) c;
929 DeviceClass *device_class = (DeviceClass *)c;
931 parent_class = g_type_class_ref (TYPE_DEVICE);
933 device_class->open_device = s3_device_open_device;
934 device_class->read_label = s3_device_read_label;
935 device_class->start = s3_device_start;
936 device_class->finish = s3_device_finish;
938 device_class->start_file = s3_device_start_file;
939 device_class->write_block = s3_device_write_block;
940 device_class->finish_file = s3_device_finish_file;
942 device_class->seek_file = s3_device_seek_file;
943 device_class->seek_block = s3_device_seek_block;
944 device_class->read_block = s3_device_read_block;
945 device_class->recycle_file = s3_device_recycle_file;
947 device_class->erase = s3_device_erase;
949 g_object_class->finalize = s3_device_finalize;
951 device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
952 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
953 device_simple_property_get_fn,
954 s3_device_set_access_key_fn);
956 device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY,
957 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
958 device_simple_property_get_fn,
959 s3_device_set_secret_key_fn);
961 device_class_register_property(device_class, PROPERTY_S3_HOST,
962 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
963 device_simple_property_get_fn,
964 s3_device_set_host_fn);
966 device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
967 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
968 device_simple_property_get_fn,
969 s3_device_set_service_path_fn);
971 device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
972 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
973 device_simple_property_get_fn,
974 s3_device_set_user_token_fn);
976 device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION,
977 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
978 device_simple_property_get_fn,
979 s3_device_set_bucket_location_fn);
981 device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
982 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
983 device_simple_property_get_fn,
984 s3_device_set_storage_class_fn);
986 device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
987 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
988 device_simple_property_get_fn,
989 s3_device_set_ca_info_fn);
991 device_class_register_property(device_class, PROPERTY_VERBOSE,
992 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
993 device_simple_property_get_fn,
994 s3_device_set_verbose_fn);
996 device_class_register_property(device_class, PROPERTY_S3_SSL,
997 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
998 device_simple_property_get_fn,
999 s3_device_set_ssl_fn);
1001 device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
1002 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1003 device_simple_property_get_fn,
1004 s3_device_set_max_send_speed_fn);
1006 device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
1007 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1008 device_simple_property_get_fn,
1009 s3_device_set_max_recv_speed_fn);
1011 device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
1012 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1013 device_simple_property_get_fn,
1014 s3_device_set_nb_threads_backup);
1016 device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
1017 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1018 device_simple_property_get_fn,
1019 s3_device_set_nb_threads_recovery);
1021 device_class_register_property(device_class, PROPERTY_COMPRESSION,
1022 PROPERTY_ACCESS_GET_MASK,
1023 device_simple_property_get_fn,
1026 device_class_register_property(device_class, PROPERTY_LEOM,
1027 PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
1028 device_simple_property_get_fn,
1029 property_set_leom_fn);
1031 device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
1032 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1033 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1034 device_simple_property_get_fn,
1035 s3_device_set_max_volume_usage_fn);
1037 device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
1038 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1039 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1040 device_simple_property_get_fn,
1041 s3_device_set_enforce_max_volume_usage_fn);
1043 device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
1044 (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
1045 (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
1046 device_simple_property_get_fn,
1047 s3_device_set_use_subdomain_fn);
1051 s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base,
1052 GValue *val, PropertySurety surety, PropertySource source)
1054 S3Device *self = S3_DEVICE(p_self);
1056 amfree(self->access_key);
1057 self->access_key = g_value_dup_string(val);
1058 device_clear_volume_details(p_self);
1060 return device_simple_property_set_fn(p_self, base, val, surety, source);
1064 s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
1065 GValue *val, PropertySurety surety, PropertySource source)
1067 S3Device *self = S3_DEVICE(p_self);
1069 amfree(self->secret_key);
1070 self->secret_key = g_value_dup_string(val);
1071 device_clear_volume_details(p_self);
1073 return device_simple_property_set_fn(p_self, base, val, surety, source);
1077 s3_device_set_host_fn(Device *p_self,
1078 DevicePropertyBase *base, GValue *val,
1079 PropertySurety surety, PropertySource source)
1081 S3Device *self = S3_DEVICE(p_self);
1084 self->host = g_value_dup_string(val);
1085 device_clear_volume_details(p_self);
1087 return device_simple_property_set_fn(p_self, base, val, surety, source);
1091 s3_device_set_service_path_fn(Device *p_self,
1092 DevicePropertyBase *base, GValue *val,
1093 PropertySurety surety, PropertySource source)
1095 S3Device *self = S3_DEVICE(p_self);
1097 amfree(self->service_path);
1098 self->service_path = g_value_dup_string(val);
1099 device_clear_volume_details(p_self);
1101 return device_simple_property_set_fn(p_self, base, val, surety, source);
1105 s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
1106 GValue *val, PropertySurety surety, PropertySource source)
1108 S3Device *self = S3_DEVICE(p_self);
1110 amfree(self->user_token);
1111 self->user_token = g_value_dup_string(val);
1112 device_clear_volume_details(p_self);
1114 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the S3 bucket location constraint.
 *
 * A non-empty constraint is reported as a device error when SSL is enabled
 * and s3_curl_location_compat() is false, or when
 * s3_bucket_location_compat(self->bucket) is false.  Otherwise the duplicated
 * string replaces self->bucket_location, the cached volume details are
 * cleared, and the value is recorded via device_simple_property_set_fn(),
 * whose result is returned.
 *
 * NOTE(review): the lines that follow each device_set_error() call are not
 * visible in this view, so whether str_val is freed and the function returns
 * early on those paths cannot be confirmed here. */
1118 s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
1119 GValue *val, PropertySurety surety, PropertySource source)
1121 S3Device *self = S3_DEVICE(p_self);
/* NOTE(review): g_value_dup_string() returns NULL when the GValue holds a
 * NULL string; str_val[0] below would then dereference NULL -- confirm that
 * callers never pass such a value, or add a guard. */
1122 char *str_val = g_value_dup_string(val);
/* NOTE(review): the message below reads "too old support"; it looks like the
 * word "to" is missing.  Left untouched here because it is a runtime string. */
1124 if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
1125 device_set_error(p_self, stralloc(_(
1126 "Location constraint given for Amazon S3 bucket, "
1127 "but libcurl is too old support wildcard certificates.")),
1128 DEVICE_STATUS_DEVICE_ERROR);
1132 if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
1133 device_set_error(p_self, g_strdup_printf(_(
1134 "Location constraint given for Amazon S3 bucket, "
1135 "but the bucket name (%s) is not usable as a subdomain."),
1137 DEVICE_STATUS_DEVICE_ERROR);
/* str_val is not copied again: the device takes ownership of it. */
1141 amfree(self->bucket_location);
1142 self->bucket_location = str_val;
1143 device_clear_volume_details(p_self);
1145 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the storage class: duplicates the string held in val,
 * releases the previous self->storage_class with amfree(), stores the copy,
 * calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn(). */
1152 s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
1153 GValue *val, PropertySurety surety, PropertySource source)
1155 S3Device *self = S3_DEVICE(p_self);
1156 char *str_val = g_value_dup_string(val);
1158 amfree(self->storage_class);
1159 self->storage_class = str_val;
1160 device_clear_volume_details(p_self);
1162 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the CA info string: releases the previous
 * self->ca_info with amfree(), stores a duplicate of the string held in val,
 * calls device_clear_volume_details(), and returns the result of
 * device_simple_property_set_fn(). */
1166 s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
1167 GValue *val, PropertySurety surety, PropertySource source)
1169 S3Device *self = S3_DEVICE(p_self);
1171 amfree(self->ca_info);
1172 self->ca_info = g_value_dup_string(val);
1173 device_clear_volume_details(p_self);
1175 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the verbose flag: records the boolean from val in
 * self->verbose, forwards it with s3_verbose() to every per-thread handle
 * whose .s3 pointer is set, and returns the result of
 * device_simple_property_set_fn(). */
1179 s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
1180 GValue *val, PropertySurety surety, PropertySource source)
1182 S3Device *self = S3_DEVICE(p_self);
1185 self->verbose = g_value_get_boolean(val);
1186 /* Our S3 handle may not yet have been instantiated; if so, it will
1187 * get the proper verbose setting when it is created */
/* walk all nb_threads slots; slots without a handle are skipped */
1189 for (thread = 0; thread < self->nb_threads; thread++) {
1190 if (self->s3t[thread].s3)
1191 s3_verbose(self->s3t[thread].s3, self->verbose);
1195 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for SSL/TLS use: reads the requested boolean, applies it
 * with s3_use_ssl() to every per-thread handle that exists, and sets a device
 * error if any handle refuses it. self->use_ssl is updated only after the
 * loop, and the result of device_simple_property_set_fn() is returned. */
1199 s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
1200 GValue *val, PropertySurety surety, PropertySource source)
1202 S3Device *self = S3_DEVICE(p_self);
1206 new_val = g_value_get_boolean(val);
1207 /* Our S3 handle may not yet have been instantiated; if so, it will
1208 * get the proper use_ssl setting when it is created */
1210 for (thread = 0; thread < self->nb_threads; thread++) {
1211 if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
1212 device_set_error(p_self, g_strdup_printf(_(
1213 "Error setting S3 SSL/TLS use "
1214 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1215 DEVICE_STATUS_DEVICE_ERROR);
/* the new value is remembered for handles created later */
1220 self->use_ssl = new_val;
1222 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the maximum send speed: reads the uint64 from val,
 * applies it with s3_set_max_send_speed() to every per-thread handle that
 * exists (setting a device error if a handle rejects it), stores it in
 * self->max_send_speed and returns the result of
 * device_simple_property_set_fn(). */
1226 s3_device_set_max_send_speed_fn(Device *p_self,
1227 DevicePropertyBase *base, GValue *val,
1228 PropertySurety surety, PropertySource source)
1230 S3Device *self = S3_DEVICE(p_self);
1234 new_val = g_value_get_uint64(val);
1236 for (thread = 0; thread < self->nb_threads; thread++) {
1237 if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
1238 device_set_error(p_self,
1239 g_strdup("Could not set S3 maximum send speed"),
1240 DEVICE_STATUS_DEVICE_ERROR);
1245 self->max_send_speed = new_val;
1247 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the maximum receive speed: reads the uint64 from val,
 * applies it with s3_set_max_recv_speed() to every per-thread handle that
 * exists (setting a device error if a handle rejects it), stores it in
 * self->max_recv_speed and returns the result of
 * device_simple_property_set_fn(). */
1251 s3_device_set_max_recv_speed_fn(Device *p_self,
1252 DevicePropertyBase *base, GValue *val,
1253 PropertySurety surety, PropertySource source)
1255 S3Device *self = S3_DEVICE(p_self);
1259 new_val = g_value_get_uint64(val);
1261 for (thread = 0; thread < self->nb_threads; thread++) {
1262 if (self->s3t[thread].s3 &&
1263 !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
1264 device_set_error(p_self,
1265 g_strdup("Could not set S3 maximum recv speed"),
1266 DEVICE_STATUS_DEVICE_ERROR);
1271 self->max_recv_speed = new_val;
1273 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the number of backup (write) threads: stores the
 * uint64 from val in self->nb_threads_backup and raises self->nb_threads to
 * match when the new value is larger; nb_threads is never lowered here.
 * Returns the result of device_simple_property_set_fn().
 * NOTE(review): nb_threads also bounds the loops over self->s3t elsewhere in
 * this file; confirm this property cannot be raised after s3t was sized. */
1277 s3_device_set_nb_threads_backup(Device *p_self,
1278 DevicePropertyBase *base, GValue *val,
1279 PropertySurety surety, PropertySource source)
1281 S3Device *self = S3_DEVICE(p_self);
1284 new_val = g_value_get_uint64(val);
1285 self->nb_threads_backup = new_val;
1286 if (self->nb_threads_backup > self->nb_threads) {
1287 self->nb_threads = self->nb_threads_backup;
1290 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the number of recovery (read) threads: stores the
 * uint64 from val in self->nb_threads_recovery and raises self->nb_threads
 * to match when the new value is larger; nb_threads is never lowered here.
 * Returns the result of device_simple_property_set_fn().
 * NOTE(review): nb_threads also bounds the loops over self->s3t elsewhere in
 * this file; confirm this property cannot be raised after s3t was sized. */
1294 s3_device_set_nb_threads_recovery(Device *p_self,
1295 DevicePropertyBase *base, GValue *val,
1296 PropertySurety surety, PropertySource source)
1298 S3Device *self = S3_DEVICE(p_self);
1301 new_val = g_value_get_uint64(val);
1302 self->nb_threads_recovery = new_val;
1303 if (self->nb_threads_recovery > self->nb_threads) {
1304 self->nb_threads = self->nb_threads_recovery;
1307 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the maximum volume usage: stores the uint64 from val
 * in self->volume_limit (the value check_at_peom()/check_at_leom() compare
 * against) and returns the result of device_simple_property_set_fn(). */
1311 s3_device_set_max_volume_usage_fn(Device *p_self,
1312 DevicePropertyBase *base, GValue *val,
1313 PropertySurety surety, PropertySource source)
1315 S3Device *self = S3_DEVICE(p_self);
1317 self->volume_limit = g_value_get_uint64(val);
1319 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter that switches volume-limit enforcement on or off: stores
 * the boolean from val in self->enforce_volume_limit and returns the result
 * of device_simple_property_set_fn(). */
1324 s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
1325 DevicePropertyBase *base, GValue *val,
1326 PropertySurety surety, PropertySource source)
1328 S3Device *self = S3_DEVICE(p_self);
1330 self->enforce_volume_limit = g_value_get_boolean(val);
1332 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the use-subdomain flag: stores the boolean from val in
 * self->use_subdomain (later passed to s3_open() by setup_handle()) and
 * returns the result of device_simple_property_set_fn(). */
1337 s3_device_set_use_subdomain_fn(Device *p_self,
1338 DevicePropertyBase *base, GValue *val,
1339 PropertySurety surety, PropertySource source)
1341 S3Device *self = S3_DEVICE(p_self);
1343 self->use_subdomain = g_value_get_boolean(val);
1345 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Property setter for the LEOM flag: stores the boolean from val in
 * self->leom and returns the result of device_simple_property_set_fn(). */
1349 property_set_leom_fn(Device *p_self,
1350 DevicePropertyBase *base, GValue *val,
1351 PropertySurety surety, PropertySource source)
1353 S3Device *self = S3_DEVICE(p_self);
1355 self->leom = g_value_get_boolean(val);
1357 return device_simple_property_set_fn(p_self, base, val, surety, source);
/* Factory for S3 devices: asserts that device_type equals S3_DEVICE_NAME,
 * instantiates a TYPE_S3_DEVICE object and opens it through
 * device_open_device() with the three name components.
 * NOTE(review): the declarations and the return statement are elided in this
 * listing; presumably the new Device is returned - confirm. */
1360 s3_device_factory(char * device_name, char * device_type, char * device_node)
1364 g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
1365 rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
1366 s3_rval = (S3Device*)rval;
1368 device_open_device(rval, device_name, device_type, device_node);
1373 * Virtual function overrides
/* open_device override. Sets the block-size limits and default, splits
 * device_node at the first '/' into bucket and prefix (the whole node is the
 * bucket and the prefix is empty when there is no '/'), rejects an empty
 * bucket name with a device error, initialises verbose to FALSE and use_ssl
 * to s3_curl_supports_ssl() (also publishing the latter as the default of
 * the s3_ssl property), and finally chains to the parent class's
 * open_device when one is installed. */
1377 s3_device_open_device(Device *pself, char *device_name,
1378 char * device_type, char * device_node)
1380 S3Device *self = S3_DEVICE(pself);
1384 pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE;
1385 pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE;
1386 pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE;
1388 /* Device name may be bucket/prefix, to support multiple volumes in a
1390 name_colon = strchr(device_node, '/');
1391 if (name_colon == NULL) {
1392 self->bucket = g_strdup(device_node);
1393 self->prefix = g_strdup("");
1395 self->bucket = g_strndup(device_node, name_colon - device_node);
1396 self->prefix = g_strdup(name_colon + 1);
1399 if (self->bucket == NULL || self->bucket[0] == '\0') {
1400 device_set_error(pself,
1401 vstrallocf(_("Empty bucket name in device %s"), device_name),
1402 DEVICE_STATUS_DEVICE_ERROR);
1403 amfree(self->bucket);
1404 amfree(self->prefix);
1408 g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
1410 /* default values */
1411 self->verbose = FALSE;
1413 /* use SSL if available */
1414 self->use_ssl = s3_curl_supports_ssl();
/* mirror the chosen default into the property table, tagged as a default */
1415 bzero(&tmp_value, sizeof(GValue));
1416 g_value_init(&tmp_value, G_TYPE_BOOLEAN);
1417 g_value_set_boolean(&tmp_value, self->use_ssl);
1418 device_set_simple_property(pself, device_property_s3_ssl.ID,
1419 &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
1421 if (parent_class->open_device) {
1422 parent_class->open_device(pself, device_name, device_type, device_node);
/* GObject finalize handler for the S3 device. The parent class's finalize is
 * invoked first; afterwards both thread pools are freed through
 * g_thread_pool_free(pool, 1, 1), the idle mutex and condition variable are
 * freed, every per-thread S3 handle is released with s3_free(), and the
 * string members are freed. */
1426 static void s3_device_finalize(GObject * obj_self) {
1427 S3Device *self = S3_DEVICE (obj_self);
1430 if(G_OBJECT_CLASS(parent_class)->finalize)
1431 (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
1433 if (self->thread_pool_write) {
1434 g_thread_pool_free(self->thread_pool_write, 1, 1);
1435 self->thread_pool_write = NULL;
1437 if (self->thread_pool_read) {
1438 g_thread_pool_free(self->thread_pool_read, 1, 1);
1439 self->thread_pool_read = NULL;
1441 if (self->thread_idle_mutex) {
1442 g_mutex_free(self->thread_idle_mutex);
1443 self->thread_idle_mutex = NULL;
1445 if (self->thread_idle_cond) {
1446 g_cond_free(self->thread_idle_cond);
1447 self->thread_idle_cond = NULL;
/* NOTE(review): this loop reads every s3t[thread].s3, so the array must have
 * been fully initialised; see setup_handle() */
1450 for (thread = 0; thread < self->nb_threads; thread++) {
1451 if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
1455 if(self->bucket) g_free(self->bucket);
1456 if(self->prefix) g_free(self->prefix);
1457 if(self->access_key) g_free(self->access_key);
1458 if(self->secret_key) g_free(self->secret_key);
1459 if(self->host) g_free(self->host);
1460 if(self->service_path) g_free(self->service_path);
1461 if(self->user_token) g_free(self->user_token);
1462 if(self->bucket_location) g_free(self->bucket_location);
1463 if(self->storage_class) g_free(self->storage_class);
1464 if(self->ca_info) g_free(self->ca_info);
1467 static gboolean setup_handle(S3Device * self) {
1468 Device *d_self = DEVICE(self);
1471 if (self->s3t == NULL) {
1472 self->s3t = g_new(S3_by_thread, self->nb_threads);
1473 if (self->s3t == NULL) {
1474 device_set_error(d_self,
1475 stralloc(_("Can't allocate S3Handle array")),
1476 DEVICE_STATUS_DEVICE_ERROR);
1479 if (self->access_key == NULL || self->access_key[0] == '\0') {
1480 device_set_error(d_self,
1481 stralloc(_("No Amazon access key specified")),
1482 DEVICE_STATUS_DEVICE_ERROR);
1486 if (self->secret_key == NULL || self->secret_key[0] == '\0') {
1487 device_set_error(d_self,
1488 stralloc(_("No Amazon secret key specified")),
1489 DEVICE_STATUS_DEVICE_ERROR);
1493 if (!self->use_ssl && self->ca_info) {
1494 amfree(self->ca_info);
1497 for (thread = 0; thread < self->nb_threads; thread++) {
1498 self->s3t[thread].idle = 1;
1499 self->s3t[thread].done = 1;
1500 self->s3t[thread].eof = FALSE;
1501 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1502 self->s3t[thread].errmsg = NULL;
1503 self->s3t[thread].filename = NULL;
1504 self->s3t[thread].curl_buffer.buffer = NULL;
1505 self->s3t[thread].curl_buffer.buffer_len = 0;
1506 self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
1507 self->host, self->service_path,
1508 self->use_subdomain,
1509 self->user_token, self->bucket_location,
1510 self->storage_class, self->ca_info);
1511 if (self->s3t[thread].s3 == NULL) {
1512 device_set_error(d_self,
1513 stralloc(_("Internal error creating S3 handle")),
1514 DEVICE_STATUS_DEVICE_ERROR);
1519 g_debug("Create %d threads", self->nb_threads);
1520 self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
1521 self->nb_threads, 0, NULL);
1522 self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
1523 self->nb_threads, 0, NULL);
1524 self->thread_idle_cond = g_cond_new();
1525 self->thread_idle_mutex = g_mutex_new();
1528 for (thread = 0; thread < self->nb_threads; thread++) {
1529 s3_verbose(self->s3t[thread].s3, self->verbose);
1531 if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
1532 device_set_error(d_self, g_strdup_printf(_(
1533 "Error setting S3 SSL/TLS use "
1534 "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
1535 DEVICE_STATUS_DEVICE_ERROR);
1539 if (self->max_send_speed &&
1540 !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
1541 device_set_error(d_self,
1542 g_strdup("Could not set S3 maximum send speed"),
1543 DEVICE_STATUS_DEVICE_ERROR);
1547 if (self->max_recv_speed &&
1548 !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
1549 device_set_error(d_self,
1550 g_strdup("Could not set S3 maximum recv speed"),
1551 DEVICE_STATUS_DEVICE_ERROR);
/* read_label implementation. Discards any cached label, timestamp and
 * header, makes sure the S3 handles exist, then reads the special
 * "tapestart" key through handle 0 and parses it as an Amanda header.
 * Returns pself->status in every case:
 *  - 404 with NoSuchKey/NoSuchEntity/NoSuchBucket: error flagged as
 *    DEVICE_ERROR | VOLUME_ERROR | VOLUME_UNLABELED;
 *  - any other read failure: DEVICE_ERROR | VOLUME_ERROR;
 *  - empty object or a header whose type is not F_TAPESTART: VOLUME_ERROR;
 *  - otherwise volume_label/volume_time are copied from the header and the
 *    status is reset to DEVICE_STATUS_SUCCESS. */
1559 static DeviceStatusFlags
1560 s3_device_read_label(Device *pself) {
1561 S3Device *self = S3_DEVICE(pself);
1563 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
1564 dumpfile_t *amanda_header;
1565 /* note that this may be called from s3_device_start, when
1566 * self->access_mode is not ACCESS_NULL */
1568 amfree(pself->volume_label);
1569 amfree(pself->volume_time);
1570 dumpfile_free(pself->volume_header);
1571 pself->volume_header = NULL;
1573 if (device_in_error(self)) return pself->status;
1575 if (!setup_handle(self)) {
1576 /* setup_handle already set our error message */
1577 return pself->status;
1581 key = special_file_to_key(self, "tapestart", -1);
1582 if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
1583 guint response_code;
1584 s3_error_code_t s3_error_code;
1585 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1587 /* if it's an expected error (not found), just return FALSE */
1588 if (response_code == 404 &&
1589 (s3_error_code == S3_ERROR_NoSuchKey ||
1590 s3_error_code == S3_ERROR_NoSuchEntity ||
1591 s3_error_code == S3_ERROR_NoSuchBucket)) {
1592 g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
1593 device_set_error(pself,
1594 stralloc(_("Amanda header not found -- unlabeled volume?")),
1595 DEVICE_STATUS_DEVICE_ERROR
1596 | DEVICE_STATUS_VOLUME_ERROR
1597 | DEVICE_STATUS_VOLUME_UNLABELED);
1598 return pself->status;
1601 /* otherwise, log it and return */
1602 device_set_error(pself,
1603 vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
1604 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1605 return pself->status;
1608 /* handle an empty file gracefully */
1609 if (buf.buffer_len == 0) {
1610 device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
1611 return pself->status;
1614 g_assert(buf.buffer != NULL);
/* buffer_pos is passed as the header length, i.e. the bytes actually read */
1615 amanda_header = g_new(dumpfile_t, 1);
1616 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
1617 pself->volume_header = amanda_header;
1620 if (amanda_header->type != F_TAPESTART) {
1621 device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR);
1622 return pself->status;
1625 pself->volume_label = g_strdup(amanda_header->name);
1626 pself->volume_time = g_strdup(amanda_header->datestamp);
1627 /* pself->volume_header is already set */
1629 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1631 return pself->status;
/* start implementation. Refuses to run when the device is already in error
 * or setup_handle() fails, records the access mode and clears in_file. For
 * every mode except ACCESS_READ it tries s3_make_bucket() and tolerates only
 * a 409 response carrying BucketAlreadyExists or BucketAlreadyOwnedByYou.
 * The remaining statements are the per-mode work: reading the label when
 * none is cached; deleting all files and writing a new Amanda header with
 * the given label and timestamp; or listing the keys to obtain volume_bytes
 * and seeking to the end.
 * NOTE(review): the switch statement and its case labels are elided in this
 * listing, so which statements belong to which mode must be confirmed
 * against the full file. */
1635 s3_device_start (Device * pself, DeviceAccessMode mode,
1636 char * label, char * timestamp) {
1639 guint64 total_size = 0;
1642 self = S3_DEVICE(pself);
1644 if (device_in_error(self)) return FALSE;
1646 if (!setup_handle(self)) {
1647 /* setup_handle already set our error message */
1652 pself->access_mode = mode;
1653 pself->in_file = FALSE;
1655 /* try creating the bucket, in case it doesn't exist */
1656 if (mode != ACCESS_READ && !s3_make_bucket(self->s3t[0].s3, self->bucket)) {
1657 guint response_code;
1658 s3_error_code_t s3_error_code;
1659 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1661 /* if it isn't an expected error (bucket already exists),
1663 if (response_code != 409 ||
1664 (s3_error_code != S3_ERROR_BucketAlreadyExists &&
1665 s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
1666 device_set_error(pself,
1667 vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
1668 DEVICE_STATUS_DEVICE_ERROR);
1673 /* take care of any dirty work for this mode */
1676 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1677 /* s3_device_read_label already set our error message */
1683 delete_all_files(self);
1685 /* write a new amanda header */
1686 if (!write_amanda_header(self, label, timestamp)) {
1690 pself->volume_label = newstralloc(pself->volume_label, label);
1691 pself->volume_time = newstralloc(pself->volume_time, timestamp);
1693 /* unset the VOLUME_UNLABELED flag, if it was set */
1694 device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS);
1698 if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
1699 /* s3_device_read_label already set our error message */
/* the key listing also reports the total size, which seeds volume_bytes */
1702 result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
1704 device_set_error(pself,
1705 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
1706 DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
1709 self->volume_bytes = total_size;
1712 return seek_to_end(self);
1716 g_assert_not_reached();
/* NOTE(review): the header of this function is elided in this listing;
 * presumably it is the device's finish hook - confirm. The visible body
 * resets the access mode to ACCESS_NULL and reports FALSE when the device is
 * in error. */
1726 S3Device *self = S3_DEVICE(pself);
1730 /* we're not in a file anymore */
1731 pself->access_mode = ACCESS_NULL;
1733 if (device_in_error(pself)) return FALSE;
1738 /* functions for writing */
/* start_file implementation. Builds the Amanda file header for jobInfo,
 * flags EOM when check_at_leom() reports it, fails with "No space left on
 * device" when check_at_peom() reports it, advances pself->file, uploads the
 * header under the special "filestart" key through handle 0, adds the header
 * size to volume_bytes and marks every thread slot idle. Returns FALSE
 * immediately when the device is already in error. */
1742 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
1743 S3Device *self = S3_DEVICE(pself);
1744 CurlBuffer amanda_header = {NULL, 0, 0, 0};
1750 if (device_in_error(self)) return FALSE;
1753 pself->is_eom = FALSE;
1755 /* Set the blocksize to zero, since there's no header to skip (it's stored
1756 * in a distinct file, rather than block zero) */
1757 jobInfo->blocksize = 0;
1759 /* Build the amanda header. */
1760 header_size = 0; /* no minimum size */
1761 amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
1763 if (amanda_header.buffer == NULL) {
1764 device_set_error(pself,
1765 stralloc(_("Amanda file header won't fit in a single block!")),
1766 DEVICE_STATUS_DEVICE_ERROR);
1769 amanda_header.buffer_len = header_size;
/* the early-warning check only raises the flag, the hard check below fails */
1771 if(check_at_leom(self, header_size))
1772 pself->is_eom = TRUE;
1774 if(check_at_peom(self, header_size)) {
1775 pself->is_eom = TRUE;
1776 device_set_error(pself,
1777 stralloc(_("No space left on device")),
1778 DEVICE_STATUS_DEVICE_ERROR);
1779 g_free(amanda_header.buffer);
1782 /* set the file and block numbers correctly */
1783 pself->file = (pself->file > 0)? pself->file+1 : 1;
1785 pself->in_file = TRUE;
1786 /* write it out as a special block (not the 0th) */
1787 key = special_file_to_key(self, "filestart", pself->file);
1788 result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
1789 &amanda_header, NULL, NULL);
1790 g_free(amanda_header.buffer);
1793 device_set_error(pself,
1794 vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
1795 DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
1799 self->volume_bytes += header_size;
1800 for (thread = 0; thread < self->nb_threads; thread++) {
1801 self->s3t[thread].idle = 1;
/* write_block implementation. After the LEOM/PEOM checks it computes the key
 * for the current file/block, then, holding thread_idle_mutex, looks among
 * the first nb_threads_backup slots for an idle one (waiting on
 * thread_idle_cond while none is found). An error recorded by an idle slot
 * is transferred to the device and aborts the write. The data is copied into
 * the chosen slot's buffer (reallocated when too small) and the slot is
 * pushed to the write thread pool; volume_bytes grows by size. */
1808 s3_device_write_block (Device * pself, guint size, gpointer data) {
1810 S3Device * self = S3_DEVICE(pself);
1811 int idle_thread = 0;
1813 int first_idle = -1;
1815 g_assert (self != NULL);
1816 g_assert (data != NULL);
1817 if (device_in_error(self)) return FALSE;
1819 if(check_at_leom(self, size))
1820 pself->is_eom = TRUE;
1822 if(check_at_peom(self, size)) {
1823 pself->is_eom = TRUE;
1824 device_set_error(pself,
1825 stralloc(_("No space left on device")),
1826 DEVICE_STATUS_DEVICE_ERROR);
1830 filename = file_and_block_to_key(self, pself->file, pself->block);
1832 g_mutex_lock(self->thread_idle_mutex);
1833 while (!idle_thread) {
1835 for (thread = 0; thread < self->nb_threads_backup; thread++) {
1836 if (self->s3t[thread].idle == 1) {
1838 if (first_idle == -1)
1839 first_idle = thread;
1840 /* Check if the thread is in error */
/* NOTE(review): filename was allocated above; confirm that the elided lines
 * of this error branch release it before returning */
1841 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1842 device_set_error(pself, (char *)self->s3t[thread].errmsg,
1843 self->s3t[thread].errflags);
1844 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1845 self->s3t[thread].errmsg = NULL;
1846 g_mutex_unlock(self->thread_idle_mutex);
1852 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1855 thread = first_idle;
1857 self->s3t[thread].idle = 0;
1858 self->s3t[thread].done = 0;
/* drop a buffer that is too small for this block; a new one is allocated
 * just below */
1859 if (self->s3t[thread].curl_buffer.buffer &&
1860 self->s3t[thread].curl_buffer.buffer_len < size) {
1861 g_free((char *)self->s3t[thread].curl_buffer.buffer);
1862 self->s3t[thread].curl_buffer.buffer = NULL;
1863 self->s3t[thread].curl_buffer.buffer_len = 0;
1864 self->s3t[thread].buffer_len = 0;
1866 if (self->s3t[thread].curl_buffer.buffer == NULL) {
1867 self->s3t[thread].curl_buffer.buffer = g_malloc(size);
1868 self->s3t[thread].curl_buffer.buffer_len = size;
1869 self->s3t[thread].buffer_len = size;
1871 memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
1872 self->s3t[thread].curl_buffer.buffer_pos = 0;
1873 self->s3t[thread].curl_buffer.buffer_len = size;
1874 self->s3t[thread].curl_buffer.max_buffer_size = 0;
/* the slot now owns filename; s3_thread_write_block() frees it */
1875 self->s3t[thread].filename = filename;
1876 g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
1877 g_mutex_unlock(self->thread_idle_mutex);
1880 self->volume_bytes += size;
/* Thread-pool worker for writes. thread_data is the S3_by_thread slot
 * prepared by s3_device_write_block(); data is the device. Uploads the
 * slot's buffer under the slot's filename, frees the filename, records an
 * error in the slot (errflags/errmsg) for the main thread to pick up, then
 * under thread_idle_mutex restores curl_buffer.buffer_len from buffer_len
 * and broadcasts thread_idle_cond. */
1885 s3_thread_write_block(
1886 gpointer thread_data,
1889 S3_by_thread *s3t = (S3_by_thread *)thread_data;
1890 Device *pself = (Device *)data;
1891 S3Device *self = S3_DEVICE(pself);
1894 result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
1895 S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
1896 g_free((void *)s3t->filename);
1897 s3t->filename = NULL;
1899 s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
1900 s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
1902 g_mutex_lock(self->thread_idle_mutex);
1905 s3t->curl_buffer.buffer_len = s3t->buffer_len;
1906 g_cond_broadcast(self->thread_idle_cond);
1907 g_mutex_unlock(self->thread_idle_mutex);
/* finish_file implementation. Holding thread_idle_mutex, waits on
 * thread_idle_cond until the count of idle slots equals nb_threads. While
 * scanning, any error a slot recorded is transferred to the device and
 * cleared from the slot. Afterwards returns FALSE when the device is in
 * error; otherwise clears in_file. */
1911 s3_device_finish_file (Device * pself) {
1912 S3Device *self = S3_DEVICE(pself);
1914 /* Check all threads are done */
1915 int idle_thread = 0;
1918 g_mutex_lock(self->thread_idle_mutex);
1919 while (idle_thread != self->nb_threads) {
1921 for (thread = 0; thread < self->nb_threads; thread++) {
1922 if (self->s3t[thread].idle == 1) {
1925 /* check thread status */
1926 if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
1927 device_set_error(pself, (char *)self->s3t[thread].errmsg,
1928 self->s3t[thread].errflags);
1929 self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
1930 self->s3t[thread].errmsg = NULL;
1933 if (idle_thread != self->nb_threads) {
1934 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
1937 g_mutex_unlock(self->thread_idle_mutex);
1939 if (device_in_error(pself)) return FALSE;
1941 /* we're not in a file anymore */
1942 pself->in_file = FALSE;
/* recycle_file implementation: returns FALSE when the device is in error,
 * otherwise the result of delete_file() for the given file number. */
1948 s3_device_recycle_file(Device *pself, guint file) {
1949 S3Device *self = S3_DEVICE(pself);
1950 if (device_in_error(self)) return FALSE;
1953 return delete_file(self, file);
1954 /* delete_file already set our error message if necessary */
/* erase implementation. Ensures the S3 handles exist, deletes the special
 * "tapestart" key, deletes all files, and then tries to delete the bucket
 * itself; a 409/BucketNotEmpty or 404/NoSuchBucket answer to the bucket
 * deletion is tolerated. volume_bytes is reset to 0 at the end. Failures set
 * a DEVICE_STATUS_DEVICE_ERROR on the device. */
1958 s3_device_erase(Device *pself) {
1959 S3Device *self = S3_DEVICE(pself);
1961 const char *errmsg = NULL;
1962 guint response_code;
1963 s3_error_code_t s3_error_code;
1965 if (!setup_handle(self)) {
1966 /* error set by setup_handle */
1971 key = special_file_to_key(self, "tapestart", -1);
1972 if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
1973 s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
1974 device_set_error(pself,
1976 DEVICE_STATUS_DEVICE_ERROR);
1981 if (!delete_all_files(self))
1984 if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
1985 s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
1988 * ignore the error if the bucket isn't empty (there may be data from elsewhere)
1989 * or the bucket not existing (already deleted perhaps?)
1992 (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
1993 (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
1995 device_set_error(pself,
1997 DEVICE_STATUS_DEVICE_ERROR);
2001 self->volume_bytes = 0;
2005 /* functions for reading */
/* seek_file implementation. Resets the EOF/in-file state and the read-ahead
 * position, then reads the "filestart" key of the requested file through
 * handle 0. When the key is missing (404 NoSuchKey/NoSuchEntity) it asks
 * find_next_file() for a later file and recurses into it; when there is
 * none, it probes the previous file's header to decide between returning a
 * tape-end header and reporting "Attempt to read past tape-end file". On a
 * successful read the buffer is parsed into a newly allocated dumpfile_t,
 * whose type is validated; in_file is set and every thread slot is marked
 * idle and not at EOF. Returns the header, or NULL when the device is
 * already in error. */
2008 s3_device_seek_file(Device *pself, guint file) {
2009 S3Device *self = S3_DEVICE(pself);
2012 CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
2013 dumpfile_t *amanda_header;
2014 const char *errmsg = NULL;
2017 if (device_in_error(self)) return NULL;
2022 pself->is_eof = FALSE;
2023 pself->in_file = FALSE;
2025 self->next_block_to_read = 0;
2028 key = special_file_to_key(self, "filestart", pself->file);
2029 result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
2034 guint response_code;
2035 s3_error_code_t s3_error_code;
2036 s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
2038 /* if it's an expected error (not found), check what to do. */
2039 if (response_code == 404 &&
2040 (s3_error_code == S3_ERROR_NoSuchKey ||
2041 s3_error_code == S3_ERROR_NoSuchEntity)) {
2043 next_file = find_next_file(self, pself->file);
2044 if (next_file > 0) {
2045 /* Note short-circuit of dispatcher. */
2046 return s3_device_seek_file(pself, next_file);
2047 } else if (next_file == 0) {
2048 /* No next file. Check if we are one past the end. */
2049 key = special_file_to_key(self, "filestart", pself->file - 1);
2050 result = s3_read(self->s3t[0].s3, self->bucket, key,
2051 S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
2054 /* pself->file, etc. are already correct */
2055 return make_tapeend_header();
2057 device_set_error(pself,
2058 stralloc(_("Attempt to read past tape-end file")),
2059 DEVICE_STATUS_SUCCESS);
2064 /* An unexpected error occurred finding out if we are the last file. */
2065 device_set_error(pself,
2067 DEVICE_STATUS_DEVICE_ERROR);
2072 /* and make a dumpfile_t out of it */
2073 g_assert(buf.buffer != NULL);
2074 amanda_header = g_new(dumpfile_t, 1);
2075 fh_init(amanda_header);
2076 parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
2079 switch (amanda_header->type) {
2081 case F_CONT_DUMPFILE:
2082 case F_SPLIT_DUMPFILE:
2086 device_set_error(pself,
2087 stralloc(_("Invalid amanda header while reading file header")),
2088 DEVICE_STATUS_VOLUME_ERROR);
2089 g_free(amanda_header);
2093 pself->in_file = TRUE;
2094 for (thread = 0; thread < self->nb_threads; thread++) {
2095 self->s3t[thread].idle = 1;
2096 self->s3t[thread].eof = FALSE;
2098 return amanda_header;
/* seek_block implementation: returns FALSE when the device is in error;
 * otherwise records the requested block both as the device's current block
 * and as the next block the read-ahead should fetch. */
2102 s3_device_seek_block(Device *pself, guint64 block) {
2103 S3Device * self = S3_DEVICE(pself);
2104 if (device_in_error(pself)) return FALSE;
2107 pself->block = block;
2108 self->next_block_to_read = block;
/* read_block implementation built on read-ahead. Holding thread_idle_mutex,
 * it first queues a read on the read thread pool for slots among the first
 * nb_threads_recovery, each for the key of next_block_to_read (which is
 * incremented per queued read), sizing each slot's buffer to at least
 * *size_req. It then looks for the slot whose filename matches the key of
 * the current block and, depending on that slot's state, reports EOF,
 * reports the slot's error, copies the data out (setting *size_req to the
 * number of bytes), or reports through *size_req that the caller's buffer
 * is too small; while no slot is ready it waits on thread_idle_cond.
 * Returns -1 when the device is already in error.
 * NOTE(review): the slot-state conditions and the other return statements
 * are elided in this listing. */
2113 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
2114 S3Device * self = S3_DEVICE(pself);
2119 g_assert (self != NULL);
2120 if (device_in_error(self)) return -1;
2122 g_mutex_lock(self->thread_idle_mutex);
2123 /* start a read ahead for each thread */
2124 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2125 S3_by_thread *s3t = &self->s3t[thread];
2127 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2128 s3t->filename = key;
2132 s3t->errflags = DEVICE_STATUS_SUCCESS;
2133 if (self->s3t[thread].curl_buffer.buffer &&
2134 (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
2135 g_free(self->s3t[thread].curl_buffer.buffer);
2136 self->s3t[thread].curl_buffer.buffer = NULL;
2137 self->s3t[thread].curl_buffer.buffer_len = 0;
2138 self->s3t[thread].buffer_len = 0;
2140 if (!self->s3t[thread].curl_buffer.buffer) {
2141 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2142 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2143 self->s3t[thread].buffer_len = *size_req;
2145 s3t->curl_buffer.buffer_pos = 0;
2146 s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
2147 self->next_block_to_read++;
2148 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2153 key = file_and_block_to_key(self, pself->file, pself->block);
2154 g_assert(key != NULL);
2156 /* find which thread read the key */
2157 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2159 s3t = &self->s3t[thread];
2162 strcmp(key, (char *)s3t->filename) == 0) {
2166 pself->is_eof = TRUE;
2167 pself->in_file = FALSE;
2168 device_set_error(pself, stralloc(_("EOF")),
2169 DEVICE_STATUS_SUCCESS);
2170 g_mutex_unlock(self->thread_idle_mutex);
2172 } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
2173 /* return the error */
2174 device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
2176 g_mutex_unlock(self->thread_idle_mutex);
2179 } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
2180 /* return the buffer */
2181 g_mutex_unlock(self->thread_idle_mutex);
2182 memcpy(data, s3t->curl_buffer.buffer,
2183 s3t->curl_buffer.buffer_pos);
2184 *size_req = s3t->curl_buffer.buffer_pos;
2187 g_free((char *)s3t->filename);
2190 g_mutex_lock(self->thread_idle_mutex);
2192 } else { /* buffer not large enough */
2193 *size_req = s3t->curl_buffer.buffer_len;
2195 g_mutex_unlock(self->thread_idle_mutex);
2201 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2205 /* start a read ahead for the thread */
2206 for (thread = 0; thread < self->nb_threads_recovery; thread++) {
2207 S3_by_thread *s3t = &self->s3t[thread];
2209 key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
2210 s3t->filename = key;
2214 s3t->errflags = DEVICE_STATUS_SUCCESS;
2215 if (!self->s3t[thread].curl_buffer.buffer) {
2216 self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
2217 self->s3t[thread].curl_buffer.buffer_len = *size_req;
2219 s3t->curl_buffer.buffer_pos = 0;
2220 self->next_block_to_read++;
2221 g_thread_pool_push(self->thread_pool_read, s3t, NULL);
2224 g_mutex_unlock(self->thread_idle_mutex);
/* Thread-pool worker for reads. thread_data is the S3_by_thread slot
 * prepared by s3_device_read_block(); data is the device. Reads the slot's
 * key into the slot's buffer, then under thread_idle_mutex classifies a
 * failure: a 404 with NoSuchKey/NoSuchEntity is the expected "no such block"
 * case, anything else is recorded in the slot as a VOLUME_ERROR with a
 * message. Finally broadcasts thread_idle_cond so the waiting reader
 * re-examines the slots. */
2231 s3_thread_read_block(
2232 gpointer thread_data,
2235 S3_by_thread *s3t = (S3_by_thread *)thread_data;
2236 Device *pself = (Device *)data;
2237 S3Device *self = S3_DEVICE(pself);
2240 result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
2241 s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
2243 g_mutex_lock(self->thread_idle_mutex);
2245 guint response_code;
2246 s3_error_code_t s3_error_code;
2247 s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
2249 /* if it's an expected error (not found), just return -1 */
2250 if (response_code == 404 &&
2251 (s3_error_code == S3_ERROR_NoSuchKey ||
2252 s3_error_code == S3_ERROR_NoSuchEntity)) {
2256 /* otherwise, log it and return FALSE */
2257 s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
2258 s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
2259 s3_strerror(s3t->s3));
2263 g_cond_broadcast(self->thread_idle_cond);
2264 g_mutex_unlock(self->thread_idle_mutex);
/* Hard volume-limit check: only when enforce_volume_limit is set and
 * volume_limit is positive, compares volume_bytes + size against
 * volume_limit. The return statements are elided in this listing; callers
 * treat a true result as "No space left on device". */
2270 check_at_peom(S3Device *self, guint64 size)
2272 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2273 guint64 newtotal = self->volume_bytes + size;
2274 if(newtotal > self->volume_limit) {
/* Early-warning volume-limit check: like check_at_peom(), but adds a safety
 * margin of block_size * (EOM_EARLY_WARNING_ZONE_BLOCKS + nb_threads) to the
 * projected total before comparing it with volume_limit. The return
 * statements are elided in this listing; callers set is_eom on a true
 * result. */
2282 check_at_leom(S3Device *self, guint64 size)
2284 guint64 block_size = DEVICE(self)->block_size;
2285 guint64 eom_warning_buffer = block_size *
2286 (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
2291 if(self->enforce_volume_limit && (self->volume_limit > 0)) {
2292 guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
2293 if(newtotal > self->volume_limit) {
2307 g_mutex_lock(self->thread_idle_mutex);
2308 while(nb_done != self->nb_threads) {
2310 for (thread = 0; thread < self->nb_threads; thread++) {
2311 if (self->s3t[thread].done == 1)
2314 if (nb_done != self->nb_threads) {
2315 g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
2318 g_mutex_unlock(self->thread_idle_mutex);