X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fs3-device.c;fp=device-src%2Fs3-device.c;h=1f0b601b706d8ababb65793aebe0f4cff7cc7e50;hb=cd0b924f27312d57bd42f6c4fae2b795139e2d0b;hp=d060974fff944a9d2153f69605bd34e77bc12780;hpb=011a59f5a54864108a16af570a6b287410597cc2;p=debian%2Famanda diff --git a/device-src/s3-device.c b/device-src/s3-device.c index d060974..1f0b601 100644 --- a/device-src/s3-device.c +++ b/device-src/s3-device.c @@ -66,12 +66,25 @@ static GType s3_device_get_type (void); */ typedef struct _S3MetadataFile S3MetadataFile; +typedef struct _S3_by_thread S3_by_thread; +struct _S3_by_thread { + S3Handle * volatile s3; + CurlBuffer volatile curl_buffer; + guint volatile buffer_len; + int volatile idle; + int volatile eof; + int volatile done; + char volatile * volatile filename; + DeviceStatusFlags volatile errflags; /* device_status */ + char volatile * volatile errmsg; /* device error message */ +}; + typedef struct _S3Device S3Device; struct _S3Device { Device __parent__; /* The "easy" curl handle we use to access Amazon S3 */ - S3Handle *s3; + S3_by_thread *s3t; /* S3 access information */ char *bucket; @@ -84,6 +97,8 @@ struct _S3Device { char *bucket_location; char *storage_class; + char *host; + char *service_path; char *ca_info; @@ -104,6 +119,21 @@ struct _S3Device { /* Throttling */ guint64 max_send_speed; guint64 max_recv_speed; + + gboolean leom; + guint64 volume_bytes; + guint64 volume_limit; + gboolean enforce_volume_limit; + gboolean use_subdomain; + + int nb_threads; + int nb_threads_backup; + int nb_threads_recovery; + GThreadPool *thread_pool_write; + GThreadPool *thread_pool_read; + GCond *thread_idle_cond; + GMutex *thread_idle_mutex; + int next_block_to_read; }; /* @@ -129,6 +159,7 @@ struct _S3DeviceClass { #define S3_DEVICE_MIN_BLOCK_SIZE 1024 #define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024) #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024) +#define EOM_EARLY_WARNING_ZONE_BLOCKS 4 /* This goes in lieu of file number for metadata. */ #define SPECIAL_INFIX "special-" @@ -146,6 +177,12 @@ static DevicePropertyBase device_property_s3_secret_key; #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID) #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID) +/* Host and path */ +static DevicePropertyBase device_property_s3_host; +static DevicePropertyBase device_property_s3_service_path; +#define PROPERTY_S3_HOST (device_property_s3_host.ID) +#define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID) + /* Same, but for S3 with DevPay. */ static DevicePropertyBase device_property_s3_user_token; #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID) @@ -172,6 +209,15 @@ static DevicePropertyBase device_property_max_recv_speed; #define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID) #define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID) +/* Whether to use subdomain */ +static DevicePropertyBase device_property_s3_subdomain; +#define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID) + +/* Number of threads to use */ +static DevicePropertyBase device_property_nb_threads_backup; +#define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID) +static DevicePropertyBase device_property_nb_threads_recovery; +#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID) /* * prototypes @@ -250,9 +296,9 @@ delete_file(S3Device *self, static gboolean delete_all_files(S3Device *self); -/* Set up self->s3 as best as possible. +/* Set up self->s3t as best as possible. * - * The return value is TRUE iff self->s3 is useable. + * The return value is TRUE iff self->s3t is useable. * * @param self: the S3Device object * @returns: TRUE if the handle is set up @@ -318,6 +364,46 @@ static gboolean s3_device_set_max_recv_speed_fn(Device *self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source); +static gboolean s3_device_set_nb_threads_backup(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_nb_threads_recovery(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean property_set_leom_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_use_subdomain_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_host_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_service_path_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static void s3_thread_read_block(gpointer thread_data, + gpointer data); +static void s3_thread_write_block(gpointer thread_data, + gpointer data); + +/* Wait that all threads are done */ +static void reset_thread(S3Device *self); + /* * virtual functions */ @@ -368,6 +454,14 @@ s3_device_recycle_file(Device *pself, static gboolean s3_device_erase(Device *pself); +static gboolean +check_at_leom(S3Device *self, + guint64 size); + +static gboolean +check_at_peom(S3Device *self, + guint64 size); + /* * Private functions */ @@ -420,23 +514,36 @@ write_amanda_header(S3Device *self, return FALSE; } + if(check_at_leom(self, header_size)) + d_self->is_eom = TRUE; + + if(check_at_peom(self, header_size)) { + d_self->is_eom = TRUE; + device_set_error(d_self, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); + g_free(amanda_header.buffer); + return FALSE; + } + /* write out the header and flush the uploads. */ key = special_file_to_key(self, "tapestart", -1); g_assert(header_size < G_MAXUINT); /* for cast to guint */ amanda_header.buffer_len = (guint)header_size; - result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS, + result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS, &amanda_header, NULL, NULL); g_free(amanda_header.buffer); g_free(key); if (!result) { device_set_error(d_self, - vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3)), + vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); dumpfile_free(dumpinfo); } else { dumpfile_free(d_self->volume_header); d_self->volume_header = dumpinfo; + self->volume_bytes += header_size; } return result; @@ -511,10 +618,10 @@ find_last_file(S3Device *self) { Device *d_self = DEVICE(self); /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */ - result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys); + result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL); if (!result) { device_set_error(d_self, - vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)), + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return -1; } @@ -542,10 +649,11 @@ find_next_file(S3Device *self, int last_file) { Device *d_self = DEVICE(self); /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */ - result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys); + result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", + &keys, NULL); if (!result) { device_set_error(d_self, - vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)), + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return -1; } @@ -576,13 +684,15 @@ delete_file(S3Device *self, { gboolean result; GSList *keys; + guint64 total_size = 0; char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file); Device *d_self = DEVICE(self); - result = s3_list_keys(self->s3, self->bucket, my_prefix, NULL, &keys); + result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys, + &total_size); if (!result) { device_set_error(d_self, - vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)), + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return FALSE; } @@ -590,15 +700,16 @@ delete_file(S3Device *self, /* this will likely be a *lot* of keys */ for (; keys; keys = g_slist_remove(keys, keys->data)) { if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data); - if (!s3_delete(self->s3, self->bucket, keys->data)) { + if (!s3_delete(self->s3t[0].s3, self->bucket, keys->data)) { device_set_error(d_self, vstrallocf(_("While deleting key '%s': %s"), - (char*)keys->data, s3_strerror(self->s3)), + (char*)keys->data, s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR); g_slist_free(keys); return FALSE; } } + self->volume_bytes = total_size; return TRUE; } @@ -616,7 +727,7 @@ delete_all_files(S3Device *self) if (last_file < 0) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* * if the bucket doesn't exist, it doesn't conatin any files, @@ -658,6 +769,12 @@ s3_device_register(void) device_property_fill_and_register(&device_property_s3_access_key, G_TYPE_STRING, "s3_access_key", "Access key ID to authenticate with Amazon S3"); + device_property_fill_and_register(&device_property_s3_host, + G_TYPE_STRING, "s3_host", + "hostname:port of the server"); + device_property_fill_and_register(&device_property_s3_service_path, + G_TYPE_STRING, "s3_service_path", + "path to add in the url"); device_property_fill_and_register(&device_property_s3_user_token, G_TYPE_STRING, "s3_user_token", "User token for authentication Amazon devpay requests"); @@ -673,12 +790,21 @@ s3_device_register(void) device_property_fill_and_register(&device_property_s3_ssl, G_TYPE_BOOLEAN, "s3_ssl", "Whether to use SSL with Amazon S3"); + device_property_fill_and_register(&device_property_s3_subdomain, + G_TYPE_BOOLEAN, "s3_subdomain", + "Whether to use subdomain"); device_property_fill_and_register(&device_property_max_send_speed, G_TYPE_UINT64, "max_send_speed", "Maximum average upload speed (bytes/sec)"); device_property_fill_and_register(&device_property_max_recv_speed, G_TYPE_UINT64, "max_recv_speed", "Maximum average download speed (bytes/sec)"); + device_property_fill_and_register(&device_property_nb_threads_backup, + G_TYPE_UINT64, "nb_threads_backup", + "Number of writer thread"); + device_property_fill_and_register(&device_property_nb_threads_recovery, + G_TYPE_UINT64, "nb_threads_recovery", + "Number of reader thread"); /* register the device itself */ register_device(s3_device_factory, device_prefix_list); @@ -716,6 +842,19 @@ s3_device_init(S3Device * self) Device * dself = DEVICE(self); GValue response; + self->volume_bytes = 0; + self->volume_limit = 0; + self->leom = TRUE; + self->enforce_volume_limit = FALSE; + self->use_subdomain = FALSE; + self->nb_threads = 1; + self->nb_threads_backup = 1; + self->nb_threads_recovery = 1; + self->thread_pool_write = NULL; + self->thread_pool_read = NULL; + self->thread_idle_cond = NULL; + self->thread_idle_mutex = NULL; + /* Register property values * Note: Some aren't added until s3_device_open_device() */ @@ -757,6 +896,18 @@ s3_device_init(S3Device * self) &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); + + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); + g_value_init(&response, G_TYPE_BOOLEAN); g_value_set_boolean(&response, FALSE); device_set_simple_property(dself, PROPERTY_COMPRESSION, @@ -807,6 +958,16 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_simple_property_get_fn, s3_device_set_secret_key_fn); + device_class_register_property(device_class, PROPERTY_S3_HOST, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_host_fn); + + device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_service_path_fn); + device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN, PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, device_simple_property_get_fn, @@ -847,10 +1008,43 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_simple_property_get_fn, s3_device_set_max_recv_speed_fn); + device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_nb_threads_backup); + + device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_nb_threads_recovery); + device_class_register_property(device_class, PROPERTY_COMPRESSION, PROPERTY_ACCESS_GET_MASK, device_simple_property_get_fn, NULL); + + device_class_register_property(device_class, PROPERTY_LEOM, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + property_set_leom_fn); + + device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_max_volume_usage_fn); + + device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_enforce_max_volume_usage_fn); + + device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_use_subdomain_fn); } static gboolean @@ -879,6 +1073,34 @@ s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base, return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_host_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->host); + self->host = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_service_path_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->service_path); + self->service_path = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + static gboolean s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source) @@ -958,12 +1180,17 @@ s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base, GValue *val, PropertySurety surety, PropertySource source) { S3Device *self = S3_DEVICE(p_self); + int thread; self->verbose = g_value_get_boolean(val); /* Our S3 handle may not yet have been instantiated; if so, it will * get the proper verbose setting when it is created */ - if (self->s3) - s3_verbose(self->s3, self->verbose); + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3) + s3_verbose(self->s3t[thread].s3, self->verbose); + } + } return device_simple_property_set_fn(p_self, base, val, surety, source); } @@ -974,16 +1201,21 @@ s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base, { S3Device *self = S3_DEVICE(p_self); gboolean new_val; + int thread; new_val = g_value_get_boolean(val); /* Our S3 handle may not yet have been instantiated; if so, it will * get the proper use_ssl setting when it is created */ - if (self->s3 && !s3_use_ssl(self->s3, new_val)) { - device_set_error(p_self, g_strdup_printf(_( - "Error setting S3 SSL/TLS use " - "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, g_strdup_printf(_( + "Error setting S3 SSL/TLS use " + "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } } self->use_ssl = new_val; @@ -997,13 +1229,18 @@ s3_device_set_max_send_speed_fn(Device *p_self, { S3Device *self = S3_DEVICE(p_self); guint64 new_val; + int thread; new_val = g_value_get_uint64(val); - if (self->s3 && !s3_set_max_send_speed(self->s3, new_val)) { - device_set_error(p_self, - g_strdup("Could not set S3 maximum send speed"), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, + g_strdup("Could not set S3 maximum send speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } } self->max_send_speed = new_val; @@ -1017,19 +1254,108 @@ s3_device_set_max_recv_speed_fn(Device *p_self, { S3Device *self = S3_DEVICE(p_self); guint64 new_val; + int thread; new_val = g_value_get_uint64(val); - if (self->s3 && !s3_set_max_recv_speed(self->s3, new_val)) { - device_set_error(p_self, - g_strdup("Could not set S3 maximum recv speed"), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && + !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, + g_strdup("Could not set S3 maximum recv speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } } self->max_recv_speed = new_val; return device_simple_property_set_fn(p_self, base, val, surety, source); } +static gboolean +s3_device_set_nb_threads_backup(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + + new_val = g_value_get_uint64(val); + self->nb_threads_backup = new_val; + if (self->nb_threads_backup > self->nb_threads) { + self->nb_threads = self->nb_threads_backup; + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_nb_threads_recovery(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + + new_val = g_value_get_uint64(val); + self->nb_threads_recovery = new_val; + if (self->nb_threads_recovery > self->nb_threads) { + self->nb_threads = self->nb_threads_recovery; + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->volume_limit = g_value_get_uint64(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); + +} + +static gboolean +s3_device_set_enforce_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->enforce_volume_limit = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); + +} + +static gboolean +s3_device_set_use_subdomain_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->use_subdomain = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +property_set_leom_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->leom = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} static Device* s3_device_factory(char * device_name, char * device_type, char * device_node) { @@ -1099,15 +1425,39 @@ s3_device_open_device(Device *pself, char *device_name, static void s3_device_finalize(GObject * obj_self) { S3Device *self = S3_DEVICE (obj_self); + int thread; if(G_OBJECT_CLASS(parent_class)->finalize) (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self); - if(self->s3) s3_free(self->s3); + if (self->thread_pool_write) { + g_thread_pool_free(self->thread_pool_write, 1, 1); + self->thread_pool_write = NULL; + } + if (self->thread_pool_read) { + g_thread_pool_free(self->thread_pool_read, 1, 1); + self->thread_pool_read = NULL; + } + if (self->thread_idle_mutex) { + g_mutex_free(self->thread_idle_mutex); + self->thread_idle_mutex = NULL; + } + if (self->thread_idle_cond) { + g_cond_free(self->thread_idle_cond); + self->thread_idle_cond = NULL; + } + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3); + } + g_free(self->s3t); + } if(self->bucket) g_free(self->bucket); if(self->prefix) g_free(self->prefix); if(self->access_key) g_free(self->access_key); if(self->secret_key) g_free(self->secret_key); + if(self->host) g_free(self->host); + if(self->service_path) g_free(self->service_path); if(self->user_token) g_free(self->user_token); if(self->bucket_location) g_free(self->bucket_location); if(self->storage_class) g_free(self->storage_class); @@ -1116,8 +1466,16 @@ static void s3_device_finalize(GObject * obj_self) { static gboolean setup_handle(S3Device * self) { Device *d_self = DEVICE(self); - if (self->s3 == NULL) { + int thread; + if (self->s3t == NULL) { + self->s3t = g_new(S3_by_thread, self->nb_threads); + if (self->s3t == NULL) { + device_set_error(d_self, + stralloc(_("Can't allocate S3Handle array")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } if (self->access_key == NULL || self->access_key[0] == '\0') { device_set_error(d_self, stralloc(_("No Amazon access key specified")), @@ -1136,40 +1494,63 @@ static gboolean setup_handle(S3Device * self) { amfree(self->ca_info); } - self->s3 = s3_open(self->access_key, self->secret_key, self->user_token, - self->bucket_location, self->storage_class, self->ca_info); - if (self->s3 == NULL) { - device_set_error(d_self, - stralloc(_("Internal error creating S3 handle")), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + self->s3t[thread].done = 1; + self->s3t[thread].eof = FALSE; + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + self->s3t[thread].filename = NULL; + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key, + self->host, self->service_path, + self->use_subdomain, + self->user_token, self->bucket_location, + self->storage_class, self->ca_info); + if (self->s3t[thread].s3 == NULL) { + device_set_error(d_self, + stralloc(_("Internal error creating S3 handle")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } } + + g_debug("Create %d threads", self->nb_threads); + self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self, + self->nb_threads, 0, NULL); + self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self, + self->nb_threads, 0, NULL); + self->thread_idle_cond = g_cond_new(); + self->thread_idle_mutex = g_mutex_new(); } - s3_verbose(self->s3, self->verbose); + for (thread = 0; thread < self->nb_threads; thread++) { + s3_verbose(self->s3t[thread].s3, self->verbose); - if (!s3_use_ssl(self->s3, self->use_ssl)) { - device_set_error(d_self, g_strdup_printf(_( + if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) { + device_set_error(d_self, g_strdup_printf(_( "Error setting S3 SSL/TLS use " "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), - DEVICE_STATUS_DEVICE_ERROR); - return FALSE; - } + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } - if (self->max_send_speed && - !s3_set_max_send_speed(self->s3, self->max_send_speed)) { - device_set_error(d_self, + if (self->max_send_speed && + !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) { + device_set_error(d_self, g_strdup("Could not set S3 maximum send speed"), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; - } + return FALSE; + } - if (self->max_recv_speed && - !s3_set_max_recv_speed(self->s3, self->max_recv_speed)) { - device_set_error(d_self, + if (self->max_recv_speed && + !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) { + device_set_error(d_self, g_strdup("Could not set S3 maximum recv speed"), DEVICE_STATUS_DEVICE_ERROR); - return FALSE; + return FALSE; + } } return TRUE; @@ -1181,7 +1562,6 @@ s3_device_read_label(Device *pself) { char *key; CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE}; dumpfile_t *amanda_header; - /* note that this may be called from s3_device_start, when * self->access_mode is not ACCESS_NULL */ @@ -1196,16 +1576,19 @@ s3_device_read_label(Device *pself) { /* setup_handle already set our error message */ return pself->status; } + reset_thread(self); key = special_file_to_key(self, "tapestart", -1); - if (!s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) { + if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it's an expected error (not found), just return FALSE */ if (response_code == 404 && - (s3_error_code == S3_ERROR_NoSuchKey || s3_error_code == S3_ERROR_NoSuchBucket)) { + (s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity || + s3_error_code == S3_ERROR_NoSuchBucket)) { g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)")); device_set_error(pself, stralloc(_("Amanda header not found -- unlabeled volume?")), @@ -1217,7 +1600,7 @@ s3_device_read_label(Device *pself) { /* otherwise, log it and return */ device_set_error(pself, - vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3)), + vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return pself->status; } @@ -1252,6 +1635,9 @@ static gboolean s3_device_start (Device * pself, DeviceAccessMode mode, char * label, char * timestamp) { S3Device * self; + GSList *keys; + guint64 total_size = 0; + gboolean result; self = S3_DEVICE(pself); @@ -1262,21 +1648,23 @@ s3_device_start (Device * pself, DeviceAccessMode mode, return FALSE; } + reset_thread(self); pself->access_mode = mode; pself->in_file = FALSE; /* try creating the bucket, in case it doesn't exist */ - if (mode != ACCESS_READ && !s3_make_bucket(self->s3, self->bucket)) { + if (mode != ACCESS_READ && !s3_make_bucket(self->s3t[0].s3, self->bucket)) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it isn't an expected error (bucket already exists), * return FALSE */ if (response_code != 409 || - s3_error_code != S3_ERROR_BucketAlreadyExists) { + (s3_error_code != S3_ERROR_BucketAlreadyExists && + s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) { device_set_error(pself, - vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3)), + vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR); return FALSE; } @@ -1310,7 +1698,17 @@ s3_device_start (Device * pself, DeviceAccessMode mode, if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) { /* s3_device_read_label already set our error message */ return FALSE; - } + } else { + result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size); + if(!result) { + device_set_error(pself, + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR); + return FALSE; + } else { + self->volume_bytes = total_size; + } + } return seek_to_end(self); break; @@ -1322,7 +1720,13 @@ s3_device_start (Device * pself, DeviceAccessMode mode, } static gboolean -s3_device_finish (Device * pself) { +s3_device_finish ( + Device * pself) +{ + S3Device *self = S3_DEVICE(pself); + + reset_thread(self); + /* we're not in a file anymore */ pself->access_mode = ACCESS_NULL; @@ -1340,10 +1744,12 @@ s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { CurlBuffer amanda_header = {NULL, 0, 0, 0}; gboolean result; size_t header_size; - char *key; + char *key; + int thread; if (device_in_error(self)) return FALSE; + reset_thread(self); pself->is_eom = FALSE; /* Set the blocksize to zero, since there's no header to skip (it's stored @@ -1362,57 +1768,174 @@ s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { } amanda_header.buffer_len = header_size; + if(check_at_leom(self, header_size)) + pself->is_eom = TRUE; + + if(check_at_peom(self, header_size)) { + pself->is_eom = TRUE; + device_set_error(pself, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); + g_free(amanda_header.buffer); + return FALSE; + } /* set the file and block numbers correctly */ pself->file = (pself->file > 0)? pself->file+1 : 1; pself->block = 0; pself->in_file = TRUE; - /* write it out as a special block (not the 0th) */ key = special_file_to_key(self, "filestart", pself->file); - result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS, + result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS, &amanda_header, NULL, NULL); g_free(amanda_header.buffer); g_free(key); if (!result) { device_set_error(pself, - vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3)), + vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)), DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return FALSE; } + self->volume_bytes += header_size; + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + } + return TRUE; } static gboolean s3_device_write_block (Device * pself, guint size, gpointer data) { - gboolean result; char *filename; S3Device * self = S3_DEVICE(pself); - CurlBuffer to_write = {data, size, 0, 0}; + int idle_thread = 0; + int thread = -1; + int first_idle = -1; g_assert (self != NULL); g_assert (data != NULL); if (device_in_error(self)) return FALSE; - filename = file_and_block_to_key(self, pself->file, pself->block); + if(check_at_leom(self, size)) + pself->is_eom = TRUE; - result = s3_upload(self->s3, self->bucket, filename, S3_BUFFER_READ_FUNCS, - &to_write, NULL, NULL); - g_free(filename); - if (!result) { - device_set_error(pself, - vstrallocf(_("While writing data block to S3: %s"), s3_strerror(self->s3)), - DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); + if(check_at_peom(self, size)) { + pself->is_eom = TRUE; + device_set_error(pself, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); return FALSE; } - pself->block++; + filename = file_and_block_to_key(self, pself->file, pself->block); + + g_mutex_lock(self->thread_idle_mutex); + while (!idle_thread) { + idle_thread = 0; + for (thread = 0; thread < self->nb_threads_backup; thread++) { + if (self->s3t[thread].idle == 1) { + idle_thread++; + if (first_idle == -1) + first_idle = thread; + /* Check if the thread is in error */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(pself, (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + g_mutex_unlock(self->thread_idle_mutex); + return FALSE; + } + } + } + if (!idle_thread) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + thread = first_idle; + + self->s3t[thread].idle = 0; + self->s3t[thread].done = 0; + if (self->s3t[thread].curl_buffer.buffer && + self->s3t[thread].curl_buffer.buffer_len < size) { + g_free((char *)self->s3t[thread].curl_buffer.buffer); + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].buffer_len = 0; + } + if (self->s3t[thread].curl_buffer.buffer == NULL) { + self->s3t[thread].curl_buffer.buffer = g_malloc(size); + self->s3t[thread].curl_buffer.buffer_len = size; + self->s3t[thread].buffer_len = size; + } + memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size); + self->s3t[thread].curl_buffer.buffer_pos = 0; + self->s3t[thread].curl_buffer.buffer_len = size; + self->s3t[thread].curl_buffer.max_buffer_size = 0; + self->s3t[thread].filename = filename; + g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL); + g_mutex_unlock(self->thread_idle_mutex); + pself->block++; + self->volume_bytes += size; return TRUE; } +static void +s3_thread_write_block( + gpointer thread_data, + gpointer data) +{ + S3_by_thread *s3t = (S3_by_thread *)thread_data; + Device *pself = (Device *)data; + S3Device *self = S3_DEVICE(pself); + gboolean result; + + result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename, + S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); + g_free((void *)s3t->filename); + s3t->filename = NULL; + if (!result) { + s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3)); + } + g_mutex_lock(self->thread_idle_mutex); + s3t->idle = 1; + s3t->done = 1; + s3t->curl_buffer.buffer_len = s3t->buffer_len; + g_cond_broadcast(self->thread_idle_cond); + g_mutex_unlock(self->thread_idle_mutex); +} + static gboolean s3_device_finish_file (Device * pself) { + S3Device *self = S3_DEVICE(pself); + + /* Check all threads are done */ + int idle_thread = 0; + int thread; + + g_mutex_lock(self->thread_idle_mutex); + while (idle_thread != self->nb_threads) { + idle_thread = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].idle == 1) { + idle_thread++; + } + /* check thread status */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(pself, (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + } + } + if (idle_thread != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + g_mutex_unlock(self->thread_idle_mutex); + if (device_in_error(pself)) return FALSE; /* we're not in a file anymore */ @@ -1426,6 +1949,7 @@ s3_device_recycle_file(Device *pself, guint file) { S3Device *self = S3_DEVICE(pself); if (device_in_error(self)) return FALSE; + reset_thread(self); return delete_file(self, file); /* delete_file already set our error message if necessary */ } @@ -1443,9 +1967,10 @@ s3_device_erase(Device *pself) { return FALSE; } + reset_thread(self); key = special_file_to_key(self, "tapestart", -1); - if (!s3_delete(self->s3, self->bucket, key)) { - s3_error(self->s3, &errmsg, NULL, NULL, NULL, NULL, NULL); + if (!s3_delete(self->s3t[0].s3, self->bucket, key)) { + s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL); device_set_error(pself, stralloc(errmsg), DEVICE_STATUS_DEVICE_ERROR); @@ -1456,8 +1981,8 @@ s3_device_erase(Device *pself) { if (!delete_all_files(self)) return FALSE; - if (!s3_delete_bucket(self->s3, self->bucket)) { - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) { + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* * ignore the error if the bucket isn't empty (there may be data from elsewhere) @@ -1473,6 +1998,7 @@ s3_device_erase(Device *pself) { return FALSE; } } + self->volume_bytes = 0; return TRUE; } @@ -1486,27 +2012,33 @@ s3_device_seek_file(Device *pself, guint file) { CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE}; dumpfile_t *amanda_header; const char *errmsg = NULL; + int thread; if (device_in_error(self)) return NULL; + reset_thread(self); + pself->file = file; pself->is_eof = FALSE; pself->in_file = FALSE; pself->block = 0; + self->next_block_to_read = 0; /* read it in */ key = special_file_to_key(self, "filestart", pself->file); - result = s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, + result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL); g_free(key); if (!result) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL); + s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it's an expected error (not found), check what to do. */ - if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) { + if (response_code == 404 && + (s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity)) { int next_file; next_file = find_next_file(self, pself->file); if (next_file > 0) { @@ -1515,7 +2047,7 @@ s3_device_seek_file(Device *pself, guint file) { } else if (next_file == 0) { /* No next file. Check if we are one past the end. */ key = special_file_to_key(self, "filestart", pself->file - 1); - result = s3_read(self->s3, self->bucket, key, + result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL); g_free(key); if (result) { @@ -1559,155 +2091,229 @@ s3_device_seek_file(Device *pself, guint file) { } pself->in_file = TRUE; + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + self->s3t[thread].eof = FALSE; + } return amanda_header; } static gboolean s3_device_seek_block(Device *pself, guint64 block) { + S3Device * self = S3_DEVICE(pself); if (device_in_error(pself)) return FALSE; + reset_thread(self); pself->block = block; + self->next_block_to_read = block; return TRUE; } -typedef struct s3_read_block_data { - gpointer data; - int size_req; - int size_written; - - CurlBuffer curl; -} s3_read_block_data; - -/* wrapper around s3_buffer_write_func to write as much data as possible to - * the user's buffer, and switch to a dynamically allocated buffer if that - * isn't large enough */ -static size_t -s3_read_block_write_func(void *ptr, size_t size, size_t nmemb, void *stream) -{ - s3_read_block_data *dat = stream; - guint new_bytes, bytes_needed; - - /* if data is NULL, call through to s3_buffer_write_func */ - if (!dat->data) { - return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl)); - } - - new_bytes = (guint) size * nmemb; - bytes_needed = dat->size_written + new_bytes; - - if (bytes_needed > (guint)dat->size_req) { - /* this read will overflow the user's buffer, so malloc ourselves - * a new buffer and keep reading */ - dat->curl.buffer = g_malloc(bytes_needed); - dat->curl.buffer_len = bytes_needed; - dat->curl.buffer_pos = dat->size_written; - memcpy(dat->curl.buffer, dat->data, dat->size_written); - dat->data = NULL; /* signal that the user's buffer is too small */ - return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl)); - } - - /* copy it into the dat->data buffer, and increment the size */ - memcpy(dat->data + dat->size_written, ptr, new_bytes); - dat->size_written += new_bytes; - - return new_bytes; -} - static int s3_device_read_block (Device * pself, gpointer data, int *size_req) { S3Device * self = S3_DEVICE(pself); char *key; - s3_read_block_data dat = {NULL, 0, 0, { NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE} }; - gboolean result; + int thread; + int done = 0; g_assert (self != NULL); if (device_in_error(self)) return -1; + g_mutex_lock(self->thread_idle_mutex); + /* start a read ahead for each thread */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t = &self->s3t[thread]; + if (s3t->idle) { + key = file_and_block_to_key(self, pself->file, self->next_block_to_read); + s3t->filename = key; + s3t->done = 0; + s3t->idle = 0; + s3t->eof = FALSE; + s3t->errflags = DEVICE_STATUS_SUCCESS; + if (self->s3t[thread].curl_buffer.buffer && + (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) { + g_free(self->s3t[thread].curl_buffer.buffer); + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].buffer_len = 0; + } + if (!self->s3t[thread].curl_buffer.buffer) { + self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req); + self->s3t[thread].curl_buffer.buffer_len = *size_req; + self->s3t[thread].buffer_len = *size_req; + } + s3t->curl_buffer.buffer_pos = 0; + s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE; + self->next_block_to_read++; + g_thread_pool_push(self->thread_pool_read, s3t, NULL); + } + } + /* get the file*/ key = file_and_block_to_key(self, pself->file, pself->block); g_assert(key != NULL); - if (self->cached_key && (0 == strcmp(key, self->cached_key))) { - if (*size_req >= self->cached_size) { - /* use the cached copy and clear the cache */ - memcpy(data, self->cached_buf, self->cached_size); - *size_req = self->cached_size; - - g_free(key); - g_free(self->cached_key); - self->cached_key = NULL; - g_free(self->cached_buf); - self->cached_buf = NULL; - - pself->block++; - return *size_req; - } else { - *size_req = self->cached_size; - g_free(key); - return 0; + while (!done) { + /* find which thread read the key */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t; + s3t = &self->s3t[thread]; + if (!s3t->idle && + s3t->done && + strcmp(key, (char *)s3t->filename) == 0) { + if (s3t->eof) { + /* return eof */ + g_free(key); + pself->is_eof = TRUE; + pself->in_file = FALSE; + device_set_error(pself, stralloc(_("EOF")), + DEVICE_STATUS_SUCCESS); + g_mutex_unlock(self->thread_idle_mutex); + return -1; + } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) { + /* return the error */ + device_set_error(pself, (char *)s3t->errmsg, s3t->errflags); + g_free(key); + g_mutex_unlock(self->thread_idle_mutex); + return -1; + + } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) { + /* return the buffer */ + g_mutex_unlock(self->thread_idle_mutex); + memcpy(data, s3t->curl_buffer.buffer, + s3t->curl_buffer.buffer_pos); + *size_req = s3t->curl_buffer.buffer_pos; + g_free(key); + s3t->idle = 1; + g_free((char *)s3t->filename); + pself->block++; + done = 1; + g_mutex_lock(self->thread_idle_mutex); + break; + } else { /* buffer not enough large */ + *size_req = s3t->curl_buffer.buffer_len; + g_free(key); + g_mutex_unlock(self->thread_idle_mutex); + return 0; + } + } + } + if (!done) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); } } - /* clear the cache, as it's useless to us */ - if (self->cached_key) { - g_free(self->cached_key); - self->cached_key = NULL; - - g_free(self->cached_buf); - self->cached_buf = NULL; + /* start a read ahead for the thread */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t = &self->s3t[thread]; + if (s3t->idle) { + key = file_and_block_to_key(self, pself->file, self->next_block_to_read); + s3t->filename = key; + s3t->done = 0; + s3t->idle = 0; + s3t->eof = FALSE; + s3t->errflags = DEVICE_STATUS_SUCCESS; + if (!self->s3t[thread].curl_buffer.buffer) { + self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req); + self->s3t[thread].curl_buffer.buffer_len = *size_req; + } + s3t->curl_buffer.buffer_pos = 0; + self->next_block_to_read++; + g_thread_pool_push(self->thread_pool_read, s3t, NULL); + } } + g_mutex_unlock(self->thread_idle_mutex); - /* set up dat for the write_func callback */ - if (!data || *size_req <= 0) { - dat.data = NULL; - dat.size_req = 0; - } else { - dat.data = data; - dat.size_req = *size_req; - } + return *size_req; + +} + +static void +s3_thread_read_block( + gpointer thread_data, + gpointer data) +{ + S3_by_thread *s3t = (S3_by_thread *)thread_data; + Device *pself = (Device *)data; + S3Device *self = S3_DEVICE(pself); + gboolean result; + + result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func, + s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); - result = s3_read(self->s3, self->bucket, key, s3_read_block_write_func, - s3_buffer_reset_func, &dat, NULL, NULL); + g_mutex_lock(self->thread_idle_mutex); if (!result) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); - - g_free(key); - key = NULL; + s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it's an expected error (not found), just return -1 */ - if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) { - pself->is_eof = TRUE; - pself->in_file = FALSE; - device_set_error(pself, - stralloc(_("EOF")), - DEVICE_STATUS_SUCCESS); - return -1; + if (response_code == 404 && + (s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity)) { + s3t->eof = TRUE; + } else { + + /* otherwise, log it and return FALSE */ + s3t->errflags = DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"), + s3_strerror(s3t->s3)); } + } + s3t->done = 1; + g_cond_broadcast(self->thread_idle_cond); + g_mutex_unlock(self->thread_idle_mutex); - /* otherwise, log it and return FALSE */ - device_set_error(pself, - vstrallocf(_("While reading data block from S3: %s"), s3_strerror(self->s3)), - DEVICE_STATUS_VOLUME_ERROR); - return -1; + return; +} + +static gboolean +check_at_peom(S3Device *self, guint64 size) +{ + if(self->enforce_volume_limit && (self->volume_limit > 0)) { + guint64 newtotal = self->volume_bytes + size; + if(newtotal > self->volume_limit) { + return TRUE; + } } + return FALSE; +} - if (dat.data == NULL) { - /* data was larger than the available space, so cache it and return - * the actual size */ - self->cached_buf = dat.curl.buffer; - self->cached_size = dat.curl.buffer_pos; - self->cached_key = key; - key = NULL; +static gboolean +check_at_leom(S3Device *self, guint64 size) +{ + guint64 block_size = DEVICE(self)->block_size; + guint64 eom_warning_buffer = block_size * + (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads); - *size_req = dat.curl.buffer_pos; - return 0; + if(!self->leom) + return FALSE; + + if(self->enforce_volume_limit && (self->volume_limit > 0)) { + guint64 newtotal = self->volume_bytes + size + eom_warning_buffer; + if(newtotal > self->volume_limit) { + return TRUE; + } } + return FALSE; +} - /* ok, the read went directly to the user's buffer, so we need only - * set and return the size */ - pself->block++; - g_free(key); - *size_req = dat.size_written; - return dat.size_written; +static void +reset_thread( + S3Device *self) +{ + int thread; + int nb_done = 0; + + g_mutex_lock(self->thread_idle_mutex); + while(nb_done != self->nb_threads) { + nb_done = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].done == 1) + nb_done++; + } + if (nb_done != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + g_mutex_unlock(self->thread_idle_mutex); }