/*
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 as published
* Main object structure
*/
typedef struct _S3MetadataFile S3MetadataFile;
+typedef struct _S3Device S3Device;
typedef struct _S3_by_thread S3_by_thread;
struct _S3_by_thread {
char volatile * volatile filename;
DeviceStatusFlags volatile errflags; /* device_status */
char volatile * volatile errmsg; /* device error message */
+ GMutex *now_mutex;
+ guint64 dlnow, ulnow;
};
-typedef struct _S3Device S3Device;
struct _S3Device {
Device __parent__;
char *swift_account_id;
char *swift_access_key;
+ char *username;
+ char *password;
+ char *tenant_id;
+ char *tenant_name;
+
char *bucket_location;
char *storage_class;
char *host;
char *service_path;
char *server_side_encryption;
+ char *proxy;
char *ca_info;
/* Produce verbose output? */
gboolean verbose;
+ /* create the bucket? */
+ gboolean create_bucket;
+
/* Use SSL? */
gboolean use_ssl;
- gboolean openstack_swift_api;
+ S3_api s3_api;
/* Throttling */
guint64 max_send_speed;
guint64 volume_limit;
gboolean enforce_volume_limit;
gboolean use_subdomain;
+ gboolean use_s3_multi_delete;
int nb_threads;
int nb_threads_backup;
GMutex *thread_idle_mutex;
int next_block_to_read;
GSList *keys;
+
+ guint64 dltotal;
+ guint64 ultotal;
+
+ /* google OAUTH2 */
+ char *client_id;
+ char *client_secret;
+ char *refresh_token;
+ char *project_id;
+
+ gboolean reuse_connection;
};
/*
#define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
#define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
+/* Authentication information for Openstack Swift. Both of these are strings. */
+static DevicePropertyBase device_property_username;
+static DevicePropertyBase device_property_password;
+static DevicePropertyBase device_property_tenant_id;
+static DevicePropertyBase device_property_tenant_name;
+#define PROPERTY_USERNAME (device_property_username.ID)
+#define PROPERTY_PASSWORD (device_property_password.ID)
+#define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
+#define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
+
/* Host and path */
static DevicePropertyBase device_property_s3_host;
static DevicePropertyBase device_property_s3_service_path;
static DevicePropertyBase device_property_s3_storage_class;
#define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
-/* Storage class */
+/* Server side encryption */
static DevicePropertyBase device_property_s3_server_side_encryption;
#define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
+/* proxy */
+static DevicePropertyBase device_property_proxy;
+#define PROPERTY_PROXY (device_property_proxy.ID)
+
/* Path to certificate authority certificate */
static DevicePropertyBase device_property_ssl_ca_info;
#define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
+/* Which strotage api to use. */
+static DevicePropertyBase device_property_storage_api;
+#define PROPERTY_STORAGE_API (device_property_storage_api.ID)
+
/* Whether to use openstack protocol. */
+/* DEPRECATED */
static DevicePropertyBase device_property_openstack_swift_api;
#define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
static DevicePropertyBase device_property_s3_ssl;
#define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
+/* Whether to re-use connection. */
+static DevicePropertyBase device_property_reuse_connection;
+#define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID)
+
/* Speed limits for sending and receiving */
static DevicePropertyBase device_property_max_send_speed;
static DevicePropertyBase device_property_max_recv_speed;
static DevicePropertyBase device_property_nb_threads_recovery;
#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
+/* If the s3 server have the multi-delete functionality */
+static DevicePropertyBase device_property_s3_multi_delete;
+#define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
+
+/* The client_id for OAUTH2 */
+static DevicePropertyBase device_property_client_id;
+#define PROPERTY_CLIENT_ID (device_property_client_id.ID)
+
+/* The client_secret for OAUTH2 */
+static DevicePropertyBase device_property_client_secret;
+#define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
+
+/* The refresh token for OAUTH2 */
+static DevicePropertyBase device_property_refresh_token;
+#define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
+
+/* The PROJECT ID */
+static DevicePropertyBase device_property_project_id;
+#define PROPERTY_PROJECT_ID (device_property_project_id.ID)
+
+/* The PROJECT ID */
+static DevicePropertyBase device_property_create_bucket;
+#define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
+
/*
* prototypes
*/
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_username(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_password(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_tenant_id(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_tenant_name(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_user_token_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_proxy_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_ca_info_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_create_bucket_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_storage_api(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_ssl_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_reuse_connection_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_max_send_speed_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_client_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_client_secret_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_refresh_token_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_project_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static void s3_thread_read_block(gpointer thread_data,
gpointer data);
static void s3_thread_write_block(gpointer thread_data,
static gboolean
s3_device_finish(Device * self);
+static guint64
+s3_device_get_bytes_read(Device * self);
+
+static guint64
+s3_device_get_bytes_written(Device * self);
+
static gboolean
s3_device_start_file(Device * self,
dumpfile_t * jobInfo);
guint64 total_size = 0;
Device *d_self = DEVICE(self);
char *my_prefix;
+
if (file == -1) {
my_prefix = g_strdup_printf("%sf", self->prefix);
} else {
my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
}
- result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
- &total_size);
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL,
+ &keys, &total_size);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
- DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
- return FALSE;
+ g_strdup_printf(_("While listing S3 keys: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
+ return FALSE;
}
g_mutex_lock(self->thread_idle_mutex);
S3_by_thread *s3t = (S3_by_thread *)thread_data;
Device *pself = (Device *)data;
S3Device *self = S3_DEVICE(pself);
- gboolean result = 1;
+ int result = 1;
char *filename;
g_mutex_lock(self->thread_idle_mutex);
while (result && self->keys) {
- filename = self->keys->data;
- self->keys = g_slist_remove(self->keys, self->keys->data);
- count++;
- if (count >= 1000) {
- g_debug("Deleting %s ...", filename);
- count = 0;
- }
- g_mutex_unlock(self->thread_idle_mutex);
- result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename);
- if (!result) {
- s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
- s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
+ if (self->use_s3_multi_delete) {
+ char **filenames = g_new(char *, 1001);
+ char **f = filenames;
+ int n = 0;
+ while (self->keys && n<1000) {
+ *f++ = self->keys->data;
+ self->keys = g_slist_remove(self->keys, self->keys->data);
+ n++;
+ }
+ *f++ = NULL;
+ g_mutex_unlock(self->thread_idle_mutex);
+ result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
+ (const char **)filenames);
+ if (result != 1) {
+ char **f;
+
+ if (result == 2) {
+ g_debug("Deleting multiple keys not implemented");
+ } else { /* result == 0 */
+ g_debug("Deleteing multiple keys failed: %s",
+ s3_strerror(s3t->s3));
+ }
+
+ self->use_s3_multi_delete = 0;
+ /* re-add all filenames */
+ f = filenames;
+ g_mutex_lock(self->thread_idle_mutex);
+ while(*f) {
+ self->keys = g_slist_prepend(self->keys, *f++);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_free(filenames);
+ result = 1;
+ g_mutex_lock(self->thread_idle_mutex);
+ continue;
+ }
+ f = filenames;
+ while(*f) {
+ g_free(*f++);
+ }
+ g_free(filenames);
+ } else {
+ filename = self->keys->data;
+ self->keys = g_slist_remove(self->keys, self->keys->data);
+ count++;
+ if (count >= 1000) {
+ g_debug("Deleting %s ...", filename);
+ count = 0;
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ result = s3_delete(s3t->s3, (const char *)self->bucket,
+ (const char *)filename);
+ if (!result) {
+ s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
+ s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
filename, s3_strerror(s3t->s3));
+ }
+ g_free(filename);
}
- g_free(filename);
g_mutex_lock(self->thread_idle_mutex);
}
s3t->idle = 1;
device_property_fill_and_register(&device_property_swift_access_key,
G_TYPE_STRING, "swift_access_key",
"Access key to authenticate with openstack swift");
+ device_property_fill_and_register(&device_property_username,
+ G_TYPE_STRING, "username",
+ "Username to authenticate with");
+ device_property_fill_and_register(&device_property_password,
+ G_TYPE_STRING, "password",
+ "password to authenticate with");
+ device_property_fill_and_register(&device_property_tenant_id,
+ G_TYPE_STRING, "tenant_id",
+ "tenant_id to authenticate with");
+ device_property_fill_and_register(&device_property_tenant_name,
+ G_TYPE_STRING, "tenant_name",
+ "tenant_name to authenticate with");
device_property_fill_and_register(&device_property_s3_host,
G_TYPE_STRING, "s3_host",
"hostname:port of the server");
device_property_fill_and_register(&device_property_s3_server_side_encryption,
G_TYPE_STRING, "s3_server_side_encryption",
"Serve side encryption as specified by Amazon (AES256)");
+ device_property_fill_and_register(&device_property_proxy,
+ G_TYPE_STRING, "proxy",
+ "The proxy");
device_property_fill_and_register(&device_property_ssl_ca_info,
G_TYPE_STRING, "ssl_ca_info",
"Path to certificate authority certificate");
+ device_property_fill_and_register(&device_property_storage_api,
+ G_TYPE_STRING, "storage_api",
+ "Which cloud API to use.");
device_property_fill_and_register(&device_property_openstack_swift_api,
- G_TYPE_BOOLEAN, "openstack_swift_api",
+ G_TYPE_STRING, "openstack_swift_api",
"Whether to use openstack protocol");
+ device_property_fill_and_register(&device_property_client_id,
+ G_TYPE_STRING, "client_id",
+ "client_id for use with oauth2");
+ device_property_fill_and_register(&device_property_client_secret,
+ G_TYPE_STRING, "client_secret",
+ "client_secret for use with oauth2");
+ device_property_fill_and_register(&device_property_refresh_token,
+ G_TYPE_STRING, "refresh_token",
+ "refresh_token for use with oauth2");
+ device_property_fill_and_register(&device_property_project_id,
+ G_TYPE_STRING, "project_id",
+ "project id for use with google");
device_property_fill_and_register(&device_property_s3_ssl,
G_TYPE_BOOLEAN, "s3_ssl",
"Whether to use SSL with Amazon S3");
+ device_property_fill_and_register(&device_property_reuse_connection,
+ G_TYPE_BOOLEAN, "reuse_connection",
+ "Whether to reuse connection");
+ device_property_fill_and_register(&device_property_create_bucket,
+ G_TYPE_BOOLEAN, "create_bucket",
+ "Whether to create/delete bucket");
device_property_fill_and_register(&device_property_s3_subdomain,
G_TYPE_BOOLEAN, "s3_subdomain",
"Whether to use subdomain");
device_property_fill_and_register(&device_property_nb_threads_recovery,
G_TYPE_UINT64, "nb_threads_recovery",
"Number of reader thread");
+ device_property_fill_and_register(&device_property_s3_multi_delete,
+ G_TYPE_BOOLEAN, "s3_multi_delete",
+ "Whether to use multi-delete");
/* register the device itself */
register_device(s3_device_factory, device_prefix_list);
Device * dself = DEVICE(self);
GValue response;
+ self->s3_api = S3_API_S3;
self->volume_bytes = 0;
self->volume_limit = 0;
self->leom = TRUE;
self->thread_pool_read = NULL;
self->thread_idle_cond = NULL;
self->thread_idle_mutex = NULL;
+ self->use_s3_multi_delete = 1;
/* Register property values
* Note: Some aren't added until s3_device_open_device()
device_class->read_label = s3_device_read_label;
device_class->start = s3_device_start;
device_class->finish = s3_device_finish;
+ device_class->get_bytes_read = s3_device_get_bytes_read;
+ device_class->get_bytes_written = s3_device_get_bytes_written;
device_class->start_file = s3_device_start_file;
device_class->write_block = s3_device_write_block;
device_simple_property_get_fn,
s3_device_set_swift_access_key_fn);
+ device_class_register_property(device_class, PROPERTY_USERNAME,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_username);
+
+ device_class_register_property(device_class, PROPERTY_PASSWORD,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_password);
+
+ device_class_register_property(device_class, PROPERTY_TENANT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_tenant_id);
+
+ device_class_register_property(device_class, PROPERTY_TENANT_NAME,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_tenant_name);
+
device_class_register_property(device_class, PROPERTY_S3_HOST,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
device_simple_property_get_fn,
s3_device_set_server_side_encryption_fn);
+ device_class_register_property(device_class, PROPERTY_PROXY,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_proxy_fn);
+
device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
device_simple_property_get_fn,
s3_device_set_verbose_fn);
+ device_class_register_property(device_class, PROPERTY_CREATE_BUCKET,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_create_bucket_fn);
+
+ device_class_register_property(device_class, PROPERTY_STORAGE_API,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_storage_api);
+
device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_openstack_swift_api_fn);
+ device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_s3_multi_delete_fn);
+
device_class_register_property(device_class, PROPERTY_S3_SSL,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_ssl_fn);
+ device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_reuse_connection_fn);
+
device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_enforce_max_volume_usage_fn);
device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
- (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
- (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_use_subdomain_fn);
+
+ device_class_register_property(device_class, PROPERTY_CLIENT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_client_id_fn);
+
+ device_class_register_property(device_class, PROPERTY_CLIENT_SECRET,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_client_secret_fn);
+
+ device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_refresh_token_fn);
+
+ device_class_register_property(device_class, PROPERTY_PROJECT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_project_id_fn);
}
static gboolean
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_username(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->username);
+ self->username = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_password(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->password);
+ self->password = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->tenant_id);
+ self->tenant_id = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->tenant_name);
+ self->tenant_name = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_host_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ char *str_val = g_value_dup_string(val);
+
+ amfree(self->proxy);
+ self->proxy = str_val;
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ int thread;
+
+ self->create_bucket = g_value_get_boolean(val);
+ /* Our S3 handle may not yet have been instantiated; if so, it will
+ * get the proper verbose setting when it is created */
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3)
+ s3_verbose(self->s3t[thread].s3, self->verbose);
+ }
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ const char *storage_api = g_value_get_string(val);
+ if (g_str_equal(storage_api, "S3")) {
+ self->s3_api = S3_API_S3;
+ } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
+ self->s3_api = S3_API_SWIFT_1;
+ } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
+ self->s3_api = S3_API_SWIFT_2;
+ } else if (g_str_equal(storage_api, "OAUTH2")) {
+ self->s3_api = S3_API_OAUTH2;
+ } else {
+ g_debug("Invalid STORAGE_API, using \"S3\".");
+ self->s3_api = S3_API_S3;
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
{
+
+ const gboolean openstack_swift_api = g_value_get_boolean(val);
+ if (openstack_swift_api) {
+ GValue storage_api_val;
+ g_value_init(&storage_api_val, G_TYPE_STRING);
+ g_value_set_static_string(&storage_api_val, "SWIFT-1.0");
+ return s3_device_set_storage_api(p_self, base, &storage_api_val,
+ surety, source);
+ }
+ return TRUE;
+}
+
+static gboolean
+s3_device_set_s3_multi_delete_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
S3Device *self = S3_DEVICE(p_self);
- self->openstack_swift_api = g_value_get_boolean(val);
+ self->use_s3_multi_delete = g_value_get_boolean(val);
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->reuse_connection = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_max_send_speed_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+
+static gboolean
+s3_device_set_client_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->client_id);
+ self->client_id = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_client_secret_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->client_secret);
+ self->client_secret = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_refresh_token_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->refresh_token);
+ self->refresh_token = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_project_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->project_id);
+ self->project_id = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static Device*
s3_device_factory(char * device_name, char * device_type, char * device_node)
{
/* default values */
self->verbose = FALSE;
- self->openstack_swift_api = FALSE;
+ self->s3_api = S3_API_S3;
/* use SSL if available */
self->use_ssl = s3_curl_supports_ssl();
device_set_simple_property(pself, device_property_s3_ssl.ID,
&tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+ /* reuse connection */
+ self->reuse_connection = TRUE;
+ bzero(&tmp_value, sizeof(GValue));
+ g_value_init(&tmp_value, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&tmp_value, self->reuse_connection);
+ device_set_simple_property(pself, device_property_reuse_connection.ID,
+ &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+
+ /* Set default create_bucket */
+ self->create_bucket = TRUE;
+ bzero(&tmp_value, sizeof(GValue));
+ g_value_init(&tmp_value, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&tmp_value, self->create_bucket);
+ device_set_simple_property(pself, device_property_create_bucket.ID,
+ &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+
if (parent_class->open_device) {
parent_class->open_device(pself, device_name, device_type, device_node);
}
}
if (self->s3t) {
for (thread = 0; thread < self->nb_threads; thread++) {
+ g_mutex_free(self->s3t[thread].now_mutex);
if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
+ g_free(self->s3t[thread].curl_buffer.buffer);
}
g_free(self->s3t);
}
if(self->secret_key) g_free(self->secret_key);
if(self->swift_account_id) g_free(self->swift_account_id);
if(self->swift_access_key) g_free(self->swift_access_key);
+ if(self->username) g_free(self->username);
+ if(self->password) g_free(self->password);
+ if(self->tenant_id) g_free(self->tenant_id);
+ if(self->tenant_name) g_free(self->tenant_name);
if(self->host) g_free(self->host);
if(self->service_path) g_free(self->service_path);
if(self->user_token) g_free(self->user_token);
if(self->bucket_location) g_free(self->bucket_location);
if(self->storage_class) g_free(self->storage_class);
if(self->server_side_encryption) g_free(self->server_side_encryption);
+ if(self->proxy) g_free(self->proxy);
if(self->ca_info) g_free(self->ca_info);
}
CURLcode curl_code;
if (self->s3t == NULL) {
- self->s3t = g_new(S3_by_thread, self->nb_threads);
- if (self->s3t == NULL) {
- device_set_error(d_self,
- stralloc(_("Can't allocate S3Handle array")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
-
- if (!self->openstack_swift_api) {
+ if (self->s3_api == S3_API_S3) {
if (self->access_key == NULL || self->access_key[0] == '\0') {
device_set_error(d_self,
g_strdup(_("No Amazon access key specified")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
- } else {
+ } else if (self->s3_api == S3_API_SWIFT_1) {
if (self->swift_account_id == NULL ||
self->swift_account_id[0] == '\0') {
device_set_error(d_self,
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
+ } else if (self->s3_api == S3_API_SWIFT_2) {
+ if (!((self->username && self->password && self->tenant_id) ||
+ (self->username && self->password && self->tenant_name) ||
+ (self->access_key && self->secret_key && self->tenant_id) ||
+ (self->access_key && self->secret_key && self->tenant_name))) {
+ device_set_error(d_self,
+ g_strdup(_("Missing authorization properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ } else if (self->s3_api == S3_API_OAUTH2) {
+ if (self->client_id == NULL ||
+ self->client_id[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing client_id properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->client_secret == NULL ||
+ self->client_secret[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing client_secret properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->refresh_token == NULL ||
+ self->refresh_token[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing refresh_token properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->project_id == NULL ||
+ self->project_id[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing project_id properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
}
- if (!self->use_ssl && self->ca_info) {
- amfree(self->ca_info);
+ self->s3t = g_new0(S3_by_thread, self->nb_threads);
+ if (self->s3t == NULL) {
+ device_set_error(d_self,
+ g_strdup(_("Can't allocate S3Handle array")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
}
self->thread_idle_cond = g_cond_new();
self->s3t[thread].filename = NULL;
self->s3t[thread].curl_buffer.buffer = NULL;
self->s3t[thread].curl_buffer.buffer_len = 0;
+ self->s3t[thread].now_mutex = g_mutex_new();
self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
self->swift_account_id,
self->swift_access_key,
self->user_token, self->bucket_location,
self->storage_class, self->ca_info,
self->server_side_encryption,
- self->openstack_swift_api);
+ self->proxy,
+ self->s3_api,
+ self->username,
+ self->password,
+ self->tenant_id,
+ self->tenant_name,
+ self->client_id,
+ self->client_secret,
+ self->refresh_token,
+ self->reuse_connection);
if (self->s3t[thread].s3 == NULL) {
device_set_error(d_self,
stralloc(_("Internal error creating S3 handle")),
DEVICE_STATUS_DEVICE_ERROR);
self->nb_threads = thread+1;
return FALSE;
- } else if (self->openstack_swift_api) {
- s3_error(self->s3t[0].s3, NULL, &response_code,
- &s3_error_code, NULL, &curl_code, NULL);
- if (response_code != 200) {
- device_set_error(d_self,
- g_strdup_printf(_("Internal error creating S3 handle: %s"),
- s3_strerror(self->s3t[0].s3)),
- DEVICE_STATUS_DEVICE_ERROR);
- self->nb_threads = thread+1;
- return FALSE;
- }
}
}
self->nb_threads, 0, NULL);
self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
self->nb_threads, 0, NULL);
- }
- for (thread = 0; thread < self->nb_threads; thread++) {
- s3_verbose(self->s3t[thread].s3, self->verbose);
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ s3_verbose(self->s3t[thread].s3, self->verbose);
- if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
- device_set_error(d_self, g_strdup_printf(_(
- "Error setting S3 SSL/TLS use "
- "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
+ device_set_error(d_self, g_strdup_printf(_(
+ "Error setting S3 SSL/TLS use "
+ "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
- if (self->max_send_speed &&
- !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
- device_set_error(d_self,
- g_strdup("Could not set S3 maximum send speed"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (self->max_send_speed &&
+ !s3_set_max_send_speed(self->s3t[thread].s3,
+ self->max_send_speed)) {
+ device_set_error(d_self,
+ g_strdup("Could not set S3 maximum send speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ if (self->max_recv_speed &&
+ !s3_set_max_recv_speed(self->s3t[thread].s3,
+ self->max_recv_speed)) {
+ device_set_error(d_self,
+ g_strdup("Could not set S3 maximum recv speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
}
- if (self->max_recv_speed &&
- !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
- device_set_error(d_self,
- g_strdup("Could not set S3 maximum recv speed"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (!s3_open2(self->s3t[thread].s3)) {
+ if (self->s3_api == S3_API_SWIFT_1 ||
+ self->s3_api == S3_API_SWIFT_2) {
+ s3_error(self->s3t[0].s3, NULL, &response_code,
+ &s3_error_code, NULL, &curl_code, NULL);
+ device_set_error(d_self,
+ g_strdup_printf(_("s3_open2 failed: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ self->nb_threads = thread+1;
+ return FALSE;
+ } else {
+ device_set_error(d_self,
+ g_strdup("s3_open2 failed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
}
}
s3_error_code_t s3_error_code;
CURLcode curl_code;
- if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) {
+ if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) {
return TRUE;
}
return FALSE;
}
- if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) {
+ if (!self->create_bucket) {
+ device_set_error(pself,
+ g_strdup_printf(_("Can't list bucket: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) {
s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/* if it isn't an expected error (bucket already exists),
return TRUE;
}
+static int progress_func(
+ void *thread_data,
+ double dltotal G_GNUC_UNUSED,
+ double dlnow,
+ double ultotal G_GNUC_UNUSED,
+ double ulnow)
+{
+ S3_by_thread *s3t = (S3_by_thread *)thread_data;
+
+ g_mutex_lock(s3t->now_mutex);
+ s3t->dlnow = dlnow;
+ s3t->ulnow = ulnow;
+ g_mutex_unlock(s3t->now_mutex);
+
+ return 0;
+}
+
static DeviceStatusFlags
s3_device_read_label(Device *pself) {
S3Device *self = S3_DEVICE(pself);
reset_thread(self);
pself->access_mode = mode;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ g_mutex_unlock(pself->device_mutex);
/* try creating the bucket, in case it doesn't exist */
if (!make_bucket(pself)) {
break;
case ACCESS_WRITE:
- delete_all_files(self);
+ if (!delete_all_files(self)) {
+ return FALSE;
+ }
/* write a new amanda header */
if (!write_amanda_header(self, label, timestamp)) {
/* functions for writing */
+static guint64
+s3_device_get_bytes_read(
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+ int thread;
+ guint64 dltotal;
+
+ g_mutex_unlock(pself->device_mutex);
+ /* Add per thread */
+ g_mutex_lock(self->thread_idle_mutex);
+ dltotal = self->dltotal;
+ for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+ g_mutex_lock(self->s3t[thread].now_mutex);
+ dltotal += self->s3t[thread].dlnow;
+ g_mutex_unlock(self->s3t[thread].now_mutex);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_mutex_lock(pself->device_mutex);
+
+ return dltotal;
+}
+
+
+static guint64
+s3_device_get_bytes_written(
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+ int thread;
+ guint64 ultotal;
+
+ g_mutex_unlock(pself->device_mutex);
+ /* Add per thread */
+ g_mutex_lock(self->thread_idle_mutex);
+ ultotal = self->ultotal;
+ for (thread = 0; thread < self->nb_threads_backup; thread++) {
+ g_mutex_lock(self->s3t[thread].now_mutex);
+ ultotal += self->s3t[thread].ulnow;
+ g_mutex_unlock(self->s3t[thread].now_mutex);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_mutex_lock(pself->device_mutex);
+
+ return ultotal;
+}
+
+
static gboolean
s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
S3Device *self = S3_DEVICE(pself);
g_free(amanda_header.buffer);
return FALSE;
}
+
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ self->s3t[thread].idle = 1;
+ self->s3t[thread].ulnow = 0;
+ }
+
/* set the file and block numbers correctly */
pself->file = (pself->file > 0)? pself->file+1 : 1;
pself->block = 0;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = TRUE;
+ pself->bytes_written = 0;
+ g_mutex_unlock(pself->device_mutex);
+ g_mutex_lock(self->thread_idle_mutex);
+ self->ultotal = 0;
+ g_mutex_unlock(self->thread_idle_mutex);
/* write it out as a special block (not the 0th) */
key = special_file_to_key(self, "filestart", pself->file);
result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
}
self->volume_bytes += header_size;
- for (thread = 0; thread < self->nb_threads; thread++) {
- self->s3t[thread].idle = 1;
- }
return TRUE;
}
gboolean result;
result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
- S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+ S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer,
+ progress_func, s3t);
g_free((void *)s3t->filename);
s3t->filename = NULL;
if (!result) {
g_mutex_lock(self->thread_idle_mutex);
s3t->idle = 1;
s3t->done = 1;
+ if (result)
+ self->ultotal += s3t->curl_buffer.buffer_len;
s3t->curl_buffer.buffer_len = s3t->buffer_len;
+ s3t->ulnow = 0;
g_cond_broadcast(self->thread_idle_cond);
g_mutex_unlock(self->thread_idle_mutex);
}
g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
}
}
+ self->ultotal = 0;
g_mutex_unlock(self->thread_idle_mutex);
if (device_in_error(pself)) return FALSE;
/* we're not in a file anymore */
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ pself->bytes_written = 0;;
+ g_mutex_unlock(pself->device_mutex);
return TRUE;
}
device_set_error(pself, g_strdup("Unlabeled volume"),
DEVICE_STATUS_VOLUME_UNLABELED);
- if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
+ if (self->create_bucket &&
+ !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/*
pself->file = file;
pself->is_eof = FALSE;
- pself->in_file = FALSE;
pself->block = 0;
+ g_mutex_lock(pself->device_mutex);
+ pself->in_file = FALSE;
+ pself->bytes_read = 0;
+ g_mutex_unlock(pself->device_mutex);
self->next_block_to_read = 0;
+ g_mutex_lock(self->thread_idle_mutex);
+ self->dltotal = 0;
+ g_mutex_unlock(self->thread_idle_mutex);
/* read it in */
key = special_file_to_key(self, "filestart", pself->file);
return NULL;
}
- pself->in_file = TRUE;
for (thread = 0; thread < self->nb_threads; thread++) {
self->s3t[thread].idle = 1;
self->s3t[thread].eof = FALSE;
+ self->s3t[thread].ulnow = 0;
}
+ g_mutex_lock(pself->device_mutex);
+ pself->in_file = TRUE;
+ g_mutex_unlock(pself->device_mutex);
return amanda_header;
}
s3t->done = 0;
s3t->idle = 0;
s3t->eof = FALSE;
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->errflags = DEVICE_STATUS_SUCCESS;
if (self->s3t[thread].curl_buffer.buffer &&
(int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
/* return eof */
g_free(key);
pself->is_eof = TRUE;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ g_mutex_unlock(pself->device_mutex);
device_set_error(pself, stralloc(_("EOF")),
DEVICE_STATUS_SUCCESS);
g_mutex_unlock(self->thread_idle_mutex);
s3t->done = 0;
s3t->idle = 0;
s3t->eof = FALSE;
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->errflags = DEVICE_STATUS_SUCCESS;
if (!self->s3t[thread].curl_buffer.buffer) {
self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
S3Device *self = S3_DEVICE(pself);
gboolean result;
- result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
- s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+ result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
+ S3_BUFFER_WRITE_FUNCS,
+ (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t);
g_mutex_lock(self->thread_idle_mutex);
if (!result) {
s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
s3_strerror(s3t->s3));
}
+ } else {
+ self->dltotal += s3t->curl_buffer.buffer_len;
}
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->done = 1;
g_cond_broadcast(self->thread_idle_cond);
g_mutex_unlock(self->thread_idle_mutex);
int thread;
int nb_done = 0;
- g_mutex_lock(self->thread_idle_mutex);
- while(nb_done != self->nb_threads) {
- nb_done = 0;
- for (thread = 0; thread < self->nb_threads; thread++) {
- if (self->s3t[thread].done == 1)
- nb_done++;
- }
- if (nb_done != self->nb_threads) {
- g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ if (self->thread_idle_mutex) {
+ g_mutex_lock(self->thread_idle_mutex);
+ while(nb_done != self->nb_threads) {
+ nb_done = 0;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].done == 1)
+ nb_done++;
+ }
+ if (nb_done != self->nb_threads) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
}
+ g_mutex_unlock(self->thread_idle_mutex);
}
- g_mutex_unlock(self->thread_idle_mutex);
}