X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fs3-device.c;fp=device-src%2Fs3-device.c;h=8c545c33f5ddb22d6bf216a1d08008af61a37d08;hb=109540caa4e37a3663b3dcfb9a205b9609e3f561;hp=7a987505c4e81ba70d40c8fb4f70d12aa13b97c3;hpb=4c9eba1feb11adf189bceb4001c425e641f0b56a;p=debian%2Famanda diff --git a/device-src/s3-device.c b/device-src/s3-device.c index 7a98750..8c545c3 100644 --- a/device-src/s3-device.c +++ b/device-src/s3-device.c @@ -1,5 +1,5 @@ /* - * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. + * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved. * * This program is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 as published @@ -65,6 +65,7 @@ static GType s3_device_get_type (void); * Main object structure */ typedef struct _S3MetadataFile S3MetadataFile; +typedef struct _S3Device S3Device; typedef struct _S3_by_thread S3_by_thread; struct _S3_by_thread { @@ -77,9 +78,10 @@ struct _S3_by_thread { char volatile * volatile filename; DeviceStatusFlags volatile errflags; /* device_status */ char volatile * volatile errmsg; /* device error message */ + GMutex *now_mutex; + guint64 dlnow, ulnow; }; -typedef struct _S3Device S3Device; struct _S3Device { Device __parent__; @@ -99,11 +101,17 @@ struct _S3Device { char *swift_account_id; char *swift_access_key; + char *username; + char *password; + char *tenant_id; + char *tenant_name; + char *bucket_location; char *storage_class; char *host; char *service_path; char *server_side_encryption; + char *proxy; char *ca_info; @@ -118,9 +126,12 @@ struct _S3Device { /* Produce verbose output? */ gboolean verbose; + /* create the bucket? */ + gboolean create_bucket; + /* Use SSL? */ gboolean use_ssl; - gboolean openstack_swift_api; + S3_api s3_api; /* Throttling */ guint64 max_send_speed; @@ -131,6 +142,7 @@ struct _S3Device { guint64 volume_limit; gboolean enforce_volume_limit; gboolean use_subdomain; + gboolean use_s3_multi_delete; int nb_threads; int nb_threads_backup; @@ -142,6 +154,17 @@ struct _S3Device { GMutex *thread_idle_mutex; int next_block_to_read; GSList *keys; + + guint64 dltotal; + guint64 ultotal; + + /* google OAUTH2 */ + char *client_id; + char *client_secret; + char *refresh_token; + char *project_id; + + gboolean reuse_connection; }; /* @@ -191,6 +214,16 @@ static DevicePropertyBase device_property_swift_access_key; #define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID) #define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID) +/* Authentication information for Openstack Swift. Both of these are strings. */ +static DevicePropertyBase device_property_username; +static DevicePropertyBase device_property_password; +static DevicePropertyBase device_property_tenant_id; +static DevicePropertyBase device_property_tenant_name; +#define PROPERTY_USERNAME (device_property_username.ID) +#define PROPERTY_PASSWORD (device_property_password.ID) +#define PROPERTY_TENANT_ID (device_property_tenant_id.ID) +#define PROPERTY_TENANT_NAME (device_property_tenant_name.ID) + /* Host and path */ static DevicePropertyBase device_property_s3_host; static DevicePropertyBase device_property_s3_service_path; @@ -209,15 +242,24 @@ static DevicePropertyBase device_property_s3_bucket_location; static DevicePropertyBase device_property_s3_storage_class; #define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID) -/* Storage class */ +/* Server side encryption */ static DevicePropertyBase device_property_s3_server_side_encryption; #define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID) +/* proxy */ +static DevicePropertyBase device_property_proxy; +#define PROPERTY_PROXY (device_property_proxy.ID) + /* Path to certificate authority certificate */ static DevicePropertyBase device_property_ssl_ca_info; #define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID) +/* Which strotage api to use. */ +static DevicePropertyBase device_property_storage_api; +#define PROPERTY_STORAGE_API (device_property_storage_api.ID) + /* Whether to use openstack protocol. */ +/* DEPRECATED */ static DevicePropertyBase device_property_openstack_swift_api; #define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID) @@ -225,6 +267,10 @@ static DevicePropertyBase device_property_openstack_swift_api; static DevicePropertyBase device_property_s3_ssl; #define PROPERTY_S3_SSL (device_property_s3_ssl.ID) +/* Whether to re-use connection. */ +static DevicePropertyBase device_property_reuse_connection; +#define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID) + /* Speed limits for sending and receiving */ static DevicePropertyBase device_property_max_send_speed; static DevicePropertyBase device_property_max_recv_speed; @@ -241,6 +287,30 @@ static DevicePropertyBase device_property_nb_threads_backup; static DevicePropertyBase device_property_nb_threads_recovery; #define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID) +/* If the s3 server have the multi-delete functionality */ +static DevicePropertyBase device_property_s3_multi_delete; +#define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID) + +/* The client_id for OAUTH2 */ +static DevicePropertyBase device_property_client_id; +#define PROPERTY_CLIENT_ID (device_property_client_id.ID) + +/* The client_secret for OAUTH2 */ +static DevicePropertyBase device_property_client_secret; +#define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID) + +/* The refresh token for OAUTH2 */ +static DevicePropertyBase device_property_refresh_token; +#define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID) + +/* The PROJECT ID */ +static DevicePropertyBase device_property_project_id; +#define PROPERTY_PROJECT_ID (device_property_project_id.ID) + +/* The PROJECT ID */ +static DevicePropertyBase device_property_create_bucket; +#define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID) + /* * prototypes */ @@ -365,6 +435,22 @@ static gboolean s3_device_set_swift_access_key_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_username(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_password(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_tenant_id(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_tenant_name(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static gboolean s3_device_set_user_token_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); @@ -381,6 +467,10 @@ static gboolean s3_device_set_server_side_encryption_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_proxy_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static gboolean s3_device_set_ca_info_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); @@ -389,14 +479,30 @@ static gboolean s3_device_set_verbose_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_create_bucket_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_storage_api(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static gboolean s3_device_set_openstack_swift_api_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_s3_multi_delete_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static gboolean s3_device_set_ssl_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_reuse_connection_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static gboolean s3_device_set_max_send_speed_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); @@ -437,6 +543,22 @@ static gboolean s3_device_set_service_path_fn(Device *p_self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_client_id_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_client_secret_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_refresh_token_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_project_id_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + static void s3_thread_read_block(gpointer thread_data, gpointer data); static void s3_thread_write_block(gpointer thread_data, @@ -465,6 +587,12 @@ s3_device_start(Device * self, static gboolean s3_device_finish(Device * self); +static guint64 +s3_device_get_bytes_read(Device * self); + +static guint64 +s3_device_get_bytes_written(Device * self); + static gboolean s3_device_start_file(Device * self, dumpfile_t * jobInfo); @@ -732,19 +860,21 @@ delete_file(S3Device *self, guint64 total_size = 0; Device *d_self = DEVICE(self); char *my_prefix; + if (file == -1) { my_prefix = g_strdup_printf("%sf", self->prefix); } else { my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file); } - result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys, - &total_size); + result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, + &keys, &total_size); if (!result) { device_set_error(d_self, - vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), - DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); - return FALSE; + g_strdup_printf(_("While listing S3 keys: %s"), + s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); + return FALSE; } g_mutex_lock(self->thread_idle_mutex); @@ -793,26 +923,70 @@ s3_thread_delete_block( S3_by_thread *s3t = (S3_by_thread *)thread_data; Device *pself = (Device *)data; S3Device *self = S3_DEVICE(pself); - gboolean result = 1; + int result = 1; char *filename; g_mutex_lock(self->thread_idle_mutex); while (result && self->keys) { - filename = self->keys->data; - self->keys = g_slist_remove(self->keys, self->keys->data); - count++; - if (count >= 1000) { - g_debug("Deleting %s ...", filename); - count = 0; - } - g_mutex_unlock(self->thread_idle_mutex); - result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename); - if (!result) { - s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR; - s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"), + if (self->use_s3_multi_delete) { + char **filenames = g_new(char *, 1001); + char **f = filenames; + int n = 0; + while (self->keys && n<1000) { + *f++ = self->keys->data; + self->keys = g_slist_remove(self->keys, self->keys->data); + n++; + } + *f++ = NULL; + g_mutex_unlock(self->thread_idle_mutex); + result = s3_multi_delete(s3t->s3, (const char *)self->bucket, + (const char **)filenames); + if (result != 1) { + char **f; + + if (result == 2) { + g_debug("Deleting multiple keys not implemented"); + } else { /* result == 0 */ + g_debug("Deleteing multiple keys failed: %s", + s3_strerror(s3t->s3)); + } + + self->use_s3_multi_delete = 0; + /* re-add all filenames */ + f = filenames; + g_mutex_lock(self->thread_idle_mutex); + while(*f) { + self->keys = g_slist_prepend(self->keys, *f++); + } + g_mutex_unlock(self->thread_idle_mutex); + g_free(filenames); + result = 1; + g_mutex_lock(self->thread_idle_mutex); + continue; + } + f = filenames; + while(*f) { + g_free(*f++); + } + g_free(filenames); + } else { + filename = self->keys->data; + self->keys = g_slist_remove(self->keys, self->keys->data); + count++; + if (count >= 1000) { + g_debug("Deleting %s ...", filename); + count = 0; + } + g_mutex_unlock(self->thread_idle_mutex); + result = s3_delete(s3t->s3, (const char *)self->bucket, + (const char *)filename); + if (!result) { + s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"), filename, s3_strerror(s3t->s3)); + } + g_free(filename); } - g_free(filename); g_mutex_lock(self->thread_idle_mutex); } s3t->idle = 1; @@ -880,6 +1054,18 @@ s3_device_register(void) device_property_fill_and_register(&device_property_swift_access_key, G_TYPE_STRING, "swift_access_key", "Access key to authenticate with openstack swift"); + device_property_fill_and_register(&device_property_username, + G_TYPE_STRING, "username", + "Username to authenticate with"); + device_property_fill_and_register(&device_property_password, + G_TYPE_STRING, "password", + "password to authenticate with"); + device_property_fill_and_register(&device_property_tenant_id, + G_TYPE_STRING, "tenant_id", + "tenant_id to authenticate with"); + device_property_fill_and_register(&device_property_tenant_name, + G_TYPE_STRING, "tenant_name", + "tenant_name to authenticate with"); device_property_fill_and_register(&device_property_s3_host, G_TYPE_STRING, "s3_host", "hostname:port of the server"); @@ -898,15 +1084,39 @@ s3_device_register(void) device_property_fill_and_register(&device_property_s3_server_side_encryption, G_TYPE_STRING, "s3_server_side_encryption", "Serve side encryption as specified by Amazon (AES256)"); + device_property_fill_and_register(&device_property_proxy, + G_TYPE_STRING, "proxy", + "The proxy"); device_property_fill_and_register(&device_property_ssl_ca_info, G_TYPE_STRING, "ssl_ca_info", "Path to certificate authority certificate"); + device_property_fill_and_register(&device_property_storage_api, + G_TYPE_STRING, "storage_api", + "Which cloud API to use."); device_property_fill_and_register(&device_property_openstack_swift_api, - G_TYPE_BOOLEAN, "openstack_swift_api", + G_TYPE_STRING, "openstack_swift_api", "Whether to use openstack protocol"); + device_property_fill_and_register(&device_property_client_id, + G_TYPE_STRING, "client_id", + "client_id for use with oauth2"); + device_property_fill_and_register(&device_property_client_secret, + G_TYPE_STRING, "client_secret", + "client_secret for use with oauth2"); + device_property_fill_and_register(&device_property_refresh_token, + G_TYPE_STRING, "refresh_token", + "refresh_token for use with oauth2"); + device_property_fill_and_register(&device_property_project_id, + G_TYPE_STRING, "project_id", + "project id for use with google"); device_property_fill_and_register(&device_property_s3_ssl, G_TYPE_BOOLEAN, "s3_ssl", "Whether to use SSL with Amazon S3"); + device_property_fill_and_register(&device_property_reuse_connection, + G_TYPE_BOOLEAN, "reuse_connection", + "Whether to reuse connection"); + device_property_fill_and_register(&device_property_create_bucket, + G_TYPE_BOOLEAN, "create_bucket", + "Whether to create/delete bucket"); device_property_fill_and_register(&device_property_s3_subdomain, G_TYPE_BOOLEAN, "s3_subdomain", "Whether to use subdomain"); @@ -922,6 +1132,9 @@ s3_device_register(void) device_property_fill_and_register(&device_property_nb_threads_recovery, G_TYPE_UINT64, "nb_threads_recovery", "Number of reader thread"); + device_property_fill_and_register(&device_property_s3_multi_delete, + G_TYPE_BOOLEAN, "s3_multi_delete", + "Whether to use multi-delete"); /* register the device itself */ register_device(s3_device_factory, device_prefix_list); @@ -959,6 +1172,7 @@ s3_device_init(S3Device * self) Device * dself = DEVICE(self); GValue response; + self->s3_api = S3_API_S3; self->volume_bytes = 0; self->volume_limit = 0; self->leom = TRUE; @@ -972,6 +1186,7 @@ s3_device_init(S3Device * self) self->thread_pool_read = NULL; self->thread_idle_cond = NULL; self->thread_idle_mutex = NULL; + self->use_s3_multi_delete = 1; /* Register property values * Note: Some aren't added until s3_device_open_device() @@ -1052,6 +1267,8 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_class->read_label = s3_device_read_label; device_class->start = s3_device_start; device_class->finish = s3_device_finish; + device_class->get_bytes_read = s3_device_get_bytes_read; + device_class->get_bytes_written = s3_device_get_bytes_written; device_class->start_file = s3_device_start_file; device_class->write_block = s3_device_write_block; @@ -1086,6 +1303,26 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_simple_property_get_fn, s3_device_set_swift_access_key_fn); + device_class_register_property(device_class, PROPERTY_USERNAME, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_username); + + device_class_register_property(device_class, PROPERTY_PASSWORD, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_password); + + device_class_register_property(device_class, PROPERTY_TENANT_ID, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_tenant_id); + + device_class_register_property(device_class, PROPERTY_TENANT_NAME, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_tenant_name); + device_class_register_property(device_class, PROPERTY_S3_HOST, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, @@ -1116,6 +1353,11 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_simple_property_get_fn, s3_device_set_server_side_encryption_fn); + device_class_register_property(device_class, PROPERTY_PROXY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_proxy_fn); + device_class_register_property(device_class, PROPERTY_SSL_CA_INFO, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, @@ -1126,16 +1368,36 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_simple_property_get_fn, s3_device_set_verbose_fn); + device_class_register_property(device_class, PROPERTY_CREATE_BUCKET, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_create_bucket_fn); + + device_class_register_property(device_class, PROPERTY_STORAGE_API, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_storage_api); + device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, s3_device_set_openstack_swift_api_fn); + device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_s3_multi_delete_fn); + device_class_register_property(device_class, PROPERTY_S3_SSL, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, s3_device_set_ssl_fn); + device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_reuse_connection_fn); + device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, @@ -1179,10 +1441,29 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) s3_device_set_enforce_max_volume_usage_fn); device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN, - (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & - (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, s3_device_set_use_subdomain_fn); + + device_class_register_property(device_class, PROPERTY_CLIENT_ID, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_client_id_fn); + + device_class_register_property(device_class, PROPERTY_CLIENT_SECRET, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_client_secret_fn); + + device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_refresh_token_fn); + + device_class_register_property(device_class, PROPERTY_PROJECT_ID, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_project_id_fn); } static gboolean @@ -1237,6 +1518,58 @@ s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base, return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_username(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->username); + self->username = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_password(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->password); + self->password = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->tenant_id); + self->tenant_id = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->tenant_name); + self->tenant_name = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static gboolean s3_device_set_host_fn(Device *p_self, DevicePropertyBase *base, GValue *val, @@ -1340,6 +1673,20 @@ s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + char *str_val = g_value_dup_string(val); + + amfree(self->proxy); + self->proxy = str_val; + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static gboolean s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source) @@ -1373,13 +1720,73 @@ s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base, return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + int thread; + + self->create_bucket = g_value_get_boolean(val); + /* Our S3 handle may not yet have been instantiated; if so, it will + * get the proper verbose setting when it is created */ + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3) + s3_verbose(self->s3t[thread].s3, self->verbose); + } + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + const char *storage_api = g_value_get_string(val); + if (g_str_equal(storage_api, "S3")) { + self->s3_api = S3_API_S3; + } else if (g_str_equal(storage_api, "SWIFT-1.0")) { + self->s3_api = S3_API_SWIFT_1; + } else if (g_str_equal(storage_api, "SWIFT-2.0")) { + self->s3_api = S3_API_SWIFT_2; + } else if (g_str_equal(storage_api, "OAUTH2")) { + self->s3_api = S3_API_OAUTH2; + } else { + g_debug("Invalid STORAGE_API, using \"S3\"."); + self->s3_api = S3_API_S3; + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static gboolean s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source) { + + const gboolean openstack_swift_api = g_value_get_boolean(val); + if (openstack_swift_api) { + GValue storage_api_val; + g_value_init(&storage_api_val, G_TYPE_STRING); + g_value_set_static_string(&storage_api_val, "SWIFT-1.0"); + return s3_device_set_storage_api(p_self, base, &storage_api_val, + surety, source); + } + return TRUE; +} + +static gboolean +s3_device_set_s3_multi_delete_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ S3Device *self = S3_DEVICE(p_self); - self->openstack_swift_api = g_value_get_boolean(val); + self->use_s3_multi_delete = g_value_get_boolean(val); return device_simple_property_set_fn(p_self, base, val, surety, source); } @@ -1411,6 +1818,17 @@ s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base, return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->reuse_connection = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static gboolean s3_device_set_max_send_speed_fn(Device *p_self, DevicePropertyBase *base, GValue *val, @@ -1545,6 +1963,59 @@ property_set_leom_fn(Device *p_self, return device_simple_property_set_fn(p_self, base, val, surety, source); } + +static gboolean +s3_device_set_client_id_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->client_id); + self->client_id = g_value_dup_string(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_client_secret_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->client_secret); + self->client_secret = g_value_dup_string(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_refresh_token_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->refresh_token); + self->refresh_token = g_value_dup_string(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_project_id_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->project_id); + self->project_id = g_value_dup_string(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static Device* s3_device_factory(char * device_name, char * device_type, char * device_node) { @@ -1596,7 +2067,7 @@ s3_device_open_device(Device *pself, char *device_name, /* default values */ self->verbose = FALSE; - self->openstack_swift_api = FALSE; + self->s3_api = S3_API_S3; /* use SSL if available */ self->use_ssl = s3_curl_supports_ssl(); @@ -1606,6 +2077,22 @@ s3_device_open_device(Device *pself, char *device_name, device_set_simple_property(pself, device_property_s3_ssl.ID, &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT); + /* reuse connection */ + self->reuse_connection = TRUE; + bzero(&tmp_value, sizeof(GValue)); + g_value_init(&tmp_value, G_TYPE_BOOLEAN); + g_value_set_boolean(&tmp_value, self->reuse_connection); + device_set_simple_property(pself, device_property_reuse_connection.ID, + &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT); + + /* Set default create_bucket */ + self->create_bucket = TRUE; + bzero(&tmp_value, sizeof(GValue)); + g_value_init(&tmp_value, G_TYPE_BOOLEAN); + g_value_set_boolean(&tmp_value, self->create_bucket); + device_set_simple_property(pself, device_property_create_bucket.ID, + &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT); + if (parent_class->open_device) { parent_class->open_device(pself, device_name, device_type, device_node); } @@ -1640,7 +2127,9 @@ static void s3_device_finalize(GObject * obj_self) { } if (self->s3t) { for (thread = 0; thread < self->nb_threads; thread++) { + g_mutex_free(self->s3t[thread].now_mutex); if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3); + g_free(self->s3t[thread].curl_buffer.buffer); } g_free(self->s3t); } @@ -1650,12 +2139,17 @@ static void s3_device_finalize(GObject * obj_self) { if(self->secret_key) g_free(self->secret_key); if(self->swift_account_id) g_free(self->swift_account_id); if(self->swift_access_key) g_free(self->swift_access_key); + if(self->username) g_free(self->username); + if(self->password) g_free(self->password); + if(self->tenant_id) g_free(self->tenant_id); + if(self->tenant_name) g_free(self->tenant_name); if(self->host) g_free(self->host); if(self->service_path) g_free(self->service_path); if(self->user_token) g_free(self->user_token); if(self->bucket_location) g_free(self->bucket_location); if(self->storage_class) g_free(self->storage_class); if(self->server_side_encryption) g_free(self->server_side_encryption); + if(self->proxy) g_free(self->proxy); if(self->ca_info) g_free(self->ca_info); } @@ -1667,15 +2161,7 @@ static gboolean setup_handle(S3Device * self) { CURLcode curl_code; if (self->s3t == NULL) { - self->s3t = g_new(S3_by_thread, self->nb_threads); - if (self->s3t == NULL) { - device_set_error(d_self, - stralloc(_("Can't allocate S3Handle array")), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; - } - - if (!self->openstack_swift_api) { + if (self->s3_api == S3_API_S3) { if (self->access_key == NULL || self->access_key[0] == '\0') { device_set_error(d_self, g_strdup(_("No Amazon access key specified")), @@ -1689,7 +2175,7 @@ static gboolean setup_handle(S3Device * self) { DEVICE_STATUS_DEVICE_ERROR); return FALSE; } - } else { + } else if (self->s3_api == S3_API_SWIFT_1) { if (self->swift_account_id == NULL || self->swift_account_id[0] == '\0') { device_set_error(d_self, @@ -1704,10 +2190,53 @@ static gboolean setup_handle(S3Device * self) { DEVICE_STATUS_DEVICE_ERROR); return FALSE; } + } else if (self->s3_api == S3_API_SWIFT_2) { + if (!((self->username && self->password && self->tenant_id) || + (self->username && self->password && self->tenant_name) || + (self->access_key && self->secret_key && self->tenant_id) || + (self->access_key && self->secret_key && self->tenant_name))) { + device_set_error(d_self, + g_strdup(_("Missing authorization properties")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } else if (self->s3_api == S3_API_OAUTH2) { + if (self->client_id == NULL || + self->client_id[0] == '\0') { + device_set_error(d_self, + g_strdup(_("Missing client_id properties")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + if (self->client_secret == NULL || + self->client_secret[0] == '\0') { + device_set_error(d_self, + g_strdup(_("Missing client_secret properties")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + if (self->refresh_token == NULL || + self->refresh_token[0] == '\0') { + device_set_error(d_self, + g_strdup(_("Missing refresh_token properties")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + if (self->project_id == NULL || + self->project_id[0] == '\0') { + device_set_error(d_self, + g_strdup(_("Missing project_id properties")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } } - if (!self->use_ssl && self->ca_info) { - amfree(self->ca_info); + self->s3t = g_new0(S3_by_thread, self->nb_threads); + if (self->s3t == NULL) { + device_set_error(d_self, + g_strdup(_("Can't allocate S3Handle array")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; } self->thread_idle_cond = g_cond_new(); @@ -1722,6 +2251,7 @@ static gboolean setup_handle(S3Device * self) { self->s3t[thread].filename = NULL; self->s3t[thread].curl_buffer.buffer = NULL; self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].now_mutex = g_mutex_new(); self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key, self->swift_account_id, self->swift_access_key, @@ -1730,24 +2260,22 @@ static gboolean setup_handle(S3Device * self) { self->user_token, self->bucket_location, self->storage_class, self->ca_info, self->server_side_encryption, - self->openstack_swift_api); + self->proxy, + self->s3_api, + self->username, + self->password, + self->tenant_id, + self->tenant_name, + self->client_id, + self->client_secret, + self->refresh_token, + self->reuse_connection); if (self->s3t[thread].s3 == NULL) { device_set_error(d_self, stralloc(_("Internal error creating S3 handle")), DEVICE_STATUS_DEVICE_ERROR); self->nb_threads = thread+1; return FALSE; - } else if (self->openstack_swift_api) { - s3_error(self->s3t[0].s3, NULL, &response_code, - &s3_error_code, NULL, &curl_code, NULL); - if (response_code != 200) { - device_set_error(d_self, - g_strdup_printf(_("Internal error creating S3 handle: %s"), - s3_strerror(self->s3t[0].s3)), - DEVICE_STATUS_DEVICE_ERROR); - self->nb_threads = thread+1; - return FALSE; - } } } @@ -1759,33 +2287,56 @@ static gboolean setup_handle(S3Device * self) { self->nb_threads, 0, NULL); self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self, self->nb_threads, 0, NULL); - } - for (thread = 0; thread < self->nb_threads; thread++) { - s3_verbose(self->s3t[thread].s3, self->verbose); + for (thread = 0; thread < self->nb_threads; thread++) { + s3_verbose(self->s3t[thread].s3, self->verbose); - if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) { - device_set_error(d_self, g_strdup_printf(_( - "Error setting S3 SSL/TLS use " - "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; - } + if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) { + device_set_error(d_self, g_strdup_printf(_( + "Error setting S3 SSL/TLS use " + "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } - if (self->max_send_speed && - !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) { - device_set_error(d_self, - g_strdup("Could not set S3 maximum send speed"), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + if (self->max_send_speed && + !s3_set_max_send_speed(self->s3t[thread].s3, + self->max_send_speed)) { + device_set_error(d_self, + g_strdup("Could not set S3 maximum send speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (self->max_recv_speed && + !s3_set_max_recv_speed(self->s3t[thread].s3, + self->max_recv_speed)) { + device_set_error(d_self, + g_strdup("Could not set S3 maximum recv speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } } - if (self->max_recv_speed && - !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) { - device_set_error(d_self, - g_strdup("Could not set S3 maximum recv speed"), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + for (thread = 0; thread < self->nb_threads; thread++) { + if (!s3_open2(self->s3t[thread].s3)) { + if (self->s3_api == S3_API_SWIFT_1 || + self->s3_api == S3_API_SWIFT_2) { + s3_error(self->s3t[0].s3, NULL, &response_code, + &s3_error_code, NULL, &curl_code, NULL); + device_set_error(d_self, + g_strdup_printf(_("s3_open2 failed: %s"), + s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR); + self->nb_threads = thread+1; + return FALSE; + } else { + device_set_error(d_self, + g_strdup("s3_open2 failed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } } } @@ -1801,7 +2352,7 @@ make_bucket( s3_error_code_t s3_error_code; CURLcode curl_code; - if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) { + if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) { return TRUE; } @@ -1817,7 +2368,15 @@ make_bucket( return FALSE; } - if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) { + if (!self->create_bucket) { + device_set_error(pself, + g_strdup_printf(_("Can't list bucket: %s"), + s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) { s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it isn't an expected error (bucket already exists), @@ -1834,6 +2393,23 @@ make_bucket( return TRUE; } +static int progress_func( + void *thread_data, + double dltotal G_GNUC_UNUSED, + double dlnow, + double ultotal G_GNUC_UNUSED, + double ulnow) +{ + S3_by_thread *s3t = (S3_by_thread *)thread_data; + + g_mutex_lock(s3t->now_mutex); + s3t->dlnow = dlnow; + s3t->ulnow = ulnow; + g_mutex_unlock(s3t->now_mutex); + + return 0; +} + static DeviceStatusFlags s3_device_read_label(Device *pself) { S3Device *self = S3_DEVICE(pself); @@ -1936,7 +2512,9 @@ s3_device_start (Device * pself, DeviceAccessMode mode, reset_thread(self); pself->access_mode = mode; + g_mutex_lock(pself->device_mutex); pself->in_file = FALSE; + g_mutex_unlock(pself->device_mutex); /* try creating the bucket, in case it doesn't exist */ if (!make_bucket(pself)) { @@ -1953,7 +2531,9 @@ s3_device_start (Device * pself, DeviceAccessMode mode, break; case ACCESS_WRITE: - delete_all_files(self); + if (!delete_all_files(self)) { + return FALSE; + } /* write a new amanda header */ if (!write_amanda_header(self, label, timestamp)) { @@ -2011,6 +2591,54 @@ s3_device_finish ( /* functions for writing */ +static guint64 +s3_device_get_bytes_read( + Device * pself) +{ + S3Device *self = S3_DEVICE(pself); + int thread; + guint64 dltotal; + + g_mutex_unlock(pself->device_mutex); + /* Add per thread */ + g_mutex_lock(self->thread_idle_mutex); + dltotal = self->dltotal; + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + g_mutex_lock(self->s3t[thread].now_mutex); + dltotal += self->s3t[thread].dlnow; + g_mutex_unlock(self->s3t[thread].now_mutex); + } + g_mutex_unlock(self->thread_idle_mutex); + g_mutex_lock(pself->device_mutex); + + return dltotal; +} + + +static guint64 +s3_device_get_bytes_written( + Device * pself) +{ + S3Device *self = S3_DEVICE(pself); + int thread; + guint64 ultotal; + + g_mutex_unlock(pself->device_mutex); + /* Add per thread */ + g_mutex_lock(self->thread_idle_mutex); + ultotal = self->ultotal; + for (thread = 0; thread < self->nb_threads_backup; thread++) { + g_mutex_lock(self->s3t[thread].now_mutex); + ultotal += self->s3t[thread].ulnow; + g_mutex_unlock(self->s3t[thread].now_mutex); + } + g_mutex_unlock(self->thread_idle_mutex); + g_mutex_lock(pself->device_mutex); + + return ultotal; +} + + static gboolean s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { S3Device *self = S3_DEVICE(pself); @@ -2052,10 +2680,22 @@ s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { g_free(amanda_header.buffer); return FALSE; } + + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + self->s3t[thread].ulnow = 0; + } + /* set the file and block numbers correctly */ pself->file = (pself->file > 0)? pself->file+1 : 1; pself->block = 0; + g_mutex_lock(pself->device_mutex); pself->in_file = TRUE; + pself->bytes_written = 0; + g_mutex_unlock(pself->device_mutex); + g_mutex_lock(self->thread_idle_mutex); + self->ultotal = 0; + g_mutex_unlock(self->thread_idle_mutex); /* write it out as a special block (not the 0th) */ key = special_file_to_key(self, "filestart", pself->file); result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS, @@ -2070,9 +2710,6 @@ s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { } self->volume_bytes += header_size; - for (thread = 0; thread < self->nb_threads; thread++) { - self->s3t[thread].idle = 1; - } return TRUE; } @@ -2167,7 +2804,8 @@ s3_thread_write_block( gboolean result; result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename, - S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); + S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, + progress_func, s3t); g_free((void *)s3t->filename); s3t->filename = NULL; if (!result) { @@ -2177,7 +2815,10 @@ s3_thread_write_block( g_mutex_lock(self->thread_idle_mutex); s3t->idle = 1; s3t->done = 1; + if (result) + self->ultotal += s3t->curl_buffer.buffer_len; s3t->curl_buffer.buffer_len = s3t->buffer_len; + s3t->ulnow = 0; g_cond_broadcast(self->thread_idle_cond); g_mutex_unlock(self->thread_idle_mutex); } @@ -2209,12 +2850,16 @@ s3_device_finish_file (Device * pself) { g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); } } + self->ultotal = 0; g_mutex_unlock(self->thread_idle_mutex); if (device_in_error(pself)) return FALSE; /* we're not in a file anymore */ + g_mutex_lock(pself->device_mutex); pself->in_file = FALSE; + pself->bytes_written = 0;; + g_mutex_unlock(pself->device_mutex); return TRUE; } @@ -2264,7 +2909,8 @@ s3_device_erase(Device *pself) { device_set_error(pself, g_strdup("Unlabeled volume"), DEVICE_STATUS_VOLUME_UNLABELED); - if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) { + if (self->create_bucket && + !s3_delete_bucket(self->s3t[0].s3, self->bucket)) { s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* @@ -2303,9 +2949,15 @@ s3_device_seek_file(Device *pself, guint file) { pself->file = file; pself->is_eof = FALSE; - pself->in_file = FALSE; pself->block = 0; + g_mutex_lock(pself->device_mutex); + pself->in_file = FALSE; + pself->bytes_read = 0; + g_mutex_unlock(pself->device_mutex); self->next_block_to_read = 0; + g_mutex_lock(self->thread_idle_mutex); + self->dltotal = 0; + g_mutex_unlock(self->thread_idle_mutex); /* read it in */ key = special_file_to_key(self, "filestart", pself->file); @@ -2374,11 +3026,14 @@ s3_device_seek_file(Device *pself, guint file) { return NULL; } - pself->in_file = TRUE; for (thread = 0; thread < self->nb_threads; thread++) { self->s3t[thread].idle = 1; self->s3t[thread].eof = FALSE; + self->s3t[thread].ulnow = 0; } + g_mutex_lock(pself->device_mutex); + pself->in_file = TRUE; + g_mutex_unlock(pself->device_mutex); return amanda_header; } @@ -2413,6 +3068,8 @@ s3_device_read_block (Device * pself, gpointer data, int *size_req) { s3t->done = 0; s3t->idle = 0; s3t->eof = FALSE; + s3t->dlnow = 0; + s3t->ulnow = 0; s3t->errflags = DEVICE_STATUS_SUCCESS; if (self->s3t[thread].curl_buffer.buffer && (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) { @@ -2448,7 +3105,9 @@ s3_device_read_block (Device * pself, gpointer data, int *size_req) { /* return eof */ g_free(key); pself->is_eof = TRUE; + g_mutex_lock(pself->device_mutex); pself->in_file = FALSE; + g_mutex_unlock(pself->device_mutex); device_set_error(pself, stralloc(_("EOF")), DEVICE_STATUS_SUCCESS); g_mutex_unlock(self->thread_idle_mutex); @@ -2495,6 +3154,8 @@ s3_device_read_block (Device * pself, gpointer data, int *size_req) { s3t->done = 0; s3t->idle = 0; s3t->eof = FALSE; + s3t->dlnow = 0; + s3t->ulnow = 0; s3t->errflags = DEVICE_STATUS_SUCCESS; if (!self->s3t[thread].curl_buffer.buffer) { self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req); @@ -2521,8 +3182,9 @@ s3_thread_read_block( S3Device *self = S3_DEVICE(pself); gboolean result; - result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func, - s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); + result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, + S3_BUFFER_WRITE_FUNCS, + (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t); g_mutex_lock(self->thread_idle_mutex); if (!result) { @@ -2543,7 +3205,11 @@ s3_thread_read_block( s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"), s3_strerror(s3t->s3)); } + } else { + self->dltotal += s3t->curl_buffer.buffer_len; } + s3t->dlnow = 0; + s3t->ulnow = 0; s3t->done = 1; g_cond_broadcast(self->thread_idle_cond); g_mutex_unlock(self->thread_idle_mutex); @@ -2589,16 +3255,18 @@ reset_thread( int thread; int nb_done = 0; - g_mutex_lock(self->thread_idle_mutex); - while(nb_done != self->nb_threads) { - nb_done = 0; - for (thread = 0; thread < self->nb_threads; thread++) { - if (self->s3t[thread].done == 1) - nb_done++; - } - if (nb_done != self->nb_threads) { - g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + if (self->thread_idle_mutex) { + g_mutex_lock(self->thread_idle_mutex); + while(nb_done != self->nb_threads) { + nb_done = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].done == 1) + nb_done++; + } + if (nb_done != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } } + g_mutex_unlock(self->thread_idle_mutex); } - g_mutex_unlock(self->thread_idle_mutex); }