*/
typedef struct _S3MetadataFile S3MetadataFile;
+typedef struct _S3_by_thread S3_by_thread;
+struct _S3_by_thread {
+ S3Handle * volatile s3;
+ CurlBuffer volatile curl_buffer;
+ guint volatile buffer_len;
+ int volatile idle;
+ int volatile eof;
+ int volatile done;
+ char volatile * volatile filename;
+ DeviceStatusFlags volatile errflags; /* device_status */
+ char volatile * volatile errmsg; /* device error message */
+};
+
typedef struct _S3Device S3Device;
struct _S3Device {
Device __parent__;
/* The "easy" curl handle we use to access Amazon S3 */
- S3Handle *s3;
+ S3_by_thread *s3t;
/* S3 access information */
char *bucket;
char *bucket_location;
char *storage_class;
+ char *host;
+ char *service_path;
char *ca_info;
/* Throttling */
guint64 max_send_speed;
guint64 max_recv_speed;
+
+ gboolean leom;
+ guint64 volume_bytes;
+ guint64 volume_limit;
+ gboolean enforce_volume_limit;
+ gboolean use_subdomain;
+
+ int nb_threads;
+ int nb_threads_backup;
+ int nb_threads_recovery;
+ GThreadPool *thread_pool_write;
+ GThreadPool *thread_pool_read;
+ GCond *thread_idle_cond;
+ GMutex *thread_idle_mutex;
+ int next_block_to_read;
};
/*
#define S3_DEVICE_MIN_BLOCK_SIZE 1024
#define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024)
#define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
+#define EOM_EARLY_WARNING_ZONE_BLOCKS 4
/* This goes in lieu of file number for metadata. */
#define SPECIAL_INFIX "special-"
#define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
#define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
+/* Host and path */
+static DevicePropertyBase device_property_s3_host;
+static DevicePropertyBase device_property_s3_service_path;
+#define PROPERTY_S3_HOST (device_property_s3_host.ID)
+#define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
+
/* Same, but for S3 with DevPay. */
static DevicePropertyBase device_property_s3_user_token;
#define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
#define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
#define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
+/* Whether to use subdomain */
+static DevicePropertyBase device_property_s3_subdomain;
+#define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
+
+/* Number of threads to use */
+static DevicePropertyBase device_property_nb_threads_backup;
+#define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
+static DevicePropertyBase device_property_nb_threads_recovery;
+#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
/*
* prototypes
static gboolean
delete_all_files(S3Device *self);
-/* Set up self->s3 as best as possible.
+/* Set up self->s3t as best as possible.
*
- * The return value is TRUE iff self->s3 is useable.
+ * The return value is TRUE iff self->s3t is useable.
*
* @param self: the S3Device object
* @returns: TRUE if the handle is set up
DevicePropertyBase *base, GValue *val,
PropertySurety surety, PropertySource source);
+static gboolean s3_device_set_nb_threads_backup(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_nb_threads_recovery(Device *self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean property_set_leom_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_host_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_service_path_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source);
+
+static void s3_thread_read_block(gpointer thread_data,
+ gpointer data);
+static void s3_thread_write_block(gpointer thread_data,
+ gpointer data);
+
+/* Wait that all threads are done */
+static void reset_thread(S3Device *self);
+
/*
* virtual functions */
static gboolean
s3_device_erase(Device *pself);
+static gboolean
+check_at_leom(S3Device *self,
+ guint64 size);
+
+static gboolean
+check_at_peom(S3Device *self,
+ guint64 size);
+
/*
* Private functions
*/
return FALSE;
}
+ if(check_at_leom(self, header_size))
+ d_self->is_eom = TRUE;
+
+ if(check_at_peom(self, header_size)) {
+ d_self->is_eom = TRUE;
+ device_set_error(d_self,
+ stralloc(_("No space left on device")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ g_free(amanda_header.buffer);
+ return FALSE;
+ }
+
/* write out the header and flush the uploads. */
key = special_file_to_key(self, "tapestart", -1);
g_assert(header_size < G_MAXUINT); /* for cast to guint */
amanda_header.buffer_len = (guint)header_size;
- result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
+ result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
&amanda_header, NULL, NULL);
g_free(amanda_header.buffer);
g_free(key);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
dumpfile_free(dumpinfo);
} else {
dumpfile_free(d_self->volume_header);
d_self->volume_header = dumpinfo;
+ self->volume_bytes += header_size;
}
return result;
Device *d_self = DEVICE(self);
/* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
- result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys);
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return -1;
}
Device *d_self = DEVICE(self);
/* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
- result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys);
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
+ &keys, NULL);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return -1;
}
{
gboolean result;
GSList *keys;
+ guint64 total_size = 0;
char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
Device *d_self = DEVICE(self);
- result = s3_list_keys(self->s3, self->bucket, my_prefix, NULL, &keys);
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
+ &total_size);
if (!result) {
device_set_error(d_self,
- vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return FALSE;
}
/* this will likely be a *lot* of keys */
for (; keys; keys = g_slist_remove(keys, keys->data)) {
if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data);
- if (!s3_delete(self->s3, self->bucket, keys->data)) {
+ if (!s3_delete(self->s3t[0].s3, self->bucket, keys->data)) {
device_set_error(d_self,
vstrallocf(_("While deleting key '%s': %s"),
- (char*)keys->data, s3_strerror(self->s3)),
+ (char*)keys->data, s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR);
g_slist_free(keys);
return FALSE;
}
}
+ self->volume_bytes = total_size;
return TRUE;
}
if (last_file < 0) {
guint response_code;
s3_error_code_t s3_error_code;
- s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/*
* if the bucket doesn't exist, it doesn't conatin any files,
device_property_fill_and_register(&device_property_s3_access_key,
G_TYPE_STRING, "s3_access_key",
"Access key ID to authenticate with Amazon S3");
+ device_property_fill_and_register(&device_property_s3_host,
+ G_TYPE_STRING, "s3_host",
+ "hostname:port of the server");
+ device_property_fill_and_register(&device_property_s3_service_path,
+ G_TYPE_STRING, "s3_service_path",
+ "path to add in the url");
device_property_fill_and_register(&device_property_s3_user_token,
G_TYPE_STRING, "s3_user_token",
"User token for authentication Amazon devpay requests");
device_property_fill_and_register(&device_property_s3_ssl,
G_TYPE_BOOLEAN, "s3_ssl",
"Whether to use SSL with Amazon S3");
+ device_property_fill_and_register(&device_property_s3_subdomain,
+ G_TYPE_BOOLEAN, "s3_subdomain",
+ "Whether to use subdomain");
device_property_fill_and_register(&device_property_max_send_speed,
G_TYPE_UINT64, "max_send_speed",
"Maximum average upload speed (bytes/sec)");
device_property_fill_and_register(&device_property_max_recv_speed,
G_TYPE_UINT64, "max_recv_speed",
"Maximum average download speed (bytes/sec)");
+ device_property_fill_and_register(&device_property_nb_threads_backup,
+ G_TYPE_UINT64, "nb_threads_backup",
+ "Number of writer thread");
+ device_property_fill_and_register(&device_property_nb_threads_recovery,
+ G_TYPE_UINT64, "nb_threads_recovery",
+ "Number of reader thread");
/* register the device itself */
register_device(s3_device_factory, device_prefix_list);
Device * dself = DEVICE(self);
GValue response;
+ self->volume_bytes = 0;
+ self->volume_limit = 0;
+ self->leom = TRUE;
+ self->enforce_volume_limit = FALSE;
+ self->use_subdomain = FALSE;
+ self->nb_threads = 1;
+ self->nb_threads_backup = 1;
+ self->nb_threads_recovery = 1;
+ self->thread_pool_write = NULL;
+ self->thread_pool_read = NULL;
+ self->thread_idle_cond = NULL;
+ self->thread_idle_mutex = NULL;
+
/* Register property values
* Note: Some aren't added until s3_device_open_device()
*/
&response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
g_value_unset(&response);
+ g_value_init(&response, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&response, FALSE);
+ device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
+ &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+ g_value_unset(&response);
+
+ g_value_init(&response, G_TYPE_BOOLEAN);
+ g_value_set_boolean(&response, FALSE);
+ device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
+ &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+ g_value_unset(&response);
+
g_value_init(&response, G_TYPE_BOOLEAN);
g_value_set_boolean(&response, FALSE);
device_set_simple_property(dself, PROPERTY_COMPRESSION,
device_simple_property_get_fn,
s3_device_set_secret_key_fn);
+ device_class_register_property(device_class, PROPERTY_S3_HOST,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_host_fn);
+
+ device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_service_path_fn);
+
device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
device_simple_property_get_fn,
device_simple_property_get_fn,
s3_device_set_max_recv_speed_fn);
+ device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_nb_threads_backup);
+
+ device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ s3_device_set_nb_threads_recovery);
+
device_class_register_property(device_class, PROPERTY_COMPRESSION,
PROPERTY_ACCESS_GET_MASK,
device_simple_property_get_fn,
NULL);
+
+ device_class_register_property(device_class, PROPERTY_LEOM,
+ PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+ device_simple_property_get_fn,
+ property_set_leom_fn);
+
+ device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
+ (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+ (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+ device_simple_property_get_fn,
+ s3_device_set_max_volume_usage_fn);
+
+ device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
+ (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+ (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+ device_simple_property_get_fn,
+ s3_device_set_enforce_max_volume_usage_fn);
+
+ device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
+ (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+ (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+ device_simple_property_get_fn,
+ s3_device_set_use_subdomain_fn);
}
static gboolean
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_host_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->host);
+ self->host = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_service_path_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ amfree(self->service_path);
+ self->service_path = g_value_dup_string(val);
+ device_clear_volume_details(p_self);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
static gboolean
s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
GValue *val, PropertySurety surety, PropertySource source)
GValue *val, PropertySurety surety, PropertySource source)
{
S3Device *self = S3_DEVICE(p_self);
+ int thread;
self->verbose = g_value_get_boolean(val);
/* Our S3 handle may not yet have been instantiated; if so, it will
* get the proper verbose setting when it is created */
- if (self->s3)
- s3_verbose(self->s3, self->verbose);
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3)
+ s3_verbose(self->s3t[thread].s3, self->verbose);
+ }
+ }
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
{
S3Device *self = S3_DEVICE(p_self);
gboolean new_val;
+ int thread;
new_val = g_value_get_boolean(val);
/* Our S3 handle may not yet have been instantiated; if so, it will
* get the proper use_ssl setting when it is created */
- if (self->s3 && !s3_use_ssl(self->s3, new_val)) {
- device_set_error(p_self, g_strdup_printf(_(
- "Error setting S3 SSL/TLS use "
- "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
+ device_set_error(p_self, g_strdup_printf(_(
+ "Error setting S3 SSL/TLS use "
+ "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
}
self->use_ssl = new_val;
{
S3Device *self = S3_DEVICE(p_self);
guint64 new_val;
+ int thread;
new_val = g_value_get_uint64(val);
- if (self->s3 && !s3_set_max_send_speed(self->s3, new_val)) {
- device_set_error(p_self,
- g_strdup("Could not set S3 maximum send speed"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
+ device_set_error(p_self,
+ g_strdup("Could not set S3 maximum send speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
}
self->max_send_speed = new_val;
{
S3Device *self = S3_DEVICE(p_self);
guint64 new_val;
+ int thread;
new_val = g_value_get_uint64(val);
- if (self->s3 && !s3_set_max_recv_speed(self->s3, new_val)) {
- device_set_error(p_self,
- g_strdup("Could not set S3 maximum recv speed"),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].s3 &&
+ !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
+ device_set_error(p_self,
+ g_strdup("Could not set S3 maximum recv speed"),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
+ }
}
self->max_recv_speed = new_val;
return device_simple_property_set_fn(p_self, base, val, surety, source);
}
+static gboolean
+s3_device_set_nb_threads_backup(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ guint64 new_val;
+
+ new_val = g_value_get_uint64(val);
+ self->nb_threads_backup = new_val;
+ if (self->nb_threads_backup > self->nb_threads) {
+ self->nb_threads = self->nb_threads_backup;
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_nb_threads_recovery(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+ guint64 new_val;
+
+ new_val = g_value_get_uint64(val);
+ self->nb_threads_recovery = new_val;
+ if (self->nb_threads_recovery > self->nb_threads) {
+ self->nb_threads = self->nb_threads_recovery;
+ }
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_max_volume_usage_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->volume_limit = g_value_get_uint64(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+
+}
+
+static gboolean
+s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->enforce_volume_limit = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+
+}
+
+static gboolean
+s3_device_set_use_subdomain_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->use_subdomain = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+property_set_leom_fn(Device *p_self,
+ DevicePropertyBase *base, GValue *val,
+ PropertySurety surety, PropertySource source)
+{
+ S3Device *self = S3_DEVICE(p_self);
+
+ self->leom = g_value_get_boolean(val);
+
+ return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
static Device*
s3_device_factory(char * device_name, char * device_type, char * device_node)
{
static void s3_device_finalize(GObject * obj_self) {
S3Device *self = S3_DEVICE (obj_self);
+ int thread;
if(G_OBJECT_CLASS(parent_class)->finalize)
(* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
- if(self->s3) s3_free(self->s3);
+ if (self->thread_pool_write) {
+ g_thread_pool_free(self->thread_pool_write, 1, 1);
+ self->thread_pool_write = NULL;
+ }
+ if (self->thread_pool_read) {
+ g_thread_pool_free(self->thread_pool_read, 1, 1);
+ self->thread_pool_read = NULL;
+ }
+ if (self->thread_idle_mutex) {
+ g_mutex_free(self->thread_idle_mutex);
+ self->thread_idle_mutex = NULL;
+ }
+ if (self->thread_idle_cond) {
+ g_cond_free(self->thread_idle_cond);
+ self->thread_idle_cond = NULL;
+ }
+ if (self->s3t) {
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
+ }
+ g_free(self->s3t);
+ }
if(self->bucket) g_free(self->bucket);
if(self->prefix) g_free(self->prefix);
if(self->access_key) g_free(self->access_key);
if(self->secret_key) g_free(self->secret_key);
+ if(self->host) g_free(self->host);
+ if(self->service_path) g_free(self->service_path);
if(self->user_token) g_free(self->user_token);
if(self->bucket_location) g_free(self->bucket_location);
if(self->storage_class) g_free(self->storage_class);
static gboolean setup_handle(S3Device * self) {
Device *d_self = DEVICE(self);
- if (self->s3 == NULL) {
+ int thread;
+ if (self->s3t == NULL) {
+ self->s3t = g_new(S3_by_thread, self->nb_threads);
+ if (self->s3t == NULL) {
+ device_set_error(d_self,
+ stralloc(_("Can't allocate S3Handle array")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
if (self->access_key == NULL || self->access_key[0] == '\0') {
device_set_error(d_self,
stralloc(_("No Amazon access key specified")),
amfree(self->ca_info);
}
- self->s3 = s3_open(self->access_key, self->secret_key, self->user_token,
- self->bucket_location, self->storage_class, self->ca_info);
- if (self->s3 == NULL) {
- device_set_error(d_self,
- stralloc(_("Internal error creating S3 handle")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ self->s3t[thread].idle = 1;
+ self->s3t[thread].done = 1;
+ self->s3t[thread].eof = FALSE;
+ self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+ self->s3t[thread].errmsg = NULL;
+ self->s3t[thread].filename = NULL;
+ self->s3t[thread].curl_buffer.buffer = NULL;
+ self->s3t[thread].curl_buffer.buffer_len = 0;
+ self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
+ self->host, self->service_path,
+ self->use_subdomain,
+ self->user_token, self->bucket_location,
+ self->storage_class, self->ca_info);
+ if (self->s3t[thread].s3 == NULL) {
+ device_set_error(d_self,
+ stralloc(_("Internal error creating S3 handle")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
}
+
+ g_debug("Create %d threads", self->nb_threads);
+ self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
+ self->nb_threads, 0, NULL);
+ self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
+ self->nb_threads, 0, NULL);
+ self->thread_idle_cond = g_cond_new();
+ self->thread_idle_mutex = g_mutex_new();
}
- s3_verbose(self->s3, self->verbose);
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ s3_verbose(self->s3t[thread].s3, self->verbose);
- if (!s3_use_ssl(self->s3, self->use_ssl)) {
- device_set_error(d_self, g_strdup_printf(_(
+ if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
+ device_set_error(d_self, g_strdup_printf(_(
"Error setting S3 SSL/TLS use "
"(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
- DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ DEVICE_STATUS_DEVICE_ERROR);
+ return FALSE;
+ }
- if (self->max_send_speed &&
- !s3_set_max_send_speed(self->s3, self->max_send_speed)) {
- device_set_error(d_self,
+ if (self->max_send_speed &&
+ !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
+ device_set_error(d_self,
g_strdup("Could not set S3 maximum send speed"),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
- }
+ return FALSE;
+ }
- if (self->max_recv_speed &&
- !s3_set_max_recv_speed(self->s3, self->max_recv_speed)) {
- device_set_error(d_self,
+ if (self->max_recv_speed &&
+ !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
+ device_set_error(d_self,
g_strdup("Could not set S3 maximum recv speed"),
DEVICE_STATUS_DEVICE_ERROR);
- return FALSE;
+ return FALSE;
+ }
}
return TRUE;
char *key;
CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
dumpfile_t *amanda_header;
-
/* note that this may be called from s3_device_start, when
* self->access_mode is not ACCESS_NULL */
/* setup_handle already set our error message */
return pself->status;
}
+ reset_thread(self);
key = special_file_to_key(self, "tapestart", -1);
- if (!s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
+ if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
guint response_code;
s3_error_code_t s3_error_code;
- s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/* if it's an expected error (not found), just return FALSE */
if (response_code == 404 &&
- (s3_error_code == S3_ERROR_NoSuchKey || s3_error_code == S3_ERROR_NoSuchBucket)) {
+ (s3_error_code == S3_ERROR_NoSuchKey ||
+ s3_error_code == S3_ERROR_NoSuchEntity ||
+ s3_error_code == S3_ERROR_NoSuchBucket)) {
g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
device_set_error(pself,
stralloc(_("Amanda header not found -- unlabeled volume?")),
/* otherwise, log it and return */
device_set_error(pself,
- vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return pself->status;
}
s3_device_start (Device * pself, DeviceAccessMode mode,
char * label, char * timestamp) {
S3Device * self;
+ GSList *keys;
+ guint64 total_size = 0;
+ gboolean result;
self = S3_DEVICE(pself);
return FALSE;
}
+ reset_thread(self);
pself->access_mode = mode;
pself->in_file = FALSE;
/* try creating the bucket, in case it doesn't exist */
- if (mode != ACCESS_READ && !s3_make_bucket(self->s3, self->bucket)) {
+ if (mode != ACCESS_READ && !s3_make_bucket(self->s3t[0].s3, self->bucket)) {
guint response_code;
s3_error_code_t s3_error_code;
- s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/* if it isn't an expected error (bucket already exists),
* return FALSE */
if (response_code != 409 ||
- s3_error_code != S3_ERROR_BucketAlreadyExists) {
+ (s3_error_code != S3_ERROR_BucketAlreadyExists &&
+ s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
device_set_error(pself,
- vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
/* s3_device_read_label already set our error message */
return FALSE;
- }
+ } else {
+ result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
+ if(!result) {
+ device_set_error(pself,
+ vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
+ DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
+ return FALSE;
+ } else {
+ self->volume_bytes = total_size;
+ }
+ }
return seek_to_end(self);
break;
}
static gboolean
-s3_device_finish (Device * pself) {
+s3_device_finish (
+ Device * pself)
+{
+ S3Device *self = S3_DEVICE(pself);
+
+ reset_thread(self);
+
/* we're not in a file anymore */
pself->access_mode = ACCESS_NULL;
CurlBuffer amanda_header = {NULL, 0, 0, 0};
gboolean result;
size_t header_size;
- char *key;
+ char *key;
+ int thread;
if (device_in_error(self)) return FALSE;
+ reset_thread(self);
pself->is_eom = FALSE;
/* Set the blocksize to zero, since there's no header to skip (it's stored
}
amanda_header.buffer_len = header_size;
+ if(check_at_leom(self, header_size))
+ pself->is_eom = TRUE;
+
+ if(check_at_peom(self, header_size)) {
+ pself->is_eom = TRUE;
+ device_set_error(pself,
+ stralloc(_("No space left on device")),
+ DEVICE_STATUS_DEVICE_ERROR);
+ g_free(amanda_header.buffer);
+ return FALSE;
+ }
/* set the file and block numbers correctly */
pself->file = (pself->file > 0)? pself->file+1 : 1;
pself->block = 0;
pself->in_file = TRUE;
-
/* write it out as a special block (not the 0th) */
key = special_file_to_key(self, "filestart", pself->file);
- result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
+ result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
&amanda_header, NULL, NULL);
g_free(amanda_header.buffer);
g_free(key);
if (!result) {
device_set_error(pself,
- vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3)),
+ vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
return FALSE;
}
+ self->volume_bytes += header_size;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ self->s3t[thread].idle = 1;
+ }
+
return TRUE;
}
static gboolean
s3_device_write_block (Device * pself, guint size, gpointer data) {
- gboolean result;
char *filename;
S3Device * self = S3_DEVICE(pself);
- CurlBuffer to_write = {data, size, 0, 0};
+ int idle_thread = 0;
+ int thread = -1;
+ int first_idle = -1;
g_assert (self != NULL);
g_assert (data != NULL);
if (device_in_error(self)) return FALSE;
- filename = file_and_block_to_key(self, pself->file, pself->block);
+ if(check_at_leom(self, size))
+ pself->is_eom = TRUE;
- result = s3_upload(self->s3, self->bucket, filename, S3_BUFFER_READ_FUNCS,
- &to_write, NULL, NULL);
- g_free(filename);
- if (!result) {
- device_set_error(pself,
- vstrallocf(_("While writing data block to S3: %s"), s3_strerror(self->s3)),
- DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
+ if(check_at_peom(self, size)) {
+ pself->is_eom = TRUE;
+ device_set_error(pself,
+ stralloc(_("No space left on device")),
+ DEVICE_STATUS_DEVICE_ERROR);
return FALSE;
}
- pself->block++;
+ filename = file_and_block_to_key(self, pself->file, pself->block);
+
+ g_mutex_lock(self->thread_idle_mutex);
+ while (!idle_thread) {
+ idle_thread = 0;
+ for (thread = 0; thread < self->nb_threads_backup; thread++) {
+ if (self->s3t[thread].idle == 1) {
+ idle_thread++;
+ if (first_idle == -1)
+ first_idle = thread;
+ /* Check if the thread is in error */
+ if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+ device_set_error(pself, (char *)self->s3t[thread].errmsg,
+ self->s3t[thread].errflags);
+ self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+ self->s3t[thread].errmsg = NULL;
+ g_mutex_unlock(self->thread_idle_mutex);
+ return FALSE;
+ }
+ }
+ }
+ if (!idle_thread) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
+ }
+ thread = first_idle;
+
+ self->s3t[thread].idle = 0;
+ self->s3t[thread].done = 0;
+ if (self->s3t[thread].curl_buffer.buffer &&
+ self->s3t[thread].curl_buffer.buffer_len < size) {
+ g_free((char *)self->s3t[thread].curl_buffer.buffer);
+ self->s3t[thread].curl_buffer.buffer = NULL;
+ self->s3t[thread].curl_buffer.buffer_len = 0;
+ self->s3t[thread].buffer_len = 0;
+ }
+ if (self->s3t[thread].curl_buffer.buffer == NULL) {
+ self->s3t[thread].curl_buffer.buffer = g_malloc(size);
+ self->s3t[thread].curl_buffer.buffer_len = size;
+ self->s3t[thread].buffer_len = size;
+ }
+ memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
+ self->s3t[thread].curl_buffer.buffer_pos = 0;
+ self->s3t[thread].curl_buffer.buffer_len = size;
+ self->s3t[thread].curl_buffer.max_buffer_size = 0;
+ self->s3t[thread].filename = filename;
+ g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
+ g_mutex_unlock(self->thread_idle_mutex);
+ pself->block++;
+ self->volume_bytes += size;
return TRUE;
}
+static void
+s3_thread_write_block(
+ gpointer thread_data,
+ gpointer data)
+{
+ S3_by_thread *s3t = (S3_by_thread *)thread_data;
+ Device *pself = (Device *)data;
+ S3Device *self = S3_DEVICE(pself);
+ gboolean result;
+
+ result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
+ S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+ g_free((void *)s3t->filename);
+ s3t->filename = NULL;
+ if (!result) {
+ s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
+ s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
+ }
+ g_mutex_lock(self->thread_idle_mutex);
+ s3t->idle = 1;
+ s3t->done = 1;
+ s3t->curl_buffer.buffer_len = s3t->buffer_len;
+ g_cond_broadcast(self->thread_idle_cond);
+ g_mutex_unlock(self->thread_idle_mutex);
+}
+
static gboolean
s3_device_finish_file (Device * pself) {
+ S3Device *self = S3_DEVICE(pself);
+
+ /* Check all threads are done */
+ int idle_thread = 0;
+ int thread;
+
+ g_mutex_lock(self->thread_idle_mutex);
+ while (idle_thread != self->nb_threads) {
+ idle_thread = 0;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].idle == 1) {
+ idle_thread++;
+ }
+ /* check thread status */
+ if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+ device_set_error(pself, (char *)self->s3t[thread].errmsg,
+ self->s3t[thread].errflags);
+ self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+ self->s3t[thread].errmsg = NULL;
+ }
+ }
+ if (idle_thread != self->nb_threads) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
+
if (device_in_error(pself)) return FALSE;
/* we're not in a file anymore */
S3Device *self = S3_DEVICE(pself);
if (device_in_error(self)) return FALSE;
+ reset_thread(self);
return delete_file(self, file);
/* delete_file already set our error message if necessary */
}
return FALSE;
}
+ reset_thread(self);
key = special_file_to_key(self, "tapestart", -1);
- if (!s3_delete(self->s3, self->bucket, key)) {
- s3_error(self->s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
+ if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
+ s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
device_set_error(pself,
stralloc(errmsg),
DEVICE_STATUS_DEVICE_ERROR);
if (!delete_all_files(self))
return FALSE;
- if (!s3_delete_bucket(self->s3, self->bucket)) {
- s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+ if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
+ s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/*
* ignore the error if the bucket isn't empty (there may be data from elsewhere)
return FALSE;
}
}
+ self->volume_bytes = 0;
return TRUE;
}
CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
dumpfile_t *amanda_header;
const char *errmsg = NULL;
+ int thread;
if (device_in_error(self)) return NULL;
+ reset_thread(self);
+
pself->file = file;
pself->is_eof = FALSE;
pself->in_file = FALSE;
pself->block = 0;
+ self->next_block_to_read = 0;
/* read it in */
key = special_file_to_key(self, "filestart", pself->file);
- result = s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
+ result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
&buf, NULL, NULL);
g_free(key);
if (!result) {
guint response_code;
s3_error_code_t s3_error_code;
- s3_error(self->s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
+ s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
/* if it's an expected error (not found), check what to do. */
- if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) {
+ if (response_code == 404 &&
+ (s3_error_code == S3_ERROR_NoSuchKey ||
+ s3_error_code == S3_ERROR_NoSuchEntity)) {
int next_file;
next_file = find_next_file(self, pself->file);
if (next_file > 0) {
} else if (next_file == 0) {
/* No next file. Check if we are one past the end. */
key = special_file_to_key(self, "filestart", pself->file - 1);
- result = s3_read(self->s3, self->bucket, key,
+ result = s3_read(self->s3t[0].s3, self->bucket, key,
S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
g_free(key);
if (result) {
}
pself->in_file = TRUE;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ self->s3t[thread].idle = 1;
+ self->s3t[thread].eof = FALSE;
+ }
return amanda_header;
}
static gboolean
s3_device_seek_block(Device *pself, guint64 block) {
+ S3Device * self = S3_DEVICE(pself);
if (device_in_error(pself)) return FALSE;
+ reset_thread(self);
pself->block = block;
+ self->next_block_to_read = block;
return TRUE;
}
-typedef struct s3_read_block_data {
- gpointer data;
- int size_req;
- int size_written;
-
- CurlBuffer curl;
-} s3_read_block_data;
-
-/* wrapper around s3_buffer_write_func to write as much data as possible to
- * the user's buffer, and switch to a dynamically allocated buffer if that
- * isn't large enough */
-static size_t
-s3_read_block_write_func(void *ptr, size_t size, size_t nmemb, void *stream)
-{
- s3_read_block_data *dat = stream;
- guint new_bytes, bytes_needed;
-
- /* if data is NULL, call through to s3_buffer_write_func */
- if (!dat->data) {
- return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl));
- }
-
- new_bytes = (guint) size * nmemb;
- bytes_needed = dat->size_written + new_bytes;
-
- if (bytes_needed > (guint)dat->size_req) {
- /* this read will overflow the user's buffer, so malloc ourselves
- * a new buffer and keep reading */
- dat->curl.buffer = g_malloc(bytes_needed);
- dat->curl.buffer_len = bytes_needed;
- dat->curl.buffer_pos = dat->size_written;
- memcpy(dat->curl.buffer, dat->data, dat->size_written);
- dat->data = NULL; /* signal that the user's buffer is too small */
- return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl));
- }
-
- /* copy it into the dat->data buffer, and increment the size */
- memcpy(dat->data + dat->size_written, ptr, new_bytes);
- dat->size_written += new_bytes;
-
- return new_bytes;
-}
-
static int
s3_device_read_block (Device * pself, gpointer data, int *size_req) {
S3Device * self = S3_DEVICE(pself);
char *key;
- s3_read_block_data dat = {NULL, 0, 0, { NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE} };
- gboolean result;
+ int thread;
+ int done = 0;
g_assert (self != NULL);
if (device_in_error(self)) return -1;
+ g_mutex_lock(self->thread_idle_mutex);
+ /* start a read ahead for each thread */
+ for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+ S3_by_thread *s3t = &self->s3t[thread];
+ if (s3t->idle) {
+ key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
+ s3t->filename = key;
+ s3t->done = 0;
+ s3t->idle = 0;
+ s3t->eof = FALSE;
+ s3t->errflags = DEVICE_STATUS_SUCCESS;
+ if (self->s3t[thread].curl_buffer.buffer &&
+ (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
+ g_free(self->s3t[thread].curl_buffer.buffer);
+ self->s3t[thread].curl_buffer.buffer = NULL;
+ self->s3t[thread].curl_buffer.buffer_len = 0;
+ self->s3t[thread].buffer_len = 0;
+ }
+ if (!self->s3t[thread].curl_buffer.buffer) {
+ self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
+ self->s3t[thread].curl_buffer.buffer_len = *size_req;
+ self->s3t[thread].buffer_len = *size_req;
+ }
+ s3t->curl_buffer.buffer_pos = 0;
+ s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
+ self->next_block_to_read++;
+ g_thread_pool_push(self->thread_pool_read, s3t, NULL);
+ }
+ }
+
/* get the file*/
key = file_and_block_to_key(self, pself->file, pself->block);
g_assert(key != NULL);
- if (self->cached_key && (0 == strcmp(key, self->cached_key))) {
- if (*size_req >= self->cached_size) {
- /* use the cached copy and clear the cache */
- memcpy(data, self->cached_buf, self->cached_size);
- *size_req = self->cached_size;
-
- g_free(key);
- g_free(self->cached_key);
- self->cached_key = NULL;
- g_free(self->cached_buf);
- self->cached_buf = NULL;
-
- pself->block++;
- return *size_req;
- } else {
- *size_req = self->cached_size;
- g_free(key);
- return 0;
+ while (!done) {
+ /* find which thread read the key */
+ for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+ S3_by_thread *s3t;
+ s3t = &self->s3t[thread];
+ if (!s3t->idle &&
+ s3t->done &&
+ strcmp(key, (char *)s3t->filename) == 0) {
+ if (s3t->eof) {
+ /* return eof */
+ g_free(key);
+ pself->is_eof = TRUE;
+ pself->in_file = FALSE;
+ device_set_error(pself, stralloc(_("EOF")),
+ DEVICE_STATUS_SUCCESS);
+ g_mutex_unlock(self->thread_idle_mutex);
+ return -1;
+ } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
+ /* return the error */
+ device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
+ g_free(key);
+ g_mutex_unlock(self->thread_idle_mutex);
+ return -1;
+
+ } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
+ /* return the buffer */
+ g_mutex_unlock(self->thread_idle_mutex);
+ memcpy(data, s3t->curl_buffer.buffer,
+ s3t->curl_buffer.buffer_pos);
+ *size_req = s3t->curl_buffer.buffer_pos;
+ g_free(key);
+ s3t->idle = 1;
+ g_free((char *)s3t->filename);
+ pself->block++;
+ done = 1;
+ g_mutex_lock(self->thread_idle_mutex);
+ break;
+ } else { /* buffer not enough large */
+ *size_req = s3t->curl_buffer.buffer_len;
+ g_free(key);
+ g_mutex_unlock(self->thread_idle_mutex);
+ return 0;
+ }
+ }
+ }
+ if (!done) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
}
}
- /* clear the cache, as it's useless to us */
- if (self->cached_key) {
- g_free(self->cached_key);
- self->cached_key = NULL;
-
- g_free(self->cached_buf);
- self->cached_buf = NULL;
+ /* start a read ahead for the thread */
+ for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+ S3_by_thread *s3t = &self->s3t[thread];
+ if (s3t->idle) {
+ key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
+ s3t->filename = key;
+ s3t->done = 0;
+ s3t->idle = 0;
+ s3t->eof = FALSE;
+ s3t->errflags = DEVICE_STATUS_SUCCESS;
+ if (!self->s3t[thread].curl_buffer.buffer) {
+ self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
+ self->s3t[thread].curl_buffer.buffer_len = *size_req;
+ }
+ s3t->curl_buffer.buffer_pos = 0;
+ self->next_block_to_read++;
+ g_thread_pool_push(self->thread_pool_read, s3t, NULL);
+ }
}
+ g_mutex_unlock(self->thread_idle_mutex);
- /* set up dat for the write_func callback */
- if (!data || *size_req <= 0) {
- dat.data = NULL;
- dat.size_req = 0;
- } else {
- dat.data = data;
- dat.size_req = *size_req;
- }
+ return *size_req;
+
+}
+
+static void
+s3_thread_read_block(
+ gpointer thread_data,
+ gpointer data)
+{
+ S3_by_thread *s3t = (S3_by_thread *)thread_data;
+ Device *pself = (Device *)data;
+ S3Device *self = S3_DEVICE(pself);
+ gboolean result;
+
+ result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
+ s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
- result = s3_read(self->s3, self->bucket, key, s3_read_block_write_func,
- s3_buffer_reset_func, &dat, NULL, NULL);
+ g_mutex_lock(self->thread_idle_mutex);
if (!result) {
guint response_code;
s3_error_code_t s3_error_code;
- s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
-
- g_free(key);
- key = NULL;
+ s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
/* if it's an expected error (not found), just return -1 */
- if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) {
- pself->is_eof = TRUE;
- pself->in_file = FALSE;
- device_set_error(pself,
- stralloc(_("EOF")),
- DEVICE_STATUS_SUCCESS);
- return -1;
+ if (response_code == 404 &&
+ (s3_error_code == S3_ERROR_NoSuchKey ||
+ s3_error_code == S3_ERROR_NoSuchEntity)) {
+ s3t->eof = TRUE;
+ } else {
+
+ /* otherwise, log it and return FALSE */
+ s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
+ s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
+ s3_strerror(s3t->s3));
}
+ }
+ s3t->done = 1;
+ g_cond_broadcast(self->thread_idle_cond);
+ g_mutex_unlock(self->thread_idle_mutex);
- /* otherwise, log it and return FALSE */
- device_set_error(pself,
- vstrallocf(_("While reading data block from S3: %s"), s3_strerror(self->s3)),
- DEVICE_STATUS_VOLUME_ERROR);
- return -1;
+ return;
+}
+
+static gboolean
+check_at_peom(S3Device *self, guint64 size)
+{
+ if(self->enforce_volume_limit && (self->volume_limit > 0)) {
+ guint64 newtotal = self->volume_bytes + size;
+ if(newtotal > self->volume_limit) {
+ return TRUE;
+ }
}
+ return FALSE;
+}
- if (dat.data == NULL) {
- /* data was larger than the available space, so cache it and return
- * the actual size */
- self->cached_buf = dat.curl.buffer;
- self->cached_size = dat.curl.buffer_pos;
- self->cached_key = key;
- key = NULL;
+static gboolean
+check_at_leom(S3Device *self, guint64 size)
+{
+ guint64 block_size = DEVICE(self)->block_size;
+ guint64 eom_warning_buffer = block_size *
+ (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
- *size_req = dat.curl.buffer_pos;
- return 0;
+ if(!self->leom)
+ return FALSE;
+
+ if(self->enforce_volume_limit && (self->volume_limit > 0)) {
+ guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
+ if(newtotal > self->volume_limit) {
+ return TRUE;
+ }
}
+ return FALSE;
+}
- /* ok, the read went directly to the user's buffer, so we need only
- * set and return the size */
- pself->block++;
- g_free(key);
- *size_req = dat.size_written;
- return dat.size_written;
+static void
+reset_thread(
+ S3Device *self)
+{
+ int thread;
+ int nb_done = 0;
+
+ g_mutex_lock(self->thread_idle_mutex);
+ while(nb_done != self->nb_threads) {
+ nb_done = 0;
+ for (thread = 0; thread < self->nb_threads; thread++) {
+ if (self->s3t[thread].done == 1)
+ nb_done++;
+ }
+ if (nb_done != self->nb_threads) {
+ g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+ }
+ }
+ g_mutex_unlock(self->thread_idle_mutex);
}