/*
- * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved.
+ * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License version 2 as published
- * by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* Main object structure
*/
typedef struct _S3MetadataFile S3MetadataFile;
+typedef struct _S3Device S3Device;
typedef struct _S3_by_thread S3_by_thread;
struct _S3_by_thread {
char volatile * volatile filename;
DeviceStatusFlags volatile errflags; /* device_status */
char volatile * volatile errmsg; /* device error message */
+ GMutex *now_mutex;
+ guint64 dlnow, ulnow;
};
-typedef struct _S3Device S3Device;
struct _S3Device {
Device __parent__;
char *access_key;
char *user_token;
+ /* The Openstack swift information. */
+ char *swift_account_id;
+ char *swift_access_key;
+
+ char *username;
+ char *password;
+ char *tenant_id;
+ char *tenant_name;
+
char *bucket_location;
char *storage_class;
char *host;
char *service_path;
+ char *server_side_encryption;
+ char *proxy;
char *ca_info;
/* Produce verbose output? */
gboolean verbose;
+ /* create the bucket? */
+ gboolean create_bucket;
+
/* Use SSL? */
gboolean use_ssl;
+ S3_api s3_api;
/* Throttling */
guint64 max_send_speed;
guint64 volume_limit;
gboolean enforce_volume_limit;
gboolean use_subdomain;
+ gboolean use_s3_multi_delete;
int nb_threads;
int nb_threads_backup;
int nb_threads_recovery;
+ GThreadPool *thread_pool_delete;
GThreadPool *thread_pool_write;
GThreadPool *thread_pool_read;
GCond *thread_idle_cond;
GMutex *thread_idle_mutex;
int next_block_to_read;
+ GSList *keys;
+
+ guint64 dltotal;
+ guint64 ultotal;
+
+ /* google OAUTH2 */
+ char *client_id;
+ char *client_secret;
+ char *refresh_token;
+ char *project_id;
+
+ gboolean reuse_connection;
+
+ /* CAStor */
+ char *reps;
+ char *reps_bucket;
};
/*
/* Note: for compatability, min can only be decreased and max increased */
#define S3_DEVICE_MIN_BLOCK_SIZE 1024
-#define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024)
+#define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
#define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
#define EOM_EARLY_WARNING_ZONE_BLOCKS 4
#define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
#define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
+/* Authentication information for Openstack Swift. Both of these are strings. */
+static DevicePropertyBase device_property_swift_account_id;
+static DevicePropertyBase device_property_swift_access_key;
+#define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
+#define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
+
+/* Authentication information for Openstack Swift. Both of these are strings. */
+static DevicePropertyBase device_property_username;
+static DevicePropertyBase device_property_password;
+static DevicePropertyBase device_property_tenant_id;
+static DevicePropertyBase device_property_tenant_name;
+#define PROPERTY_USERNAME (device_property_username.ID)
+#define PROPERTY_PASSWORD (device_property_password.ID)
+#define PROPERTY_TENANT_ID (device_property_tenant_id.ID)
+#define PROPERTY_TENANT_NAME (device_property_tenant_name.ID)
+
/* Host and path */
static DevicePropertyBase device_property_s3_host;
static DevicePropertyBase device_property_s3_service_path;
static DevicePropertyBase device_property_s3_storage_class;
#define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
+/* Server side encryption */
+static DevicePropertyBase device_property_s3_server_side_encryption;
+#define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
+
+/* proxy */
+static DevicePropertyBase device_property_proxy;
+#define PROPERTY_PROXY (device_property_proxy.ID)
+
/* Path to certificate authority certificate */
static DevicePropertyBase device_property_ssl_ca_info;
#define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
+/* Which strotage api to use. */
+static DevicePropertyBase device_property_storage_api;
+#define PROPERTY_STORAGE_API (device_property_storage_api.ID)
+
+/* Whether to use openstack protocol. */
+/* DEPRECATED */
+static DevicePropertyBase device_property_openstack_swift_api;
+#define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
+
/* Whether to use SSL with Amazon S3. */
static DevicePropertyBase device_property_s3_ssl;
#define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
+/* Whether to re-use connection. */
+static DevicePropertyBase device_property_reuse_connection;
+#define PROPERTY_REUSE_CONNECTION (device_property_reuse_connection.ID)
+
/* Speed limits for sending and receiving */
static DevicePropertyBase device_property_max_send_speed;
static DevicePropertyBase device_property_max_recv_speed;
static DevicePropertyBase device_property_nb_threads_recovery;
#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
+/* If the s3 server have the multi-delete functionality */
+static DevicePropertyBase device_property_s3_multi_delete;
+#define PROPERTY_S3_MULTI_DELETE (device_property_s3_multi_delete.ID)
+
+/* The client_id for OAUTH2 */
+static DevicePropertyBase device_property_client_id;
+#define PROPERTY_CLIENT_ID (device_property_client_id.ID)
+
+/* The client_secret for OAUTH2 */
+static DevicePropertyBase device_property_client_secret;
+#define PROPERTY_CLIENT_SECRET (device_property_client_secret.ID)
+
+/* The refresh token for OAUTH2 */
+static DevicePropertyBase device_property_refresh_token;
+#define PROPERTY_REFRESH_TOKEN (device_property_refresh_token.ID)
+
+/* The PROJECT ID */
+static DevicePropertyBase device_property_project_id;
+#define PROPERTY_PROJECT_ID (device_property_project_id.ID)
+
+/* The PROJECT ID */
+static DevicePropertyBase device_property_create_bucket;
+#define PROPERTY_CREATE_BUCKET (device_property_create_bucket.ID)
+
+/* CAStor replication values for objects and buckets */
+static DevicePropertyBase device_property_s3_reps;
+#define PROPERTY_S3_REPS (device_property_s3_reps.ID)
+#define S3_DEVICE_REPS_DEFAULT "2"
+static DevicePropertyBase device_property_s3_reps_bucket;
+#define PROPERTY_S3_REPS_BUCKET (device_property_s3_reps_bucket.ID)
+#define S3_DEVICE_REPS_BUCKET_DEFAULT "4"
+
/*
* prototypes
*/
static gboolean
setup_handle(S3Device * self);
+static void
+s3_wait_thread_delete(S3Device *self);
+
/*
* class mechanics */
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_swift_account_id_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_swift_access_key_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_username(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_password(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_tenant_id(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_tenant_name(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_user_token_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_server_side_encryption_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_proxy_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_ca_info_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_create_bucket_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_storage_api(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_s3_multi_delete_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_ssl_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_reuse_connection_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static gboolean s3_device_set_max_send_speed_fn(Device *self,
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_client_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_client_secret_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_refresh_token_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_project_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_reps_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_reps_bucket_fn(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
static void s3_thread_read_block(gpointer thread_data,
gpointer data);
static void s3_thread_write_block(gpointer thread_data,
gpointer data);
+static gboolean make_bucket(Device * pself);
+
/* Wait that all threads are done */
static void reset_thread(S3Device *self);
static gboolean
s3_device_finish(Device * self);
+static guint64
+s3_device_get_bytes_read(Device * self);
+
+static guint64
+s3_device_get_bytes_written(Device * self);
+
static gboolean
s3_device_start_file(Device * self,
dumpfile_t * jobInfo);
d_self->volume_header = dumpinfo;
self->volume_bytes += header_size;
}
-
+ d_self->header_block_size = header_size;
return result;
}
delete_file(S3Device *self,
int file)
{
+ int thread = -1;
+
gboolean result;
GSList *keys;
guint64 total_size = 0;
- char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
Device *d_self = DEVICE(self);
+ char *my_prefix;
+
+ if (file == -1) {
+ my_prefix = g_strdup_printf("%sf", self->prefix);
+ } else {
+ my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
+ }
- result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
- &total_size);
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL,
+ &keys, &total_size);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
- DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
- return FALSE;
+ g_strdup_printf(_("While listing S3 keys: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
+ return FALSE;
}
- /* this will likely be a *lot* of keys */
- for (; keys; keys = g_slist_remove(keys, keys->data)) {
- if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data);
- if (!s3_delete(self->s3t[0].s3, self->bucket, keys->data)) {
- device_set_error(d_self,
- vstrallocf(_("While deleting key '%s': %s"),
- (char*)keys->data, s3_strerror(self->s3t[0].s3)),
- DEVICE_STATUS_DEVICE_ERROR);
- g_slist_free(keys);
- return FALSE;
- }
+ g_mutex_lock(self->thread_idle_mutex);
+ if (!self->keys) {
+ self->keys = keys;
+ } else {
+ self->keys = g_slist_concat(self->keys, keys);
+ }
+
+ // start the threads
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].idle == 1) {
+ /* Check if the thread is in error */
+ if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+ device_set_error(d_self,
+ (char *)self->s3t[thread].errmsg,
+ self->s3t[thread].errflags);
+ self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+ self->s3t[thread].errmsg = NULL;
+ g_mutex_unlock(self->thread_idle_mutex);
+ s3_wait_thread_delete(self);
+ return FALSE;
+ }
+ self->s3t[thread].idle = 0;
+ self->s3t[thread].done = 0;
+ g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
+ NULL);
+ }
}
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ g_mutex_unlock(self->thread_idle_mutex);
+
self->volume_bytes = total_size;
+ s3_wait_thread_delete(self);
+
return TRUE;
}
-static gboolean
-delete_all_files(S3Device *self)
+static void
+s3_thread_delete_block(
+ gpointer thread_data,
+ gpointer data)
{
- int file, last_file;
+ static int count = 0;
+ S3_by_thread *s3t = (S3_by_thread *)thread_data;
+ Device *pself = (Device *)data;
+ S3Device *self = S3_DEVICE(pself);
+ int result = 1;
+ char *filename;
- /*
- * Note: this has to be allowed to retry for a while because the bucket
- * may have been created and not yet appeared
- */
- last_file = find_last_file(self);
- if (last_file < 0) {
- guint response_code;
- s3_error_code_t s3_error_code;
- s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+ g_mutex_lock(self->thread_idle_mutex);
+ while (result && self->keys) {
+ if (self->use_s3_multi_delete) {
+ char **filenames = g_new(char *, 1001);
+ char **f = filenames;
+ int n = 0;
+ while (self->keys && n<1000) {
+ *f++ = self->keys->data;
+ self->keys = g_slist_remove(self->keys, self->keys->data);
+ n++;
+ }
+ *f++ = NULL;
+ g_mutex_unlock(self->thread_idle_mutex);
+ result = s3_multi_delete(s3t->s3, (const char *)self->bucket,
+ (const char **)filenames);
+ if (result != 1) {
+ char **f;
+
+ if (result == 2) {
+ g_debug("Deleting multiple keys not implemented");
+ } else { /* result == 0 */
+ g_debug("Deleteing multiple keys failed: %s",
+ s3_strerror(s3t->s3));
+ }
- /*
- * if the bucket doesn't exist, it doesn't conatin any files,
- * so the operation is a success
- */
- if ((response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket)) {
- /* find_last_file set an error; clear it */
- device_set_error(DEVICE(self), NULL, DEVICE_STATUS_SUCCESS);
- return TRUE;
- } else {
- /* find_last_file already set the error */
- return FALSE;
- }
+ self->use_s3_multi_delete = 0;
+ /* re-add all filenames */
+ f = filenames;
+ g_mutex_lock(self->thread_idle_mutex);
+ while(*f) {
+ self->keys = g_slist_prepend(self->keys, *f++);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_free(filenames);
+ result = 1;
+ g_mutex_lock(self->thread_idle_mutex);
+ continue;
+ }
+ f = filenames;
+ while(*f) {
+ g_free(*f++);
+ }
+ g_free(filenames);
+ } else {
+ filename = self->keys->data;
+ self->keys = g_slist_remove(self->keys, self->keys->data);
+ count++;
+ if (count >= 1000) {
+ g_debug("Deleting %s ...", filename);
+ count = 0;
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ result = s3_delete(s3t->s3, (const char *)self->bucket,
+ (const char *)filename);
+ if (!result) {
+ s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
+ s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
+ filename, s3_strerror(s3t->s3));
+ }
+ g_free(filename);
+ }
+ g_mutex_lock(self->thread_idle_mutex);
}
+ s3t->idle = 1;
+ s3t->done = 1;
+ g_cond_broadcast(self->thread_idle_cond);
+ g_mutex_unlock(self->thread_idle_mutex);
+}
- for (file = 1; file <= last_file; file++) {
- if (!delete_file(self, file))
- /* delete_file already set our error message */
- return FALSE;
+static void
+s3_wait_thread_delete(S3Device *self)
+{
+ Device *d_self = (Device *)self;
+ int idle_thread = 0;
+ int thread;
+
+ g_mutex_lock(self->thread_idle_mutex);
+ while (idle_thread != self->nb_threads) {
+ idle_thread = 0;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].idle == 1) {
+ idle_thread++;
+ }
+ /* Check if the thread is in error */
+ if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+ device_set_error(d_self, (char *)self->s3t[thread].errmsg,
+ self->s3t[thread].errflags);
+ self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+ self->s3t[thread].errmsg = NULL;
+ }
+ }
+ if (idle_thread != self->nb_threads) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
}
+ g_mutex_unlock(self->thread_idle_mutex);
+}
- return TRUE;
+
+static gboolean
+delete_all_files(S3Device *self)
+{
+ return delete_file(self, -1);
}
/*
device_property_fill_and_register(&device_property_s3_access_key,
G_TYPE_STRING, "s3_access_key",
"Access key ID to authenticate with Amazon S3");
+ device_property_fill_and_register(&device_property_swift_account_id,
+ G_TYPE_STRING, "swift_account_id",
+ "Account ID to authenticate with openstack swift");
+ device_property_fill_and_register(&device_property_swift_access_key,
+ G_TYPE_STRING, "swift_access_key",
+ "Access key to authenticate with openstack swift");
+ device_property_fill_and_register(&device_property_username,
+ G_TYPE_STRING, "username",
+ "Username to authenticate with");
+ device_property_fill_and_register(&device_property_password,
+ G_TYPE_STRING, "password",
+ "password to authenticate with");
+ device_property_fill_and_register(&device_property_tenant_id,
+ G_TYPE_STRING, "tenant_id",
+ "tenant_id to authenticate with");
+ device_property_fill_and_register(&device_property_tenant_name,
+ G_TYPE_STRING, "tenant_name",
+ "tenant_name to authenticate with");
device_property_fill_and_register(&device_property_s3_host,
G_TYPE_STRING, "s3_host",
"hostname:port of the server");
device_property_fill_and_register(&device_property_s3_storage_class,
G_TYPE_STRING, "s3_storage_class",
"Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
+ device_property_fill_and_register(&device_property_s3_server_side_encryption,
+ G_TYPE_STRING, "s3_server_side_encryption",
+ "Serve side encryption as specified by Amazon (AES256)");
+ device_property_fill_and_register(&device_property_proxy,
+ G_TYPE_STRING, "proxy",
+ "The proxy");
device_property_fill_and_register(&device_property_ssl_ca_info,
G_TYPE_STRING, "ssl_ca_info",
"Path to certificate authority certificate");
+ device_property_fill_and_register(&device_property_storage_api,
+ G_TYPE_STRING, "storage_api",
+ "Which cloud API to use.");
+ device_property_fill_and_register(&device_property_openstack_swift_api,
+ G_TYPE_STRING, "openstack_swift_api",
+ "Whether to use openstack protocol");
+ device_property_fill_and_register(&device_property_client_id,
+ G_TYPE_STRING, "client_id",
+ "client_id for use with oauth2");
+ device_property_fill_and_register(&device_property_client_secret,
+ G_TYPE_STRING, "client_secret",
+ "client_secret for use with oauth2");
+ device_property_fill_and_register(&device_property_refresh_token,
+ G_TYPE_STRING, "refresh_token",
+ "refresh_token for use with oauth2");
+ device_property_fill_and_register(&device_property_project_id,
+ G_TYPE_STRING, "project_id",
+ "project id for use with google");
device_property_fill_and_register(&device_property_s3_ssl,
G_TYPE_BOOLEAN, "s3_ssl",
"Whether to use SSL with Amazon S3");
+ device_property_fill_and_register(&device_property_reuse_connection,
+ G_TYPE_BOOLEAN, "reuse_connection",
+ "Whether to reuse connection");
+ device_property_fill_and_register(&device_property_create_bucket,
+ G_TYPE_BOOLEAN, "create_bucket",
+ "Whether to create/delete bucket");
device_property_fill_and_register(&device_property_s3_subdomain,
G_TYPE_BOOLEAN, "s3_subdomain",
"Whether to use subdomain");
device_property_fill_and_register(&device_property_nb_threads_recovery,
G_TYPE_UINT64, "nb_threads_recovery",
"Number of reader thread");
+ device_property_fill_and_register(&device_property_s3_multi_delete,
+ G_TYPE_BOOLEAN, "s3_multi_delete",
+ "Whether to use multi-delete");
+ device_property_fill_and_register(&device_property_s3_reps,
+ G_TYPE_STRING, "reps",
+ "Number of replicas for data objects in CAStor");
+ device_property_fill_and_register(&device_property_s3_reps_bucket,
+ G_TYPE_STRING, "reps_bucket",
+ "Number of replicas for automatically created buckets in CAStor");
/* register the device itself */
register_device(s3_device_factory, device_prefix_list);
Device * dself = DEVICE(self);
GValue response;
+ self->s3_api = S3_API_S3;
self->volume_bytes = 0;
self->volume_limit = 0;
self->leom = TRUE;
self->nb_threads = 1;
self->nb_threads_backup = 1;
self->nb_threads_recovery = 1;
+ self->thread_pool_delete = NULL;
self->thread_pool_write = NULL;
self->thread_pool_read = NULL;
self->thread_idle_cond = NULL;
self->thread_idle_mutex = NULL;
+ self->use_s3_multi_delete = 1;
+ self->reps = NULL;
+ self->reps_bucket = NULL;
/* Register property values
* Note: Some aren't added until s3_device_open_device()
device_class->read_label = s3_device_read_label;
device_class->start = s3_device_start;
device_class->finish = s3_device_finish;
+ device_class->get_bytes_read = s3_device_get_bytes_read;
+ device_class->get_bytes_written = s3_device_get_bytes_written;
device_class->start_file = s3_device_start_file;
device_class->write_block = s3_device_write_block;
device_simple_property_get_fn,
s3_device_set_secret_key_fn);
+ device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_swift_account_id_fn);
+
+ device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_swift_access_key_fn);
+
+ device_class_register_property(device_class, PROPERTY_USERNAME,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_username);
+
+ device_class_register_property(device_class, PROPERTY_PASSWORD,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_password);
+
+ device_class_register_property(device_class, PROPERTY_TENANT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_tenant_id);
+
+ device_class_register_property(device_class, PROPERTY_TENANT_NAME,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_tenant_name);
+
device_class_register_property(device_class, PROPERTY_S3_HOST,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
device_simple_property_get_fn,
s3_device_set_storage_class_fn);
+ device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_server_side_encryption_fn);
+
+ device_class_register_property(device_class, PROPERTY_PROXY,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_proxy_fn);
+
device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
device_simple_property_get_fn,
s3_device_set_verbose_fn);
+ device_class_register_property(device_class, PROPERTY_CREATE_BUCKET,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_create_bucket_fn);
+
+ device_class_register_property(device_class, PROPERTY_STORAGE_API,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_storage_api);
+
+ device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_openstack_swift_api_fn);
+
+ device_class_register_property(device_class, PROPERTY_S3_MULTI_DELETE,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_s3_multi_delete_fn);
+
device_class_register_property(device_class, PROPERTY_S3_SSL,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_ssl_fn);
+ device_class_register_property(device_class, PROPERTY_REUSE_CONNECTION,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_reuse_connection_fn);
+
device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_enforce_max_volume_usage_fn);
device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
- (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
- (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
s3_device_set_use_subdomain_fn);
+
+ device_class_register_property(device_class, PROPERTY_CLIENT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_client_id_fn);
+
+ device_class_register_property(device_class, PROPERTY_CLIENT_SECRET,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_client_secret_fn);
+
+ device_class_register_property(device_class, PROPERTY_REFRESH_TOKEN,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_refresh_token_fn);
+
+ device_class_register_property(device_class, PROPERTY_PROJECT_ID,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_project_id_fn);
+
+ device_class_register_property(device_class, PROPERTY_S3_REPS,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_reps_fn);
+
+ device_class_register_property(device_class, PROPERTY_S3_REPS_BUCKET,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_reps_bucket_fn);
}
static gboolean
}
static gboolean
-s3_device_set_host_fn(Device *p_self,
- DevicePropertyBase *base, GValue *val,
- PropertySurety surety, PropertySource source)
+s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
{
S3Device *self = S3_DEVICE(p_self);
- amfree(self->host);
- self->host = g_value_dup_string(val);
+ amfree(self->swift_account_id);
+ self->swift_account_id = g_value_dup_string(val);
device_clear_volume_details(p_self);
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
static gboolean
-s3_device_set_service_path_fn(Device *p_self,
- DevicePropertyBase *base, GValue *val,
- PropertySurety surety, PropertySource source)
+s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
{
S3Device *self = S3_DEVICE(p_self);
- amfree(self->service_path);
- self->service_path = g_value_dup_string(val);
+ amfree(self->swift_access_key);
+ self->swift_access_key = g_value_dup_string(val);
device_clear_volume_details(p_self);
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
static gboolean
-s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
+s3_device_set_username(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
{
S3Device *self = S3_DEVICE(p_self);
- amfree(self->user_token);
- self->user_token = g_value_dup_string(val);
+ amfree(self->username);
+ self->username = g_value_dup_string(val);
device_clear_volume_details(p_self);
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
static gboolean
-s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
+s3_device_set_password(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
{
S3Device *self = S3_DEVICE(p_self);
- char *str_val = g_value_dup_string(val);
- if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
- device_set_error(p_self, stralloc(_(
- "Location constraint given for Amazon S3 bucket, "
- "but libcurl is too old support wildcard certificates.")),
- DEVICE_STATUS_DEVICE_ERROR);
- goto fail;
+ amfree(self->password);
+ self->password = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_tenant_id(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->tenant_id);
+ self->tenant_id = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_tenant_name(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->tenant_name);
+ self->tenant_name = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_host_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->host);
+ self->host = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_service_path_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->service_path);
+ self->service_path = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->user_token);
+ self->user_token = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ char *str_val = g_value_dup_string(val);
+
+ if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
+ device_set_error(p_self, stralloc(_(
+ "Location constraint given for Amazon S3 bucket, "
+ "but libcurl is too old support wildcard certificates.")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ goto fail;
}
if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ char *str_val = g_value_dup_string(val);
+
+ amfree(self->server_side_encryption);
+ self->server_side_encryption = str_val;
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_proxy_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ char *str_val = g_value_dup_string(val);
+
+ amfree(self->proxy);
+ self->proxy = str_val;
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_create_bucket_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ int thread;
+
+ self->create_bucket = g_value_get_boolean(val);
+ /* Our S3 handle may not yet have been instantiated; if so, it will
+ * get the proper verbose setting when it is created */
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3)
+ s3_verbose(self->s3t[thread].s3, self->verbose);
+ }
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_storage_api(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ const char *storage_api = g_value_get_string(val);
+ if (g_str_equal(storage_api, "S3")) {
+ self->s3_api = S3_API_S3;
+ } else if (g_str_equal(storage_api, "SWIFT-1.0")) {
+ self->s3_api = S3_API_SWIFT_1;
+ } else if (g_str_equal(storage_api, "SWIFT-2.0")) {
+ self->s3_api = S3_API_SWIFT_2;
+ } else if (g_str_equal(storage_api, "OAUTH2")) {
+ self->s3_api = S3_API_OAUTH2;
+ } else if (g_str_equal(storage_api, "CASTOR")) {
+#if LIBCURL_VERSION_NUM >= 0x071301
+ curl_version_info_data *info;
+ /* check the runtime version too */
+ info = curl_version_info(CURLVERSION_NOW);
+ if (info->version_num >= 0x071301) {
+ self->s3_api = S3_API_CASTOR;
+ } else {
+ device_set_error(p_self, g_strdup_printf(_(
+ "Error setting STORAGE-API to castor "
+ "(You must install libcurl 7.19.1 or newer)")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+#else
+ device_set_error(p_self, g_strdup_printf(_(
+ "Error setting STORAGE-API to castor "
+ "This amanda is compiled with a too old libcurl, you must compile with libcurl 7.19.1 or newer")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+#endif
+ } else {
+ g_debug("Invalid STORAGE_API, using \"S3\".");
+ self->s3_api = S3_API_S3;
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+
+ const gboolean openstack_swift_api = g_value_get_boolean(val);
+ if (openstack_swift_api) {
+ GValue storage_api_val;
+ g_value_init(&storage_api_val, G_TYPE_STRING);
+ g_value_set_static_string(&storage_api_val, "SWIFT-1.0");
+ return s3_device_set_storage_api(p_self, base, &storage_api_val,
+ surety, source);
+ }
+ return TRUE;
+}
+
+static gboolean
+s3_device_set_s3_multi_delete_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->use_s3_multi_delete = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_reuse_connection_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->reuse_connection = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_max_send_speed_fn(Device *p_self,
DevicePropertyBase *base, GValue *val,
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+
+static gboolean
+s3_device_set_client_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->client_id);
+ self->client_id = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_client_secret_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->client_secret);
+ self->client_secret = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_refresh_token_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->refresh_token);
+ self->refresh_token = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_project_id_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->project_id);
+ self->project_id = g_value_dup_string(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_reps_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->reps);
+ self->reps = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_reps_bucket_fn(Device *p_self, DevicePropertyBase *base,
+ GValue *val, PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->reps_bucket);
+ self->reps_bucket = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static Device*
s3_device_factory(char * device_name, char * device_type, char * device_node)
{
Device *rval;
- S3Device * s3_rval;
g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
- s3_rval = (S3Device*)rval;
device_open_device(rval, device_name, device_type, device_node);
return rval;
return;
}
+ if (self->reps == NULL) {
+ self->reps = g_strdup(S3_DEVICE_REPS_DEFAULT);
+ }
+
+ if (self->reps_bucket == NULL) {
+ self->reps_bucket = g_strdup(S3_DEVICE_REPS_BUCKET_DEFAULT);
+ }
+
g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix);
/* default values */
self->verbose = FALSE;
+ self->s3_api = S3_API_S3;
/* use SSL if available */
self->use_ssl = s3_curl_supports_ssl();
device_set_simple_property(pself, device_property_s3_ssl.ID,
&tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+ /* reuse connection */
+ self->reuse_connection = TRUE;
+ bzero(&tmp_value, sizeof(GValue));
+ g_value_init(&tmp_value, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&tmp_value, self->reuse_connection);
+ device_set_simple_property(pself, device_property_reuse_connection.ID,
+ &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+
+ /* Set default create_bucket */
+ self->create_bucket = TRUE;
+ bzero(&tmp_value, sizeof(GValue));
+ g_value_init(&tmp_value, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&tmp_value, self->create_bucket);
+ device_set_simple_property(pself, device_property_create_bucket.ID,
+ &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT);
+
if (parent_class->open_device) {
parent_class->open_device(pself, device_name, device_type, device_node);
}
if(G_OBJECT_CLASS(parent_class)->finalize)
(* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
+ if (self->thread_pool_delete) {
+ g_thread_pool_free(self->thread_pool_delete, 1, 1);
+ self->thread_pool_delete = NULL;
+ }
if (self->thread_pool_write) {
g_thread_pool_free(self->thread_pool_write, 1, 1);
self->thread_pool_write = NULL;
}
if (self->s3t) {
for (thread = 0; thread < self->nb_threads; thread++) {
+ g_mutex_free(self->s3t[thread].now_mutex);
if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
+ g_free(self->s3t[thread].curl_buffer.buffer);
}
g_free(self->s3t);
}
if(self->prefix) g_free(self->prefix);
if(self->access_key) g_free(self->access_key);
if(self->secret_key) g_free(self->secret_key);
+ if(self->swift_account_id) g_free(self->swift_account_id);
+ if(self->swift_access_key) g_free(self->swift_access_key);
+ if(self->username) g_free(self->username);
+ if(self->password) g_free(self->password);
+ if(self->tenant_id) g_free(self->tenant_id);
+ if(self->tenant_name) g_free(self->tenant_name);
if(self->host) g_free(self->host);
if(self->service_path) g_free(self->service_path);
if(self->user_token) g_free(self->user_token);
if(self->bucket_location) g_free(self->bucket_location);
if(self->storage_class) g_free(self->storage_class);
+ if(self->server_side_encryption) g_free(self->server_side_encryption);
+ if(self->proxy) g_free(self->proxy);
if(self->ca_info) g_free(self->ca_info);
+ if(self->reps) g_free(self->reps);
+ if(self->reps_bucket) g_free(self->reps_bucket);
}
static gboolean setup_handle(S3Device * self) {
Device *d_self = DEVICE(self);
int thread;
+ guint response_code;
+ s3_error_code_t s3_error_code;
+ CURLcode curl_code;
if (self->s3t == NULL) {
- self->s3t = g_new(S3_by_thread, self->nb_threads);
- if (self->s3t == NULL) {
- device_set_error(d_self,
- stralloc(_("Can't allocate S3Handle array")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
- if (self->access_key == NULL || self->access_key[0] == '\0') {
- device_set_error(d_self,
- stralloc(_("No Amazon access key specified")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ if (self->s3_api == S3_API_S3) {
+ if (self->access_key == NULL || self->access_key[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("No Amazon access key specified")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
- if (self->secret_key == NULL || self->secret_key[0] == '\0') {
+ if (self->secret_key == NULL || self->secret_key[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("No Amazon secret key specified")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ } else if (self->s3_api == S3_API_SWIFT_1) {
+ if (self->swift_account_id == NULL ||
+ self->swift_account_id[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("No Swift account id specified")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->swift_access_key == NULL ||
+ self->swift_access_key[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("No Swift access key specified")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ } else if (self->s3_api == S3_API_SWIFT_2) {
+ if (!((self->username && self->password && self->tenant_id) ||
+ (self->username && self->password && self->tenant_name) ||
+ (self->access_key && self->secret_key && self->tenant_id) ||
+ (self->access_key && self->secret_key && self->tenant_name))) {
+ device_set_error(d_self,
+ g_strdup(_("Missing authorization properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ } else if (self->s3_api == S3_API_OAUTH2) {
+ if (self->client_id == NULL ||
+ self->client_id[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing client_id properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->client_secret == NULL ||
+ self->client_secret[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing client_secret properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->refresh_token == NULL ||
+ self->refresh_token[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing refresh_token properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ if (self->project_id == NULL ||
+ self->project_id[0] == '\0') {
+ device_set_error(d_self,
+ g_strdup(_("Missing project_id properties")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ } else if (self->s3_api == S3_API_CASTOR) {
+ self->use_s3_multi_delete = 0;
+ self->use_subdomain = FALSE;
+ if(self->service_path) {
+ g_free(self->service_path);
+ self->service_path = NULL;
+ }
+ }
+
+ self->s3t = g_new0(S3_by_thread, self->nb_threads);
+ if (self->s3t == NULL) {
device_set_error(d_self,
- stralloc(_("No Amazon secret key specified")),
+ g_strdup(_("Can't allocate S3Handle array")),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
- if (!self->use_ssl && self->ca_info) {
- amfree(self->ca_info);
- }
+ self->thread_idle_cond = g_cond_new();
+ self->thread_idle_mutex = g_mutex_new();
for (thread = 0; thread < self->nb_threads; thread++) {
self->s3t[thread].idle = 1;
self->s3t[thread].filename = NULL;
self->s3t[thread].curl_buffer.buffer = NULL;
self->s3t[thread].curl_buffer.buffer_len = 0;
+ self->s3t[thread].now_mutex = g_mutex_new();
self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
+ self->swift_account_id,
+ self->swift_access_key,
self->host, self->service_path,
self->use_subdomain,
self->user_token, self->bucket_location,
- self->storage_class, self->ca_info);
+ self->storage_class, self->ca_info,
+ self->server_side_encryption,
+ self->proxy,
+ self->s3_api,
+ self->username,
+ self->password,
+ self->tenant_id,
+ self->tenant_name,
+ self->client_id,
+ self->client_secret,
+ self->refresh_token,
+ self->reuse_connection,
+ self->reps, self->reps_bucket);
if (self->s3t[thread].s3 == NULL) {
device_set_error(d_self,
stralloc(_("Internal error creating S3 handle")),
DEVICE_STATUS_DEVICE_ERROR);
+ self->nb_threads = thread+1;
return FALSE;
- }
+ }
}
g_debug("Create %d threads", self->nb_threads);
+ self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
+ self, self->nb_threads, 0,
+ NULL);
self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
self->nb_threads, 0, NULL);
self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
self->nb_threads, 0, NULL);
- self->thread_idle_cond = g_cond_new();
- self->thread_idle_mutex = g_mutex_new();
- }
- for (thread = 0; thread < self->nb_threads; thread++) {
- s3_verbose(self->s3t[thread].s3, self->verbose);
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ s3_verbose(self->s3t[thread].s3, self->verbose);
- if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
- device_set_error(d_self, g_strdup_printf(_(
- "Error setting S3 SSL/TLS use "
- "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
+ device_set_error(d_self, g_strdup_printf(_(
+ "Error setting S3 SSL/TLS use "
+ "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ if (self->max_send_speed &&
+ !s3_set_max_send_speed(self->s3t[thread].s3,
+ self->max_send_speed)) {
+ device_set_error(d_self,
+ g_strdup("Could not set S3 maximum send speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+
+ if (self->max_recv_speed &&
+ !s3_set_max_recv_speed(self->s3t[thread].s3,
+ self->max_recv_speed)) {
+ device_set_error(d_self,
+ g_strdup("Could not set S3 maximum recv speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
}
- if (self->max_send_speed &&
- !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
- device_set_error(d_self,
- g_strdup("Could not set S3 maximum send speed"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (!s3_open2(self->s3t[thread].s3)) {
+ if (self->s3_api == S3_API_SWIFT_1 ||
+ self->s3_api == S3_API_SWIFT_2) {
+ s3_error(self->s3t[0].s3, NULL, &response_code,
+ &s3_error_code, NULL, &curl_code, NULL);
+ device_set_error(d_self,
+ g_strdup_printf(_("s3_open2 failed: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ self->nb_threads = thread+1;
+ return FALSE;
+ } else {
+ device_set_error(d_self,
+ g_strdup("s3_open2 failed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
}
+ }
- if (self->max_recv_speed &&
- !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
- device_set_error(d_self,
- g_strdup("Could not set S3 maximum recv speed"),
+ return TRUE;
+}
+
+static gboolean
+make_bucket(
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+ guint response_code;
+ s3_error_code_t s3_error_code;
+ CURLcode curl_code;
+
+ if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket, self->project_id)) {
+ return TRUE;
+ }
+
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
+
+ if (response_code == 0 && s3_error_code == 0 &&
+ (curl_code == CURLE_COULDNT_CONNECT ||
+ curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
+ device_set_error(pself,
+ g_strdup_printf(_("While connecting to S3 bucket: %s"),
+ s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ return FALSE;
+ }
+
+ if (!self->create_bucket) {
+ device_set_error(pself,
+ g_strdup_printf(_("Can't list bucket: %s"),
+ s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
}
+ if (!s3_make_bucket(self->s3t[0].s3, self->bucket, self->project_id)) {
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+
+ /* if it isn't an expected error (bucket already exists),
+ * return FALSE */
+ if (response_code != 409 ||
+ (s3_error_code != S3_ERROR_BucketAlreadyExists &&
+ s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
+ device_set_error(pself,
+ g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
return TRUE;
}
+static int progress_func(
+ void *thread_data,
+ double dltotal G_GNUC_UNUSED,
+ double dlnow,
+ double ultotal G_GNUC_UNUSED,
+ double ulnow)
+{
+ S3_by_thread *s3t = (S3_by_thread *)thread_data;
+
+ g_mutex_lock(s3t->now_mutex);
+ s3t->dlnow = dlnow;
+ s3t->ulnow = ulnow;
+ g_mutex_unlock(s3t->now_mutex);
+
+ return 0;
+}
+
static DeviceStatusFlags
s3_device_read_label(Device *pself) {
S3Device *self = S3_DEVICE(pself);
reset_thread(self);
key = special_file_to_key(self, "tapestart", -1);
+
+ if (!make_bucket(pself)) {
+ return pself->status;
+ }
+
if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
guint response_code;
s3_error_code_t s3_error_code;
/* if it's an expected error (not found), just return FALSE */
if (response_code == 404 &&
- (s3_error_code == S3_ERROR_NoSuchKey ||
+ (s3_error_code == S3_ERROR_None ||
+ s3_error_code == S3_ERROR_Unknown ||
+ s3_error_code == S3_ERROR_NoSuchKey ||
s3_error_code == S3_ERROR_NoSuchEntity ||
s3_error_code == S3_ERROR_NoSuchBucket)) {
g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
return pself->status;
}
+ pself->header_block_size = buf.buffer_len;
g_assert(buf.buffer != NULL);
amanda_header = g_new(dumpfile_t, 1);
parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
reset_thread(self);
pself->access_mode = mode;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ g_mutex_unlock(pself->device_mutex);
/* try creating the bucket, in case it doesn't exist */
- if (mode != ACCESS_READ && !s3_make_bucket(self->s3t[0].s3, self->bucket)) {
- guint response_code;
- s3_error_code_t s3_error_code;
- s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
-
- /* if it isn't an expected error (bucket already exists),
- * return FALSE */
- if (response_code != 409 ||
- (s3_error_code != S3_ERROR_BucketAlreadyExists &&
- s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
- device_set_error(pself,
- vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ if (!make_bucket(pself)) {
+ return FALSE;
}
/* take care of any dirty work for this mode */
break;
case ACCESS_WRITE:
- delete_all_files(self);
+ if (!delete_all_files(self)) {
+ return FALSE;
+ }
/* write a new amanda header */
if (!write_amanda_header(self, label, timestamp)) {
/* functions for writing */
+static guint64
+s3_device_get_bytes_read(
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+ int thread;
+ guint64 dltotal;
+
+ g_mutex_unlock(pself->device_mutex);
+ /* Add per thread */
+ g_mutex_lock(self->thread_idle_mutex);
+ dltotal = self->dltotal;
+ for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+ g_mutex_lock(self->s3t[thread].now_mutex);
+ dltotal += self->s3t[thread].dlnow;
+ g_mutex_unlock(self->s3t[thread].now_mutex);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_mutex_lock(pself->device_mutex);
+
+ return dltotal;
+}
+
+
+static guint64
+s3_device_get_bytes_written(
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+ int thread;
+ guint64 ultotal;
+
+ g_mutex_unlock(pself->device_mutex);
+ /* Add per thread */
+ g_mutex_lock(self->thread_idle_mutex);
+ ultotal = self->ultotal;
+ for (thread = 0; thread < self->nb_threads_backup; thread++) {
+ g_mutex_lock(self->s3t[thread].now_mutex);
+ ultotal += self->s3t[thread].ulnow;
+ g_mutex_unlock(self->s3t[thread].now_mutex);
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+ g_mutex_lock(pself->device_mutex);
+
+ return ultotal;
+}
+
+
static gboolean
s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
S3Device *self = S3_DEVICE(pself);
g_free(amanda_header.buffer);
return FALSE;
}
+
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ self->s3t[thread].idle = 1;
+ self->s3t[thread].ulnow = 0;
+ }
+
/* set the file and block numbers correctly */
pself->file = (pself->file > 0)? pself->file+1 : 1;
pself->block = 0;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = TRUE;
+ pself->bytes_written = 0;
+ g_mutex_unlock(pself->device_mutex);
+ g_mutex_lock(self->thread_idle_mutex);
+ self->ultotal = 0;
+ g_mutex_unlock(self->thread_idle_mutex);
/* write it out as a special block (not the 0th) */
key = special_file_to_key(self, "filestart", pself->file);
result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
}
self->volume_bytes += header_size;
- for (thread = 0; thread < self->nb_threads; thread++) {
- self->s3t[thread].idle = 1;
- }
return TRUE;
}
for (thread = 0; thread < self->nb_threads_backup; thread++) {
if (self->s3t[thread].idle == 1) {
idle_thread++;
- if (first_idle == -1)
- first_idle = thread;
/* Check if the thread is in error */
if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
device_set_error(pself, (char *)self->s3t[thread].errmsg,
g_mutex_unlock(self->thread_idle_mutex);
return FALSE;
}
+ if (first_idle == -1) {
+ first_idle = thread;
+ break;
+ }
}
}
if (!idle_thread) {
gboolean result;
result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
- S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+ S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer,
+ progress_func, s3t);
g_free((void *)s3t->filename);
s3t->filename = NULL;
if (!result) {
g_mutex_lock(self->thread_idle_mutex);
s3t->idle = 1;
s3t->done = 1;
+ if (result)
+ self->ultotal += s3t->curl_buffer.buffer_len;
s3t->curl_buffer.buffer_len = s3t->buffer_len;
+ s3t->ulnow = 0;
g_cond_broadcast(self->thread_idle_cond);
g_mutex_unlock(self->thread_idle_mutex);
}
g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
}
}
+ self->ultotal = 0;
g_mutex_unlock(self->thread_idle_mutex);
if (device_in_error(pself)) return FALSE;
/* we're not in a file anymore */
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ pself->bytes_written = 0;;
+ g_mutex_unlock(pself->device_mutex);
return TRUE;
}
if (device_in_error(self)) return FALSE;
reset_thread(self);
- return delete_file(self, file);
+ delete_file(self, file);
+ s3_wait_thread_delete(self);
+ return !device_in_error(self);
/* delete_file already set our error message if necessary */
}
}
g_free(key);
+ dumpfile_free(pself->volume_header);
+ pself->volume_header = NULL;
+
if (!delete_all_files(self))
return FALSE;
- if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
+ device_set_error(pself, g_strdup("Unlabeled volume"),
+ DEVICE_STATUS_VOLUME_UNLABELED);
+
+ if (self->create_bucket &&
+ !s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/*
pself->file = file;
pself->is_eof = FALSE;
- pself->in_file = FALSE;
pself->block = 0;
+ g_mutex_lock(pself->device_mutex);
+ pself->in_file = FALSE;
+ pself->bytes_read = 0;
+ g_mutex_unlock(pself->device_mutex);
self->next_block_to_read = 0;
+ g_mutex_lock(self->thread_idle_mutex);
+ self->dltotal = 0;
+ g_mutex_unlock(self->thread_idle_mutex);
/* read it in */
key = special_file_to_key(self, "filestart", pself->file);
/* if it's an expected error (not found), check what to do. */
if (response_code == 404 &&
- (s3_error_code == S3_ERROR_NoSuchKey ||
+ (s3_error_code == S3_ERROR_None ||
+ s3_error_code == S3_ERROR_NoSuchKey ||
s3_error_code == S3_ERROR_NoSuchEntity)) {
int next_file;
next_file = find_next_file(self, pself->file);
return NULL;
}
- pself->in_file = TRUE;
for (thread = 0; thread < self->nb_threads; thread++) {
self->s3t[thread].idle = 1;
self->s3t[thread].eof = FALSE;
+ self->s3t[thread].ulnow = 0;
}
+ g_mutex_lock(pself->device_mutex);
+ pself->in_file = TRUE;
+ g_mutex_unlock(pself->device_mutex);
return amanda_header;
}
s3t->done = 0;
s3t->idle = 0;
s3t->eof = FALSE;
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->errflags = DEVICE_STATUS_SUCCESS;
if (self->s3t[thread].curl_buffer.buffer &&
(int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
/* return eof */
g_free(key);
pself->is_eof = TRUE;
+ g_mutex_lock(pself->device_mutex);
pself->in_file = FALSE;
+ g_mutex_unlock(pself->device_mutex);
device_set_error(pself, stralloc(_("EOF")),
DEVICE_STATUS_SUCCESS);
g_mutex_unlock(self->thread_idle_mutex);
s3t->done = 0;
s3t->idle = 0;
s3t->eof = FALSE;
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->errflags = DEVICE_STATUS_SUCCESS;
if (!self->s3t[thread].curl_buffer.buffer) {
self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
S3Device *self = S3_DEVICE(pself);
gboolean result;
- result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
- s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+ result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename,
+ S3_BUFFER_WRITE_FUNCS,
+ (CurlBuffer *)&s3t->curl_buffer, progress_func, s3t);
g_mutex_lock(self->thread_idle_mutex);
if (!result) {
guint response_code;
s3_error_code_t s3_error_code;
s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
-
/* if it's an expected error (not found), just return -1 */
if (response_code == 404 &&
- (s3_error_code == S3_ERROR_NoSuchKey ||
+ (s3_error_code == S3_ERROR_None ||
+ s3_error_code == S3_ERROR_Unknown ||
+ s3_error_code == S3_ERROR_NoSuchKey ||
s3_error_code == S3_ERROR_NoSuchEntity)) {
s3t->eof = TRUE;
} else {
s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
s3_strerror(s3t->s3));
}
+ } else {
+ self->dltotal += s3t->curl_buffer.buffer_len;
}
+ s3t->dlnow = 0;
+ s3t->ulnow = 0;
s3t->done = 1;
g_cond_broadcast(self->thread_idle_cond);
g_mutex_unlock(self->thread_idle_mutex);
int thread;
int nb_done = 0;
- g_mutex_lock(self->thread_idle_mutex);
- while(nb_done != self->nb_threads) {
- nb_done = 0;
- for (thread = 0; thread < self->nb_threads; thread++) {
- if (self->s3t[thread].done == 1)
- nb_done++;
- }
- if (nb_done != self->nb_threads) {
- g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ if (self->thread_idle_mutex) {
+ g_mutex_lock(self->thread_idle_mutex);
+ while(nb_done != self->nb_threads) {
+ nb_done = 0;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].done == 1)
+ nb_done++;
+ }
+ if (nb_done != self->nb_threads) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
}
+ g_mutex_unlock(self->thread_idle_mutex);
}
- g_mutex_unlock(self->thread_idle_mutex);
}