X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=device-src%2Fs3-device.c;h=7a987505c4e81ba70d40c8fb4f70d12aa13b97c3;hb=refs%2Ftags%2Fupstream%2F3.3.1;hp=bebb6efb436a5dd002a343502f850d69dcbb6921;hpb=fb2bd066c2f8b34addafe48d62550e3033a59431;p=debian%2Famanda diff --git a/device-src/s3-device.c b/device-src/s3-device.c index bebb6ef..7a98750 100644 --- a/device-src/s3-device.c +++ b/device-src/s3-device.c @@ -1,29 +1,30 @@ /* - * Copyright (c) 2005 Zmanda, Inc. All Rights Reserved. - * - * This library is free software; you can redistribute it and/or modify it - * under the terms of the GNU Lesser General Public License version 2.1 as - * published by the Free Software Foundation. - * - * This library is distributed in the hope that it will be useful, but + * Copyright (c) 2008, 2009, 2010 Zmanda, Inc. All Rights Reserved. + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License version 2 as published + * by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - * License for more details. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this library; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. - * - * Contact information: Zmanda Inc., 505 N Mathlida Ave, Suite 120 + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License + * for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA + * + * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com */ -/* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store +/* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store * data. It stores data in keys named with a user-specified prefix, inside a - * user-specified bucket. Data is stored in the form of numbered (large) - * blocks. + * user-specified bucket. Data is stored in the form of numbered (large) + * blocks. */ +#include "amanda.h" #include #include #include @@ -32,10 +33,9 @@ #include #include #include "util.h" -#include "amanda.h" #include "conffile.h" #include "device.h" -#include "s3-device.h" +#include "s3.h" #include #ifdef HAVE_OPENSSL_HMAC_H # include @@ -49,16 +49,125 @@ # endif #endif +/* + * Type checking and casting macros + */ +#define TYPE_S3_DEVICE (s3_device_get_type()) +#define S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device) +#define S3_DEVICE_CONST(obj) G_TYPE_CHECK_INSTANCE_CAST((obj), s3_device_get_type(), S3Device const) +#define S3_DEVICE_CLASS(klass) G_TYPE_CHECK_CLASS_CAST((klass), s3_device_get_type(), S3DeviceClass) +#define IS_S3_DEVICE(obj) G_TYPE_CHECK_INSTANCE_TYPE((obj), s3_device_get_type ()) + +#define S3_DEVICE_GET_CLASS(obj) G_TYPE_INSTANCE_GET_CLASS((obj), s3_device_get_type(), S3DeviceClass) +static GType s3_device_get_type (void); + +/* + * Main object structure + */ +typedef struct _S3MetadataFile S3MetadataFile; + +typedef struct _S3_by_thread S3_by_thread; +struct _S3_by_thread { + S3Handle * volatile s3; + CurlBuffer volatile curl_buffer; + guint volatile buffer_len; + int volatile idle; + int volatile eof; + int volatile done; + char volatile * volatile filename; + DeviceStatusFlags volatile errflags; /* device_status */ + char volatile * volatile errmsg; /* device error message */ +}; + +typedef struct _S3Device S3Device; +struct _S3Device { + Device __parent__; + + /* The "easy" curl handle we use to access Amazon S3 */ + S3_by_thread *s3t; + + /* S3 access information */ + char *bucket; + char *prefix; + + /* The S3 access information. */ + char *secret_key; + char *access_key; + char *user_token; + + /* The Openstack swift information. */ + char *swift_account_id; + char *swift_access_key; + + char *bucket_location; + char *storage_class; + char *host; + char *service_path; + char *server_side_encryption; + + char *ca_info; + + /* a cache for unsuccessful reads (where we get the file but the caller + * doesn't have space for it or doesn't want it), where we expect the + * next call will request the same file. + */ + char *cached_buf; + char *cached_key; + int cached_size; + + /* Produce verbose output? */ + gboolean verbose; + + /* Use SSL? */ + gboolean use_ssl; + gboolean openstack_swift_api; + + /* Throttling */ + guint64 max_send_speed; + guint64 max_recv_speed; + + gboolean leom; + guint64 volume_bytes; + guint64 volume_limit; + gboolean enforce_volume_limit; + gboolean use_subdomain; + + int nb_threads; + int nb_threads_backup; + int nb_threads_recovery; + GThreadPool *thread_pool_delete; + GThreadPool *thread_pool_write; + GThreadPool *thread_pool_read; + GCond *thread_idle_cond; + GMutex *thread_idle_mutex; + int next_block_to_read; + GSList *keys; +}; + +/* + * Class definition + */ +typedef struct _S3DeviceClass S3DeviceClass; +struct _S3DeviceClass { + DeviceClass __parent__; +}; + + /* * Constants and static data */ +#define S3_DEVICE_NAME "s3" + /* Maximum key length as specified in the S3 documentation * (*excluding* null terminator) */ #define S3_MAX_KEY_LENGTH 1024 +/* Note: for compatability, min can only be decreased and max increased */ #define S3_DEVICE_MIN_BLOCK_SIZE 1024 -#define S3_DEVICE_MAX_BLOCK_SIZE (10*1024*1024) +#define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL) +#define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024) +#define EOM_EARLY_WARNING_ZONE_BLOCKS 4 /* This goes in lieu of file number for metadata. */ #define SPECIAL_INFIX "special-" @@ -66,23 +175,91 @@ /* pointer to the class of our parent */ static DeviceClass *parent_class = NULL; +/* + * device-specific properties + */ + +/* Authentication information for Amazon S3. Both of these are strings. */ +static DevicePropertyBase device_property_s3_access_key; +static DevicePropertyBase device_property_s3_secret_key; +#define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID) +#define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID) + +/* Authentication information for Openstack Swift. Both of these are strings. */ +static DevicePropertyBase device_property_swift_account_id; +static DevicePropertyBase device_property_swift_access_key; +#define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID) +#define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID) + +/* Host and path */ +static DevicePropertyBase device_property_s3_host; +static DevicePropertyBase device_property_s3_service_path; +#define PROPERTY_S3_HOST (device_property_s3_host.ID) +#define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID) + +/* Same, but for S3 with DevPay. */ +static DevicePropertyBase device_property_s3_user_token; +#define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID) + +/* Location constraint for new buckets created on Amazon S3. */ +static DevicePropertyBase device_property_s3_bucket_location; +#define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID) + +/* Storage class */ +static DevicePropertyBase device_property_s3_storage_class; +#define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID) + +/* Storage class */ +static DevicePropertyBase device_property_s3_server_side_encryption; +#define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID) + +/* Path to certificate authority certificate */ +static DevicePropertyBase device_property_ssl_ca_info; +#define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID) + +/* Whether to use openstack protocol. */ +static DevicePropertyBase device_property_openstack_swift_api; +#define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID) + +/* Whether to use SSL with Amazon S3. */ +static DevicePropertyBase device_property_s3_ssl; +#define PROPERTY_S3_SSL (device_property_s3_ssl.ID) + +/* Speed limits for sending and receiving */ +static DevicePropertyBase device_property_max_send_speed; +static DevicePropertyBase device_property_max_recv_speed; +#define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID) +#define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID) + +/* Whether to use subdomain */ +static DevicePropertyBase device_property_s3_subdomain; +#define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID) + +/* Number of threads to use */ +static DevicePropertyBase device_property_nb_threads_backup; +#define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID) +static DevicePropertyBase device_property_nb_threads_recovery; +#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID) + /* * prototypes */ -/* +void s3_device_register(void); + +/* * utility functions */ /* Given file and block numbers, return an S3 key. - * + * * @param self: the S3Device object * @param file: the file number * @param block: the block within that file * @returns: a newly allocated string containing an S3 key. */ -static char * -file_and_block_to_key(S3Device *self, - int file, +static char * +file_and_block_to_key(S3Device *self, + int file, guint64 block); /* Given the name of a special file (such as 'tapestart'), generate @@ -93,9 +270,9 @@ file_and_block_to_key(S3Device *self, * @param file: a file number to include; omitted if -1 * @returns: a newly alocated string containing an S3 key. */ -static char * -special_file_to_key(S3Device *self, - char *special_name, +static char * +special_file_to_key(S3Device *self, + char *special_name, int file); /* Write an amanda header file to S3. * @@ -103,9 +280,9 @@ special_file_to_key(S3Device *self, * @param label: the volume label * @param timestamp: the volume timestamp */ -static gboolean -write_amanda_header(S3Device *self, - char *label, +static gboolean +write_amanda_header(S3Device *self, + char *label, char * timestamp); /* "Fast forward" this device to the end by looking up the largest file number @@ -113,15 +290,15 @@ write_amanda_header(S3Device *self, * * @param self: the S3Device object */ -static gboolean +static gboolean seek_to_end(S3Device *self); -/* Find the number of the last file that contains any data (even just a header). +/* Find the number of the last file that contains any data (even just a header). * * @param self: the S3Device object * @returns: the last file, or -1 in event of an error */ -static int +static int find_last_file(S3Device *self); /* Delete all blocks in the given file, including the filestart block @@ -129,23 +306,32 @@ find_last_file(S3Device *self); * @param self: the S3Device object * @param file: the file to delete */ -static gboolean -delete_file(S3Device *self, +static gboolean +delete_file(S3Device *self, int file); -/* Set up self->s3 as best as possible. Unless SILENT is TRUE, - * any problems will generate warnings (with g_warning). Regardless, - * the return value is TRUE iff self->s3 is useable. + +/* Delete all files in the given device + * + * @param self: the S3Device object + */ +static gboolean +delete_all_files(S3Device *self); + +/* Set up self->s3t as best as possible. + * + * The return value is TRUE iff self->s3t is useable. * * @param self: the S3Device object - * @param silent: silence warnings * @returns: TRUE if the handle is set up */ -static gboolean -setup_handle(S3Device * self, - gboolean ignore_problems); +static gboolean +setup_handle(S3Device * self); + +static void +s3_wait_thread_delete(S3Device *self); -/* +/* * class mechanics */ static void @@ -158,66 +344,174 @@ static void s3_device_finalize(GObject * o); static Device* -s3_device_factory(char * device_type, - char * device_name); +s3_device_factory(char * device_name, char * device_type, char * device_node); + +/* + * Property{Get,Set}Fns */ + +static gboolean s3_device_set_access_key_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_secret_key_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_swift_account_id_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_swift_access_key_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_user_token_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_bucket_location_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_storage_class_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_server_side_encryption_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_ca_info_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_verbose_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_openstack_swift_api_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_ssl_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_max_send_speed_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_max_recv_speed_fn(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_nb_threads_backup(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_nb_threads_recovery(Device *self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean property_set_leom_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); -/* +static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_use_subdomain_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_host_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static gboolean s3_device_set_service_path_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source); + +static void s3_thread_read_block(gpointer thread_data, + gpointer data); +static void s3_thread_write_block(gpointer thread_data, + gpointer data); +static gboolean make_bucket(Device * pself); + + +/* Wait that all threads are done */ +static void reset_thread(S3Device *self); + +/* * virtual functions */ -static gboolean -s3_device_open_device(Device *pself, - char *device_name); +static void +s3_device_open_device(Device *pself, char *device_name, + char * device_type, char * device_node); -static ReadLabelStatusFlags s3_device_read_label(Device * self); +static DeviceStatusFlags s3_device_read_label(Device * self); -static gboolean -s3_device_start(Device * self, - DeviceAccessMode mode, - char * label, +static gboolean +s3_device_start(Device * self, + DeviceAccessMode mode, + char * label, char * timestamp); -static gboolean +static gboolean +s3_device_finish(Device * self); + +static gboolean s3_device_start_file(Device * self, - const dumpfile_t * jobInfo); + dumpfile_t * jobInfo); -static gboolean -s3_device_write_block(Device * self, - guint size, - gpointer data, - gboolean last); +static gboolean +s3_device_write_block(Device * self, + guint size, + gpointer data); -static gboolean +static gboolean s3_device_finish_file(Device * self); -static dumpfile_t* -s3_device_seek_file(Device *pself, +static dumpfile_t* +s3_device_seek_file(Device *pself, guint file); -static gboolean -s3_device_seek_block(Device *pself, +static gboolean +s3_device_seek_block(Device *pself, guint64 block); static int -s3_device_read_block(Device * pself, - gpointer data, +s3_device_read_block(Device * pself, + gpointer data, int *size_req); -static gboolean -s3_device_recycle_file(Device *pself, +static gboolean +s3_device_recycle_file(Device *pself, guint file); -static gboolean s3_device_property_set(Device * p_self, DevicePropertyId id, - GValue * val); -static gboolean s3_device_property_get(Device * p_self, DevicePropertyId id, - GValue * val); +static gboolean +s3_device_erase(Device *pself); + +static gboolean +check_at_leom(S3Device *self, + guint64 size); + +static gboolean +check_at_peom(S3Device *self, + guint64 size); + /* * Private functions */ -/* {{{ file_and_block_to_key */ static char * -file_and_block_to_key(S3Device *self, - int file, +file_and_block_to_key(S3Device *self, + int file, guint64 block) { char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data", @@ -225,12 +519,10 @@ file_and_block_to_key(S3Device *self, g_assert(strlen(s3_key) <= S3_MAX_KEY_LENGTH); return s3_key; } -/* }}} */ -/* {{{ special_file_to_key */ static char * -special_file_to_key(S3Device *self, - char *special_name, +special_file_to_key(S3Device *self, + char *special_name, int file) { if (file == -1) @@ -238,46 +530,68 @@ special_file_to_key(S3Device *self, else return g_strdup_printf("%sf%08x-%s", self->prefix, file, special_name); } -/* }}} */ -/* {{{ write_amanda_header */ static gboolean -write_amanda_header(S3Device *self, - char *label, +write_amanda_header(S3Device *self, + char *label, char * timestamp) { - char * amanda_header = NULL; + CurlBuffer amanda_header = {NULL, 0, 0, 0}; char * key = NULL; - int header_size; - gboolean header_fits, result; + gboolean result; dumpfile_t * dumpinfo = NULL; + Device *d_self = DEVICE(self); + size_t header_size; /* build the header */ + header_size = 0; /* no minimum size */ dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp); - amanda_header = device_build_amanda_header(DEVICE(self), dumpinfo, - &header_size, &header_fits); - if (!header_fits) { - fprintf(stderr, - _("Amanda tapestart header won't fit in a single block!\n")); - g_free(amanda_header); + amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo, + &header_size); + if (amanda_header.buffer == NULL) { + device_set_error(d_self, + stralloc(_("Amanda tapestart header won't fit in a single block!")), + DEVICE_STATUS_DEVICE_ERROR); + dumpfile_free(dumpinfo); + g_free(amanda_header.buffer); return FALSE; } + if(check_at_leom(self, header_size)) + d_self->is_eom = TRUE; + + if(check_at_peom(self, header_size)) { + d_self->is_eom = TRUE; + device_set_error(d_self, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); + g_free(amanda_header.buffer); + return FALSE; + } + /* write out the header and flush the uploads. */ key = special_file_to_key(self, "tapestart", -1); - result = s3_upload(self->s3, self->bucket, key, amanda_header, header_size); - g_free(amanda_header); + g_assert(header_size < G_MAXUINT); /* for cast to guint */ + amanda_header.buffer_len = (guint)header_size; + result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS, + &amanda_header, NULL, NULL); + g_free(amanda_header.buffer); g_free(key); if (!result) { - fprintf(stderr, _("While writing amanda header: %s\n"), - s3_strerror(self->s3)); + device_set_error(d_self, + vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); + dumpfile_free(dumpinfo); + } else { + dumpfile_free(d_self->volume_header); + d_self->volume_header = dumpinfo; + self->volume_bytes += header_size; } + d_self->header_block_size = header_size; return result; } -/* }}} */ -/* {{{ seek_to_end */ static gboolean seek_to_end(S3Device *self) { int last_file; @@ -292,7 +606,6 @@ seek_to_end(S3Device *self) { return TRUE; } -/* }}} */ /* Convert an object name into a file number, assuming the given prefix * length. Returns -1 if the object name is invalid, or 0 if the object name @@ -300,20 +613,22 @@ seek_to_end(S3Device *self) { static int key_to_file(guint prefix_len, const char * key) { int file; int i; - + /* skip the prefix */ - g_return_val_if_fail(strlen(key) > prefix_len, -1); + if (strlen(key) <= prefix_len) + return -1; key += prefix_len; if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) { return 0; } - + /* check that key starts with 'f' */ - g_return_val_if_fail(key[0] == 'f', -1); + if (key[0] != 'f') + return -1; key++; - + /* check that key is of the form "%08x-" */ for (i = 0; i < 8; i++) { if (!(key[i] >= '0' && key[i] <= '9') && @@ -330,12 +645,11 @@ static int key_to_file(guint prefix_len, const char * key) { g_warning(_("unparseable file number '%s'"), key); return -1; } - + return file; } -/* {{{ find_last_file */ -/* Find the number of the last file that contains any data (even just a header). +/* Find the number of the last file that contains any data (even just a header). * Returns -1 in event of an error */ static int @@ -344,12 +658,14 @@ find_last_file(S3Device *self) { GSList *keys; unsigned int prefix_len = strlen(self->prefix); int last_file = 0; + Device *d_self = DEVICE(self); /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */ - result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys); + result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL); if (!result) { - fprintf(stderr, _("While listing S3 keys: %s\n"), - s3_strerror(self->s3)); + device_set_error(d_self, + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return -1; } @@ -363,10 +679,8 @@ find_last_file(S3Device *self) { return last_file; } -/* }}} */ -/* {{{ find_next_file */ -/* Find the number of the file following the requested one, if any. +/* Find the number of the file following the requested one, if any. * Returns 0 if there is no such file or -1 in event of an error */ static int @@ -375,12 +689,15 @@ find_next_file(S3Device *self, int last_file) { GSList *keys; unsigned int prefix_len = strlen(self->prefix); int next_file = 0; + Device *d_self = DEVICE(self); /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */ - result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys); + result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", + &keys, NULL); if (!result) { - fprintf(stderr, _("While listing S3 keys: %s\n"), - s3_strerror(self->s3)); + device_set_error(d_self, + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return -1; } @@ -401,61 +718,220 @@ find_next_file(S3Device *self, int last_file) { } } - return last_file; + return next_file; } -/* }}} */ -/* {{{ delete_file */ static gboolean delete_file(S3Device *self, int file) { + int thread = -1; + gboolean result; GSList *keys; - char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file); - - result = s3_list_keys(self->s3, self->bucket, my_prefix, NULL, &keys); + guint64 total_size = 0; + Device *d_self = DEVICE(self); + char *my_prefix; + if (file == -1) { + my_prefix = g_strdup_printf("%sf", self->prefix); + } else { + my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file); + } + + result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys, + &total_size); if (!result) { - fprintf(stderr, _("While listing S3 keys: %s\n"), - s3_strerror(self->s3)); + device_set_error(d_self, + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return FALSE; } - /* this will likely be a *lot* of keys */ - for (; keys; keys = g_slist_remove(keys, keys->data)) { - if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data); - if (!s3_delete(self->s3, self->bucket, keys->data)) { - fprintf(stderr, _("While deleting key '%s': %s\n"), - (char*)keys->data, s3_strerror(self->s3)); - g_slist_free(keys); - return FALSE; - } + g_mutex_lock(self->thread_idle_mutex); + if (!self->keys) { + self->keys = keys; + } else { + self->keys = g_slist_concat(self->keys, keys); } + // start the threads + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].idle == 1) { + /* Check if the thread is in error */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(d_self, + (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + g_mutex_unlock(self->thread_idle_mutex); + s3_wait_thread_delete(self); + return FALSE; + } + self->s3t[thread].idle = 0; + self->s3t[thread].done = 0; + g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread], + NULL); + } + } + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + g_mutex_unlock(self->thread_idle_mutex); + + self->volume_bytes = total_size; + + s3_wait_thread_delete(self); + return TRUE; } -/* }}} */ + +static void +s3_thread_delete_block( + gpointer thread_data, + gpointer data) +{ + static int count = 0; + S3_by_thread *s3t = (S3_by_thread *)thread_data; + Device *pself = (Device *)data; + S3Device *self = S3_DEVICE(pself); + gboolean result = 1; + char *filename; + + g_mutex_lock(self->thread_idle_mutex); + while (result && self->keys) { + filename = self->keys->data; + self->keys = g_slist_remove(self->keys, self->keys->data); + count++; + if (count >= 1000) { + g_debug("Deleting %s ...", filename); + count = 0; + } + g_mutex_unlock(self->thread_idle_mutex); + result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename); + if (!result) { + s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"), + filename, s3_strerror(s3t->s3)); + } + g_free(filename); + g_mutex_lock(self->thread_idle_mutex); + } + s3t->idle = 1; + s3t->done = 1; + g_cond_broadcast(self->thread_idle_cond); + g_mutex_unlock(self->thread_idle_mutex); +} + +static void +s3_wait_thread_delete(S3Device *self) +{ + Device *d_self = (Device *)self; + int idle_thread = 0; + int thread; + + g_mutex_lock(self->thread_idle_mutex); + while (idle_thread != self->nb_threads) { + idle_thread = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].idle == 1) { + idle_thread++; + } + /* Check if the thread is in error */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(d_self, (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + } + } + if (idle_thread != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + g_mutex_unlock(self->thread_idle_mutex); +} + + +static gboolean +delete_all_files(S3Device *self) +{ + return delete_file(self, -1); +} /* * Class mechanics */ -/* {{{ s3_device_register */ -void +void s3_device_register(void) { - static const char * device_prefix_list[] = { "s3", NULL }; + static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL }; g_assert(s3_init()); + + /* set up our properties */ + device_property_fill_and_register(&device_property_s3_secret_key, + G_TYPE_STRING, "s3_secret_key", + "Secret access key to authenticate with Amazon S3"); + device_property_fill_and_register(&device_property_s3_access_key, + G_TYPE_STRING, "s3_access_key", + "Access key ID to authenticate with Amazon S3"); + device_property_fill_and_register(&device_property_swift_account_id, + G_TYPE_STRING, "swift_account_id", + "Account ID to authenticate with openstack swift"); + device_property_fill_and_register(&device_property_swift_access_key, + G_TYPE_STRING, "swift_access_key", + "Access key to authenticate with openstack swift"); + device_property_fill_and_register(&device_property_s3_host, + G_TYPE_STRING, "s3_host", + "hostname:port of the server"); + device_property_fill_and_register(&device_property_s3_service_path, + G_TYPE_STRING, "s3_service_path", + "path to add in the url"); + device_property_fill_and_register(&device_property_s3_user_token, + G_TYPE_STRING, "s3_user_token", + "User token for authentication Amazon devpay requests"); + device_property_fill_and_register(&device_property_s3_bucket_location, + G_TYPE_STRING, "s3_bucket_location", + "Location constraint for buckets on Amazon S3"); + device_property_fill_and_register(&device_property_s3_storage_class, + G_TYPE_STRING, "s3_storage_class", + "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)"); + device_property_fill_and_register(&device_property_s3_server_side_encryption, + G_TYPE_STRING, "s3_server_side_encryption", + "Serve side encryption as specified by Amazon (AES256)"); + device_property_fill_and_register(&device_property_ssl_ca_info, + G_TYPE_STRING, "ssl_ca_info", + "Path to certificate authority certificate"); + device_property_fill_and_register(&device_property_openstack_swift_api, + G_TYPE_BOOLEAN, "openstack_swift_api", + "Whether to use openstack protocol"); + device_property_fill_and_register(&device_property_s3_ssl, + G_TYPE_BOOLEAN, "s3_ssl", + "Whether to use SSL with Amazon S3"); + device_property_fill_and_register(&device_property_s3_subdomain, + G_TYPE_BOOLEAN, "s3_subdomain", + "Whether to use subdomain"); + device_property_fill_and_register(&device_property_max_send_speed, + G_TYPE_UINT64, "max_send_speed", + "Maximum average upload speed (bytes/sec)"); + device_property_fill_and_register(&device_property_max_recv_speed, + G_TYPE_UINT64, "max_recv_speed", + "Maximum average download speed (bytes/sec)"); + device_property_fill_and_register(&device_property_nb_threads_backup, + G_TYPE_UINT64, "nb_threads_backup", + "Number of writer thread"); + device_property_fill_and_register(&device_property_nb_threads_recovery, + G_TYPE_UINT64, "nb_threads_recovery", + "Number of reader thread"); + + /* register the device itself */ register_device(s3_device_factory, device_prefix_list); } -/* }}} */ -/* {{{ s3_device_get_type */ -GType +static GType s3_device_get_type(void) { static GType type = 0; - + if G_UNLIKELY(type == 0) { static const GTypeInfo info = { sizeof (S3DeviceClass), @@ -469,94 +945,102 @@ s3_device_get_type(void) (GInstanceInitFunc) s3_device_init, NULL }; - + type = g_type_register_static (TYPE_DEVICE, "S3Device", &info, (GTypeFlags)0); } return type; } -/* }}} */ -/* {{{ s3_device_init */ -static void +static void s3_device_init(S3Device * self) { - Device * o; - DeviceProperty prop; + Device * dself = DEVICE(self); GValue response; - self->initializing = TRUE; - - /* Register property values */ - o = (Device*)(self); + self->volume_bytes = 0; + self->volume_limit = 0; + self->leom = TRUE; + self->enforce_volume_limit = FALSE; + self->use_subdomain = FALSE; + self->nb_threads = 1; + self->nb_threads_backup = 1; + self->nb_threads_recovery = 1; + self->thread_pool_delete = NULL; + self->thread_pool_write = NULL; + self->thread_pool_read = NULL; + self->thread_idle_cond = NULL; + self->thread_idle_mutex = NULL; + + /* Register property values + * Note: Some aren't added until s3_device_open_device() + */ bzero(&response, sizeof(response)); - prop.base = &device_property_concurrency; - prop.access = PROPERTY_ACCESS_GET_MASK; g_value_init(&response, CONCURRENCY_PARADIGM_TYPE); g_value_set_enum(&response, CONCURRENCY_PARADIGM_SHARED_READ); - device_add_property(o, &prop, &response); + device_set_simple_property(dself, PROPERTY_CONCURRENCY, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - - prop.base = &device_property_streaming; + g_value_init(&response, STREAMING_REQUIREMENT_TYPE); g_value_set_enum(&response, STREAMING_REQUIREMENT_NONE); - device_add_property(o, &prop, &response); + device_set_simple_property(dself, PROPERTY_STREAMING, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - - prop.base = &device_property_block_size; - g_value_init(&response, G_TYPE_INT); - g_value_set_int(&response, -1); /* indicates a variable block size; see below */ - device_add_property(o, &prop, &response); - g_value_unset(&response); - - prop.base = &device_property_min_block_size; - g_value_init(&response, G_TYPE_UINT); - g_value_set_uint(&response, S3_DEVICE_MIN_BLOCK_SIZE); - device_add_property(o, &prop, &response); - - prop.base = &device_property_max_block_size; - g_value_set_uint(&response, S3_DEVICE_MAX_BLOCK_SIZE); - device_add_property(o, &prop, &response); + + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, TRUE); + device_set_simple_property(dself, PROPERTY_APPENDABLE, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - prop.base = &device_property_appendable; g_value_init(&response, G_TYPE_BOOLEAN); g_value_set_boolean(&response, TRUE); - device_add_property(o, &prop, &response); + device_set_simple_property(dself, PROPERTY_PARTIAL_DELETION, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); - prop.base = &device_property_partial_deletion; + g_value_init(&response, G_TYPE_BOOLEAN); g_value_set_boolean(&response, TRUE); - device_add_property(o, &prop, &response); + device_set_simple_property(dself, PROPERTY_FULL_DELETION, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); + + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */ + device_set_simple_property(dself, PROPERTY_LEOM, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); + + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); + g_value_unset(&response); + + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - prop.base = &device_property_canonical_name; - g_value_init(&response, G_TYPE_STRING); - g_value_set_static_string(&response, "s3:"); - device_add_property(o, &prop, &response); + g_value_init(&response, G_TYPE_BOOLEAN); + g_value_set_boolean(&response, FALSE); + device_set_simple_property(dself, PROPERTY_COMPRESSION, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - prop.base = &device_property_medium_access_type; g_value_init(&response, MEDIA_ACCESS_MODE_TYPE); g_value_set_enum(&response, MEDIA_ACCESS_MODE_READ_WRITE); - device_add_property(o, &prop, &response); + device_set_simple_property(dself, PROPERTY_MEDIUM_ACCESS_TYPE, + &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED); g_value_unset(&response); - - prop.access = PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START; - prop.base = &device_property_s3_secret_key; - device_add_property(o, &prop, NULL); - prop.base = &device_property_s3_access_key; - device_add_property(o, &prop, NULL); -#ifdef WANT_DEVPAY - prop.base = &device_property_s3_user_token; - device_add_property(o, &prop, NULL); -#endif + } -/* }}} */ -/* {{{ s3_device_class_init */ -static void +static void s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) { GObjectClass *g_object_class = (GObjectClass*) c; @@ -567,6 +1051,7 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_class->open_device = s3_device_open_device; device_class->read_label = s3_device_read_label; device_class->start = s3_device_start; + device_class->finish = s3_device_finish; device_class->start_file = s3_device_start_file; device_class->write_block = s3_device_write_block; @@ -577,476 +1062,1268 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED) device_class->read_block = s3_device_read_block; device_class->recycle_file = s3_device_recycle_file; - device_class->property_set = s3_device_property_set; - device_class->property_get = s3_device_property_get; + device_class->erase = s3_device_erase; g_object_class->finalize = s3_device_finalize; + + device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_access_key_fn); + + device_class_register_property(device_class, PROPERTY_S3_SECRET_KEY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_secret_key_fn); + + device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_swift_account_id_fn); + + device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_swift_access_key_fn); + + device_class_register_property(device_class, PROPERTY_S3_HOST, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_host_fn); + + device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_service_path_fn); + + device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_user_token_fn); + + device_class_register_property(device_class, PROPERTY_S3_BUCKET_LOCATION, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_bucket_location_fn); + + device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_storage_class_fn); + + device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_server_side_encryption_fn); + + device_class_register_property(device_class, PROPERTY_SSL_CA_INFO, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_ca_info_fn); + + device_class_register_property(device_class, PROPERTY_VERBOSE, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_verbose_fn); + + device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_openstack_swift_api_fn); + + device_class_register_property(device_class, PROPERTY_S3_SSL, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_ssl_fn); + + device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_max_send_speed_fn); + + device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_max_recv_speed_fn); + + device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_nb_threads_backup); + + device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + s3_device_set_nb_threads_recovery); + + device_class_register_property(device_class, PROPERTY_COMPRESSION, + PROPERTY_ACCESS_GET_MASK, + device_simple_property_get_fn, + NULL); + + device_class_register_property(device_class, PROPERTY_LEOM, + PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START, + device_simple_property_get_fn, + property_set_leom_fn); + + device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_max_volume_usage_fn); + + device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_enforce_max_volume_usage_fn); + + device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN, + (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) & + (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE), + device_simple_property_get_fn, + s3_device_set_use_subdomain_fn); } -/* }}} */ -/* {{{ s3_device_factory */ -static Device* -s3_device_factory(char * device_type, - char * device_name) +static gboolean +s3_device_set_access_key_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) { - Device *rval; - S3Device * s3_rval; - g_assert(0 == strcmp(device_type, "s3")); - rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL)); - s3_rval = (S3Device*)rval; + S3Device *self = S3_DEVICE(p_self); - if (!device_open_device(rval, device_name)) { - g_object_unref(rval); - return NULL; - } else { - s3_rval->initializing = FALSE; - return rval; - } - -} -/* }}} */ + amfree(self->access_key); + self->access_key = g_value_dup_string(val); + device_clear_volume_details(p_self); -/* - * Virtual function overrides - */ + return device_simple_property_set_fn(p_self, base, val, surety, source); +} -/* {{{ s3_device_open_device */ -static gboolean -s3_device_open_device(Device *pself, - char *device_name) +static gboolean +s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) { - S3Device *self = S3_DEVICE(pself); - char * name_colon; - - g_return_val_if_fail(self != NULL, FALSE); + S3Device *self = S3_DEVICE(p_self); - /* Device name may be bucket/prefix, to support multiple volumes in a - * single bucket. */ - name_colon = index(device_name, '/'); - if (name_colon == NULL) { - self->bucket = g_strdup(device_name); - self->prefix = g_strdup(""); - } else { - self->bucket = g_strndup(device_name, name_colon - device_name); - self->prefix = g_strdup(name_colon + 1); - } - - if (self->bucket == NULL || self->bucket[0] == '\0') { - fprintf(stderr, _("Empty bucket name in device %s.\n"), device_name); - amfree(self->bucket); - amfree(self->prefix); - return FALSE; - } + amfree(self->secret_key); + self->secret_key = g_value_dup_string(val); + device_clear_volume_details(p_self); - g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix); + return device_simple_property_set_fn(p_self, base, val, surety, source); +} - /* default value */ - self->verbose = FALSE; +static gboolean +s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); - if (parent_class->open_device) { - parent_class->open_device(pself, device_name); - } + amfree(self->swift_account_id); + self->swift_account_id = g_value_dup_string(val); + device_clear_volume_details(p_self); - return TRUE; + return device_simple_property_set_fn(p_self, base, val, surety, source); } -/* }}} */ -/* {{{ s3_device_finalize */ -static void s3_device_finalize(GObject * obj_self) { - S3Device *self = S3_DEVICE (obj_self); +static gboolean +s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); - if(G_OBJECT_CLASS(parent_class)->finalize) - (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self); + amfree(self->swift_access_key); + self->swift_access_key = g_value_dup_string(val); + device_clear_volume_details(p_self); - if(self->s3) s3_free(self->s3); - if(self->bucket) g_free(self->bucket); - if(self->prefix) g_free(self->prefix); + return device_simple_property_set_fn(p_self, base, val, surety, source); } -/* }}} */ -static gboolean setup_handle(S3Device * self, G_GNUC_UNUSED gboolean silent) { - if (self->s3 == NULL) { - if (self->access_key == NULL) { - if (!silent) fprintf(stderr, _("No S3 access key specified\n")); - return FALSE; - } - if (self->secret_key == NULL) { - if (!silent) fprintf(stderr, _("No S3 secret key specified\n")); - return FALSE; - } -#ifdef WANT_DEVPAY - if (self->user_token == NULL) { - if (!silent) fprintf(stderr, _("No S3 user token specified\n")); - return FALSE; - } -#endif - self->s3 = s3_open(self->access_key, self->secret_key -#ifdef WANT_DEVPAY - , self->user_token -#endif - ); - if (self->s3 == NULL) { - fprintf(stderr, "Internal error creating S3 handle.\n"); - return FALSE; - } - } +static gboolean +s3_device_set_host_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); - s3_verbose(self->s3, self->verbose); + amfree(self->host); + self->host = g_value_dup_string(val); + device_clear_volume_details(p_self); - return TRUE; + return device_simple_property_set_fn(p_self, base, val, surety, source); } -/* {{{ s3_device_read_label */ -static ReadLabelStatusFlags -s3_device_read_label(Device *pself) { - S3Device *self = S3_DEVICE(pself); - char *key; - gpointer buf; - guint buf_size; - dumpfile_t amanda_header; - - if (!setup_handle(self, self->initializing)) - return READ_LABEL_STATUS_DEVICE_ERROR; +static gboolean +s3_device_set_service_path_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); - key = special_file_to_key(self, "tapestart", -1); - if (!s3_read(self->s3, self->bucket, key, &buf, &buf_size, S3_DEVICE_MAX_BLOCK_SIZE)) { - guint response_code; - s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + amfree(self->service_path); + self->service_path = g_value_dup_string(val); + device_clear_volume_details(p_self); - /* if it's an expected error (not found), just return FALSE */ - if (response_code == 404 && - (s3_error_code == S3_ERROR_NoSuchKey || s3_error_code == S3_ERROR_NoSuchBucket)) { - g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)")); - return READ_LABEL_STATUS_VOLUME_UNLABELED; - } + return device_simple_property_set_fn(p_self, base, val, surety, source); +} - /* otherwise, log it and return */ - fprintf(stderr, _("While trying to read tapestart header: %s\n"), - s3_strerror(self->s3)); - return READ_LABEL_STATUS_DEVICE_ERROR; - } +static gboolean +s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); - g_assert(buf != NULL); - fh_init(&amanda_header); - parse_file_header(buf, &amanda_header, buf_size); + amfree(self->user_token); + self->user_token = g_value_dup_string(val); + device_clear_volume_details(p_self); - g_free(buf); + return device_simple_property_set_fn(p_self, base, val, surety, source); +} - if (amanda_header.type != F_TAPESTART) { - fprintf(stderr, _("Invalid amanda header\n")); - return READ_LABEL_STATUS_VOLUME_ERROR; +static gboolean +s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + char *str_val = g_value_dup_string(val); + + if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) { + device_set_error(p_self, stralloc(_( + "Location constraint given for Amazon S3 bucket, " + "but libcurl is too old support wildcard certificates.")), + DEVICE_STATUS_DEVICE_ERROR); + goto fail; } - amfree(pself->volume_label); - pself->volume_label = g_strdup(amanda_header.name); - amfree(pself->volume_time); - pself->volume_time = g_strdup(amanda_header.datestamp); + if (str_val[0] && !s3_bucket_location_compat(self->bucket)) { + device_set_error(p_self, g_strdup_printf(_( + "Location constraint given for Amazon S3 bucket, " + "but the bucket name (%s) is not usable as a subdomain."), + self->bucket), + DEVICE_STATUS_DEVICE_ERROR); + goto fail; + } - return READ_LABEL_STATUS_SUCCESS; + amfree(self->bucket_location); + self->bucket_location = str_val; + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +fail: + g_free(str_val); + return FALSE; } -/* }}} */ -/* {{{ s3_device_start */ -static gboolean -s3_device_start (Device * pself, DeviceAccessMode mode, - char * label, char * timestamp) { - S3Device * self; - int file, last_file; +static gboolean +s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + char *str_val = g_value_dup_string(val); - self = S3_DEVICE(pself); - g_return_val_if_fail (self != NULL, FALSE); + amfree(self->storage_class); + self->storage_class = str_val; + device_clear_volume_details(p_self); - if (!setup_handle(self, FALSE)) - return FALSE; + return device_simple_property_set_fn(p_self, base, val, surety, source); +} - /* try creating the bucket, in case it doesn't exist */ - if (mode != ACCESS_READ && !s3_make_bucket(self->s3, self->bucket)) { - guint response_code; - s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); +static gboolean +s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + char *str_val = g_value_dup_string(val); - /* if it isn't an expected error (bucket already exists), - * return FALSE */ - if (response_code != 409 || - s3_error_code != S3_ERROR_BucketAlreadyExists) { - fprintf(stderr, _("While creating new S3 bucket: %s\n"), - s3_strerror(self->s3)); + amfree(self->server_side_encryption); + self->server_side_encryption = str_val; + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + amfree(self->ca_info); + self->ca_info = g_value_dup_string(val); + device_clear_volume_details(p_self); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + int thread; + + self->verbose = g_value_get_boolean(val); + /* Our S3 handle may not yet have been instantiated; if so, it will + * get the proper verbose setting when it is created */ + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3) + s3_verbose(self->s3t[thread].s3, self->verbose); + } + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->openstack_swift_api = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base, + GValue *val, PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + gboolean new_val; + int thread; + + new_val = g_value_get_boolean(val); + /* Our S3 handle may not yet have been instantiated; if so, it will + * get the proper use_ssl setting when it is created */ + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, g_strdup_printf(_( + "Error setting S3 SSL/TLS use " + "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + } + self->use_ssl = new_val; + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_max_send_speed_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + int thread; + + new_val = g_value_get_uint64(val); + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, + g_strdup("Could not set S3 maximum send speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + } + self->max_send_speed = new_val; + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_max_recv_speed_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + int thread; + + new_val = g_value_get_uint64(val); + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].s3 && + !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) { + device_set_error(p_self, + g_strdup("Could not set S3 maximum recv speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + } + self->max_recv_speed = new_val; + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_nb_threads_backup(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + + new_val = g_value_get_uint64(val); + self->nb_threads_backup = new_val; + if (self->nb_threads_backup > self->nb_threads) { + self->nb_threads = self->nb_threads_backup; + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_nb_threads_recovery(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + guint64 new_val; + + new_val = g_value_get_uint64(val); + self->nb_threads_recovery = new_val; + if (self->nb_threads_recovery > self->nb_threads) { + self->nb_threads = self->nb_threads_recovery; + } + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +s3_device_set_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->volume_limit = g_value_get_uint64(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); + +} + +static gboolean +s3_device_set_enforce_max_volume_usage_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->enforce_volume_limit = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); + +} + +static gboolean +s3_device_set_use_subdomain_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->use_subdomain = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} + +static gboolean +property_set_leom_fn(Device *p_self, + DevicePropertyBase *base, GValue *val, + PropertySurety surety, PropertySource source) +{ + S3Device *self = S3_DEVICE(p_self); + + self->leom = g_value_get_boolean(val); + + return device_simple_property_set_fn(p_self, base, val, surety, source); +} +static Device* +s3_device_factory(char * device_name, char * device_type, char * device_node) +{ + Device *rval; + g_assert(0 == strcmp(device_type, S3_DEVICE_NAME)); + rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL)); + + device_open_device(rval, device_name, device_type, device_node); + return rval; +} + +/* + * Virtual function overrides + */ + +static void +s3_device_open_device(Device *pself, char *device_name, + char * device_type, char * device_node) +{ + S3Device *self = S3_DEVICE(pself); + char * name_colon; + GValue tmp_value; + + pself->min_block_size = S3_DEVICE_MIN_BLOCK_SIZE; + pself->max_block_size = S3_DEVICE_MAX_BLOCK_SIZE; + pself->block_size = S3_DEVICE_DEFAULT_BLOCK_SIZE; + + /* Device name may be bucket/prefix, to support multiple volumes in a + * single bucket. */ + name_colon = strchr(device_node, '/'); + if (name_colon == NULL) { + self->bucket = g_strdup(device_node); + self->prefix = g_strdup(""); + } else { + self->bucket = g_strndup(device_node, name_colon - device_node); + self->prefix = g_strdup(name_colon + 1); + } + + if (self->bucket == NULL || self->bucket[0] == '\0') { + device_set_error(pself, + vstrallocf(_("Empty bucket name in device %s"), device_name), + DEVICE_STATUS_DEVICE_ERROR); + amfree(self->bucket); + amfree(self->prefix); + return; + } + + g_debug(_("S3 driver using bucket '%s', prefix '%s'"), self->bucket, self->prefix); + + /* default values */ + self->verbose = FALSE; + self->openstack_swift_api = FALSE; + + /* use SSL if available */ + self->use_ssl = s3_curl_supports_ssl(); + bzero(&tmp_value, sizeof(GValue)); + g_value_init(&tmp_value, G_TYPE_BOOLEAN); + g_value_set_boolean(&tmp_value, self->use_ssl); + device_set_simple_property(pself, device_property_s3_ssl.ID, + &tmp_value, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DEFAULT); + + if (parent_class->open_device) { + parent_class->open_device(pself, device_name, device_type, device_node); + } +} + +static void s3_device_finalize(GObject * obj_self) { + S3Device *self = S3_DEVICE (obj_self); + int thread; + + if(G_OBJECT_CLASS(parent_class)->finalize) + (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self); + + if (self->thread_pool_delete) { + g_thread_pool_free(self->thread_pool_delete, 1, 1); + self->thread_pool_delete = NULL; + } + if (self->thread_pool_write) { + g_thread_pool_free(self->thread_pool_write, 1, 1); + self->thread_pool_write = NULL; + } + if (self->thread_pool_read) { + g_thread_pool_free(self->thread_pool_read, 1, 1); + self->thread_pool_read = NULL; + } + if (self->thread_idle_mutex) { + g_mutex_free(self->thread_idle_mutex); + self->thread_idle_mutex = NULL; + } + if (self->thread_idle_cond) { + g_cond_free(self->thread_idle_cond); + self->thread_idle_cond = NULL; + } + if (self->s3t) { + for (thread = 0; thread < self->nb_threads; thread++) { + if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3); + } + g_free(self->s3t); + } + if(self->bucket) g_free(self->bucket); + if(self->prefix) g_free(self->prefix); + if(self->access_key) g_free(self->access_key); + if(self->secret_key) g_free(self->secret_key); + if(self->swift_account_id) g_free(self->swift_account_id); + if(self->swift_access_key) g_free(self->swift_access_key); + if(self->host) g_free(self->host); + if(self->service_path) g_free(self->service_path); + if(self->user_token) g_free(self->user_token); + if(self->bucket_location) g_free(self->bucket_location); + if(self->storage_class) g_free(self->storage_class); + if(self->server_side_encryption) g_free(self->server_side_encryption); + if(self->ca_info) g_free(self->ca_info); +} + +static gboolean setup_handle(S3Device * self) { + Device *d_self = DEVICE(self); + int thread; + guint response_code; + s3_error_code_t s3_error_code; + CURLcode curl_code; + + if (self->s3t == NULL) { + self->s3t = g_new(S3_by_thread, self->nb_threads); + if (self->s3t == NULL) { + device_set_error(d_self, + stralloc(_("Can't allocate S3Handle array")), + DEVICE_STATUS_DEVICE_ERROR); return FALSE; + } + + if (!self->openstack_swift_api) { + if (self->access_key == NULL || self->access_key[0] == '\0') { + device_set_error(d_self, + g_strdup(_("No Amazon access key specified")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (self->secret_key == NULL || self->secret_key[0] == '\0') { + device_set_error(d_self, + g_strdup(_("No Amazon secret key specified")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } else { + if (self->swift_account_id == NULL || + self->swift_account_id[0] == '\0') { + device_set_error(d_self, + g_strdup(_("No Swift account id specified")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + if (self->swift_access_key == NULL || + self->swift_access_key[0] == '\0') { + device_set_error(d_self, + g_strdup(_("No Swift access key specified")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + + if (!self->use_ssl && self->ca_info) { + amfree(self->ca_info); + } + + self->thread_idle_cond = g_cond_new(); + self->thread_idle_mutex = g_mutex_new(); + + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + self->s3t[thread].done = 1; + self->s3t[thread].eof = FALSE; + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + self->s3t[thread].filename = NULL; + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key, + self->swift_account_id, + self->swift_access_key, + self->host, self->service_path, + self->use_subdomain, + self->user_token, self->bucket_location, + self->storage_class, self->ca_info, + self->server_side_encryption, + self->openstack_swift_api); + if (self->s3t[thread].s3 == NULL) { + device_set_error(d_self, + stralloc(_("Internal error creating S3 handle")), + DEVICE_STATUS_DEVICE_ERROR); + self->nb_threads = thread+1; + return FALSE; + } else if (self->openstack_swift_api) { + s3_error(self->s3t[0].s3, NULL, &response_code, + &s3_error_code, NULL, &curl_code, NULL); + if (response_code != 200) { + device_set_error(d_self, + g_strdup_printf(_("Internal error creating S3 handle: %s"), + s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR); + self->nb_threads = thread+1; + return FALSE; + } + } } + + g_debug("Create %d threads", self->nb_threads); + self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block, + self, self->nb_threads, 0, + NULL); + self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self, + self->nb_threads, 0, NULL); + self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self, + self->nb_threads, 0, NULL); } - /* call up to the parent (Device) to set access_mode, volume_label, - * and volume_time, either from the arguments (ACCESS_WRITE) or by - * reading from the 0th file (otherwise) - */ - if (parent_class->start) - if (!parent_class->start((Device*)self, mode, label, timestamp)) + for (thread = 0; thread < self->nb_threads; thread++) { + s3_verbose(self->s3t[thread].s3, self->verbose); + + if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) { + device_set_error(d_self, g_strdup_printf(_( + "Error setting S3 SSL/TLS use " + "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (self->max_send_speed && + !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) { + device_set_error(d_self, + g_strdup("Could not set S3 maximum send speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (self->max_recv_speed && + !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) { + device_set_error(d_self, + g_strdup("Could not set S3 maximum recv speed"), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + + return TRUE; +} + +static gboolean +make_bucket( + Device * pself) +{ + S3Device *self = S3_DEVICE(pself); + guint response_code; + s3_error_code_t s3_error_code; + CURLcode curl_code; + + if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) { + return TRUE; + } + + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL); + + if (response_code == 0 && s3_error_code == 0 && + (curl_code == CURLE_COULDNT_CONNECT || + curl_code == CURLE_COULDNT_RESOLVE_HOST)) { + device_set_error(pself, + g_strdup_printf(_("While connecting to S3 bucket: %s"), + s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + + if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) { + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + + /* if it isn't an expected error (bucket already exists), + * return FALSE */ + if (response_code != 409 || + (s3_error_code != S3_ERROR_BucketAlreadyExists && + s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) { + device_set_error(pself, + g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR); return FALSE; + } + } + return TRUE; +} + +static DeviceStatusFlags +s3_device_read_label(Device *pself) { + S3Device *self = S3_DEVICE(pself); + char *key; + CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE}; + dumpfile_t *amanda_header; + /* note that this may be called from s3_device_start, when + * self->access_mode is not ACCESS_NULL */ + + amfree(pself->volume_label); + amfree(pself->volume_time); + dumpfile_free(pself->volume_header); + pself->volume_header = NULL; + + if (device_in_error(self)) return pself->status; + + if (!setup_handle(self)) { + /* setup_handle already set our error message */ + return pself->status; + } + reset_thread(self); + + key = special_file_to_key(self, "tapestart", -1); + + if (!make_bucket(pself)) { + return pself->status; + } + + if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) { + guint response_code; + s3_error_code_t s3_error_code; + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + + /* if it's an expected error (not found), just return FALSE */ + if (response_code == 404 && + (s3_error_code == S3_ERROR_None || + s3_error_code == S3_ERROR_Unknown || + s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity || + s3_error_code == S3_ERROR_NoSuchBucket)) { + g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)")); + device_set_error(pself, + stralloc(_("Amanda header not found -- unlabeled volume?")), + DEVICE_STATUS_DEVICE_ERROR + | DEVICE_STATUS_VOLUME_ERROR + | DEVICE_STATUS_VOLUME_UNLABELED); + return pself->status; + } + + /* otherwise, log it and return */ + device_set_error(pself, + vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); + return pself->status; + } + + /* handle an empty file gracefully */ + if (buf.buffer_len == 0) { + device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR); + return pself->status; + } + + pself->header_block_size = buf.buffer_len; + g_assert(buf.buffer != NULL); + amanda_header = g_new(dumpfile_t, 1); + parse_file_header(buf.buffer, amanda_header, buf.buffer_pos); + pself->volume_header = amanda_header; + g_free(buf.buffer); + + if (amanda_header->type != F_TAPESTART) { + device_set_error(pself, stralloc(_("Invalid amanda header")), DEVICE_STATUS_VOLUME_ERROR); + return pself->status; + } + + pself->volume_label = g_strdup(amanda_header->name); + pself->volume_time = g_strdup(amanda_header->datestamp); + /* pself->volume_header is already set */ + + device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS); + + return pself->status; +} + +static gboolean +s3_device_start (Device * pself, DeviceAccessMode mode, + char * label, char * timestamp) { + S3Device * self; + GSList *keys; + guint64 total_size = 0; + gboolean result; + + self = S3_DEVICE(pself); + + if (device_in_error(self)) return FALSE; + + if (!setup_handle(self)) { + /* setup_handle already set our error message */ + return FALSE; + } + + reset_thread(self); + pself->access_mode = mode; + pself->in_file = FALSE; + + /* try creating the bucket, in case it doesn't exist */ + if (!make_bucket(pself)) { + return FALSE; + } /* take care of any dirty work for this mode */ switch (mode) { case ACCESS_READ: + if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) { + /* s3_device_read_label already set our error message */ + return FALSE; + } break; case ACCESS_WRITE: - /* delete all files */ - last_file = find_last_file(self); - if (last_file < 0) return FALSE; - for (file = 0; file <= last_file; file++) { - if (!delete_file(self, file)) return FALSE; - } + delete_all_files(self); /* write a new amanda header */ if (!write_amanda_header(self, label, timestamp)) { return FALSE; } + + pself->volume_label = newstralloc(pself->volume_label, label); + pself->volume_time = newstralloc(pself->volume_time, timestamp); + + /* unset the VOLUME_UNLABELED flag, if it was set */ + device_set_error(pself, NULL, DEVICE_STATUS_SUCCESS); break; case ACCESS_APPEND: + if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) { + /* s3_device_read_label already set our error message */ + return FALSE; + } else { + result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size); + if(!result) { + device_set_error(pself, + vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR); + return FALSE; + } else { + self->volume_bytes = total_size; + } + } return seek_to_end(self); break; + case ACCESS_NULL: g_assert_not_reached(); } - g_assert(pself->access_mode == mode); - return TRUE; } -/* }}} */ - -static gboolean s3_device_property_get(Device * p_self, DevicePropertyId id, - GValue * val) { - S3Device * self; - const DevicePropertyBase * base; - - self = S3_DEVICE(p_self); - g_return_val_if_fail(self != NULL, FALSE); - - base = device_property_get_by_id(id); - g_return_val_if_fail(self != NULL, FALSE); - - g_value_unset_init(val, base->type); - - if (id == PROPERTY_S3_SECRET_KEY) { - if (self->secret_key != NULL) { - g_value_set_string(val, self->secret_key); - return TRUE; - } else { - return FALSE; - } - } else if (id == PROPERTY_S3_ACCESS_KEY) { - if (self->access_key != NULL) { - g_value_set_string(val, self->access_key); - return TRUE; - } else { - return FALSE; - } - } -#ifdef WANT_DEVPAY - else if (id == PROPERTY_S3_USER_TOKEN) { - if (self->user_token != NULL) { - g_value_set_string(val, self->user_token); - return TRUE; - } else { - return FALSE; - } - } -#endif /* WANT_DEVPAY */ - else if (id == PROPERTY_VERBOSE) { - g_value_set_boolean(val, self->verbose); - return TRUE; - } else { - /* chain up */ - if (parent_class->property_get) { - return (parent_class->property_get)(p_self, id, val); - } else { - return FALSE; - } - } - - g_assert_not_reached(); -} -static gboolean s3_device_property_set(Device * p_self, DevicePropertyId id, - GValue * val) { - S3Device * self; - const DevicePropertyBase * base; - - self = S3_DEVICE(p_self); - g_return_val_if_fail(self != NULL, FALSE); +static gboolean +s3_device_finish ( + Device * pself) +{ + S3Device *self = S3_DEVICE(pself); - base = device_property_get_by_id(id); - g_return_val_if_fail(self != NULL, FALSE); + reset_thread(self); - g_return_val_if_fail(G_VALUE_HOLDS(val, base->type), FALSE); + /* we're not in a file anymore */ + pself->access_mode = ACCESS_NULL; - if (id == PROPERTY_S3_SECRET_KEY) { - if (p_self->access_mode != ACCESS_NULL) - return FALSE; - amfree(self->secret_key); - self->secret_key = g_value_dup_string(val); - device_clear_volume_details(p_self); - return TRUE; - } else if (id == PROPERTY_S3_ACCESS_KEY) { - if (p_self->access_mode != ACCESS_NULL) - return FALSE; - amfree(self->access_key); - self->access_key = g_value_dup_string(val); - device_clear_volume_details(p_self); - return TRUE; - } -#ifdef WANT_DEVPAY - else if (id == PROPERTY_S3_USER_TOKEN) { - if (p_self->access_mode != ACCESS_NULL) - return FALSE; - amfree(self->user_token); - self->user_token = g_value_dup_string(val); - device_clear_volume_details(p_self); - return TRUE; - } -#endif /* WANT_DEVPAY */ - else if (id == PROPERTY_VERBOSE) { - self->verbose = g_value_get_boolean(val); - /* Our S3 handle may not yet have been instantiated; if so, it will - * get the proper verbose setting when it is created */ - if (self->s3) - s3_verbose(self->s3, self->verbose); - return TRUE; - } else { - if (parent_class->property_set) { - return (parent_class->property_set)(p_self, id, val); - } else { - return FALSE; - } - } + if (device_in_error(pself)) return FALSE; - g_assert_not_reached(); + return TRUE; } /* functions for writing */ -/* {{{ s3_device_start_file */ static gboolean -s3_device_start_file (Device *pself, const dumpfile_t *jobInfo) { +s3_device_start_file (Device *pself, dumpfile_t *jobInfo) { S3Device *self = S3_DEVICE(pself); - char *amanda_header; - int header_size; - gboolean header_fits, result; - char *key; + CurlBuffer amanda_header = {NULL, 0, 0, 0}; + gboolean result; + size_t header_size; + char *key; + int thread; + + if (device_in_error(self)) return FALSE; - g_return_val_if_fail (self != NULL, FALSE); + reset_thread(self); + pself->is_eom = FALSE; + + /* Set the blocksize to zero, since there's no header to skip (it's stored + * in a distinct file, rather than block zero) */ + jobInfo->blocksize = 0; /* Build the amanda header. */ - amanda_header = device_build_amanda_header(pself, jobInfo, - &header_size, &header_fits); - g_return_val_if_fail(amanda_header != NULL, FALSE); - g_return_val_if_fail(header_fits, FALSE); + header_size = 0; /* no minimum size */ + amanda_header.buffer = device_build_amanda_header(pself, jobInfo, + &header_size); + if (amanda_header.buffer == NULL) { + device_set_error(pself, + stralloc(_("Amanda file header won't fit in a single block!")), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + amanda_header.buffer_len = header_size; + if(check_at_leom(self, header_size)) + pself->is_eom = TRUE; + + if(check_at_peom(self, header_size)) { + pself->is_eom = TRUE; + device_set_error(pself, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); + g_free(amanda_header.buffer); + return FALSE; + } /* set the file and block numbers correctly */ pself->file = (pself->file > 0)? pself->file+1 : 1; pself->block = 0; pself->in_file = TRUE; - /* write it out as a special block (not the 0th) */ key = special_file_to_key(self, "filestart", pself->file); - result = s3_upload(self->s3, self->bucket, key, amanda_header, header_size); - g_free(amanda_header); + result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS, + &amanda_header, NULL, NULL); + g_free(amanda_header.buffer); g_free(key); if (!result) { - fprintf(stderr, _("While writing filestart header: %s\n"), - s3_strerror(self->s3)); + device_set_error(pself, + vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)), + DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR); return FALSE; } + self->volume_bytes += header_size; + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + } + return TRUE; } -/* }}} */ -/* {{{ s3_device_write_block */ static gboolean -s3_device_write_block (Device * pself, guint size, gpointer data, - gboolean last_block) { - gboolean result; +s3_device_write_block (Device * pself, guint size, gpointer data) { char *filename; - S3Device * self = S3_DEVICE(pself);; + S3Device * self = S3_DEVICE(pself); + int idle_thread = 0; + int thread = -1; + int first_idle = -1; g_assert (self != NULL); g_assert (data != NULL); - - filename = file_and_block_to_key(self, pself->file, pself->block); + if (device_in_error(self)) return FALSE; - result = s3_upload(self->s3, self->bucket, filename, data, size); - g_free(filename); - if (!result) { - fprintf(stderr, _("While writing data block to S3: %s\n"), - s3_strerror(self->s3)); + if(check_at_leom(self, size)) + pself->is_eom = TRUE; + + if(check_at_peom(self, size)) { + pself->is_eom = TRUE; + device_set_error(pself, + stralloc(_("No space left on device")), + DEVICE_STATUS_DEVICE_ERROR); return FALSE; } - pself->block++; + filename = file_and_block_to_key(self, pself->file, pself->block); - /* if this is the last block, finish the file */ - if (last_block) { - return s3_device_finish_file(pself); + g_mutex_lock(self->thread_idle_mutex); + while (!idle_thread) { + idle_thread = 0; + for (thread = 0; thread < self->nb_threads_backup; thread++) { + if (self->s3t[thread].idle == 1) { + idle_thread++; + /* Check if the thread is in error */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(pself, (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + g_mutex_unlock(self->thread_idle_mutex); + return FALSE; + } + if (first_idle == -1) { + first_idle = thread; + break; + } + } + } + if (!idle_thread) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + thread = first_idle; + + self->s3t[thread].idle = 0; + self->s3t[thread].done = 0; + if (self->s3t[thread].curl_buffer.buffer && + self->s3t[thread].curl_buffer.buffer_len < size) { + g_free((char *)self->s3t[thread].curl_buffer.buffer); + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].buffer_len = 0; } + if (self->s3t[thread].curl_buffer.buffer == NULL) { + self->s3t[thread].curl_buffer.buffer = g_malloc(size); + self->s3t[thread].curl_buffer.buffer_len = size; + self->s3t[thread].buffer_len = size; + } + memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size); + self->s3t[thread].curl_buffer.buffer_pos = 0; + self->s3t[thread].curl_buffer.buffer_len = size; + self->s3t[thread].curl_buffer.max_buffer_size = 0; + self->s3t[thread].filename = filename; + g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL); + g_mutex_unlock(self->thread_idle_mutex); + pself->block++; + self->volume_bytes += size; return TRUE; } -/* }}} */ -/* {{{ s3_device_finish_file */ +static void +s3_thread_write_block( + gpointer thread_data, + gpointer data) +{ + S3_by_thread *s3t = (S3_by_thread *)thread_data; + Device *pself = (Device *)data; + S3Device *self = S3_DEVICE(pself); + gboolean result; + + result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename, + S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); + g_free((void *)s3t->filename); + s3t->filename = NULL; + if (!result) { + s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3)); + } + g_mutex_lock(self->thread_idle_mutex); + s3t->idle = 1; + s3t->done = 1; + s3t->curl_buffer.buffer_len = s3t->buffer_len; + g_cond_broadcast(self->thread_idle_cond); + g_mutex_unlock(self->thread_idle_mutex); +} + static gboolean s3_device_finish_file (Device * pself) { + S3Device *self = S3_DEVICE(pself); + + /* Check all threads are done */ + int idle_thread = 0; + int thread; + + g_mutex_lock(self->thread_idle_mutex); + while (idle_thread != self->nb_threads) { + idle_thread = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].idle == 1) { + idle_thread++; + } + /* check thread status */ + if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) { + device_set_error(pself, (char *)self->s3t[thread].errmsg, + self->s3t[thread].errflags); + self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS; + self->s3t[thread].errmsg = NULL; + } + } + if (idle_thread != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } + g_mutex_unlock(self->thread_idle_mutex); + + if (device_in_error(pself)) return FALSE; + /* we're not in a file anymore */ pself->in_file = FALSE; return TRUE; } -/* }}} */ -/* {{{ s3_device_recycle_file */ static gboolean s3_device_recycle_file(Device *pself, guint file) { S3Device *self = S3_DEVICE(pself); + if (device_in_error(self)) return FALSE; + + reset_thread(self); + delete_file(self, file); + s3_wait_thread_delete(self); + return !device_in_error(self); + /* delete_file already set our error message if necessary */ +} + +static gboolean +s3_device_erase(Device *pself) { + S3Device *self = S3_DEVICE(pself); + char *key = NULL; + const char *errmsg = NULL; + guint response_code; + s3_error_code_t s3_error_code; + + if (!setup_handle(self)) { + /* error set by setup_handle */ + return FALSE; + } + + reset_thread(self); + key = special_file_to_key(self, "tapestart", -1); + if (!s3_delete(self->s3t[0].s3, self->bucket, key)) { + s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL); + device_set_error(pself, + stralloc(errmsg), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + g_free(key); + + dumpfile_free(pself->volume_header); + pself->volume_header = NULL; + + if (!delete_all_files(self)) + return FALSE; - return delete_file(self, file); + device_set_error(pself, g_strdup("Unlabeled volume"), + DEVICE_STATUS_VOLUME_UNLABELED); + + if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) { + s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + + /* + * ignore the error if the bucket isn't empty (there may be data from elsewhere) + * or the bucket not existing (already deleted perhaps?) + */ + if (!( + (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) || + (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) { + + device_set_error(pself, + stralloc(errmsg), + DEVICE_STATUS_DEVICE_ERROR); + return FALSE; + } + } + self->volume_bytes = 0; + return TRUE; } -/* }}} */ /* functions for reading */ -/* {{{ s3_device_seek_file */ static dumpfile_t* s3_device_seek_file(Device *pself, guint file) { S3Device *self = S3_DEVICE(pself); gboolean result; char *key; - gpointer buf; - guint buf_size; + CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE}; dumpfile_t *amanda_header; + const char *errmsg = NULL; + int thread; + + if (device_in_error(self)) return NULL; + + reset_thread(self); pself->file = file; + pself->is_eof = FALSE; + pself->in_file = FALSE; pself->block = 0; - pself->in_file = TRUE; + self->next_block_to_read = 0; /* read it in */ key = special_file_to_key(self, "filestart", pself->file); - result = s3_read(self->s3, self->bucket, key, &buf, &buf_size, S3_DEVICE_MAX_BLOCK_SIZE); + result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, + &buf, NULL, NULL); g_free(key); - + if (!result) { guint response_code; s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL); /* if it's an expected error (not found), check what to do. */ - if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) { + if (response_code == 404 && + (s3_error_code == S3_ERROR_None || + s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity)) { int next_file; - pself->file = -1; - pself->in_file = FALSE; next_file = find_next_file(self, pself->file); if (next_file > 0) { /* Note short-circut of dispatcher. */ @@ -1054,131 +2331,274 @@ s3_device_seek_file(Device *pself, guint file) { } else if (next_file == 0) { /* No next file. Check if we are one past the end. */ key = special_file_to_key(self, "filestart", pself->file - 1); - result = s3_read(self->s3, self->bucket, key, &buf, &buf_size, - S3_DEVICE_MAX_BLOCK_SIZE); + result = s3_read(self->s3t[0].s3, self->bucket, key, + S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL); g_free(key); if (result) { + /* pself->file, etc. are already correct */ return make_tapeend_header(); } else { + device_set_error(pself, + stralloc(_("Attempt to read past tape-end file")), + DEVICE_STATUS_SUCCESS); return NULL; } } } else { - /* An error occured finding out if we are the last file. */ + /* An unexpected error occured finding out if we are the last file. */ + device_set_error(pself, + stralloc(errmsg), + DEVICE_STATUS_DEVICE_ERROR); return NULL; } } - + /* and make a dumpfile_t out of it */ - g_assert(buf != NULL); + g_assert(buf.buffer != NULL); amanda_header = g_new(dumpfile_t, 1); fh_init(amanda_header); - parse_file_header(buf, amanda_header, buf_size); - g_free(buf); + parse_file_header(buf.buffer, amanda_header, buf.buffer_pos); + g_free(buf.buffer); switch (amanda_header->type) { case F_DUMPFILE: case F_CONT_DUMPFILE: case F_SPLIT_DUMPFILE: - return amanda_header; + break; default: - fprintf(stderr, - _("Invalid amanda header while reading file header\n")); + device_set_error(pself, + stralloc(_("Invalid amanda header while reading file header")), + DEVICE_STATUS_VOLUME_ERROR); g_free(amanda_header); return NULL; } + + pself->in_file = TRUE; + for (thread = 0; thread < self->nb_threads; thread++) { + self->s3t[thread].idle = 1; + self->s3t[thread].eof = FALSE; + } + return amanda_header; } -/* }}} */ -/* {{{ s3_device_seek_block */ static gboolean s3_device_seek_block(Device *pself, guint64 block) { + S3Device * self = S3_DEVICE(pself); + if (device_in_error(pself)) return FALSE; + + reset_thread(self); pself->block = block; + self->next_block_to_read = block; return TRUE; } -/* }}} */ -/* {{{ s3_device_read_block */ static int s3_device_read_block (Device * pself, gpointer data, int *size_req) { S3Device * self = S3_DEVICE(pself); char *key; - gpointer buf; - gboolean result; - guint buf_size; + int thread; + int done = 0; g_assert (self != NULL); + if (device_in_error(self)) return -1; + + g_mutex_lock(self->thread_idle_mutex); + /* start a read ahead for each thread */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t = &self->s3t[thread]; + if (s3t->idle) { + key = file_and_block_to_key(self, pself->file, self->next_block_to_read); + s3t->filename = key; + s3t->done = 0; + s3t->idle = 0; + s3t->eof = FALSE; + s3t->errflags = DEVICE_STATUS_SUCCESS; + if (self->s3t[thread].curl_buffer.buffer && + (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) { + g_free(self->s3t[thread].curl_buffer.buffer); + self->s3t[thread].curl_buffer.buffer = NULL; + self->s3t[thread].curl_buffer.buffer_len = 0; + self->s3t[thread].buffer_len = 0; + } + if (!self->s3t[thread].curl_buffer.buffer) { + self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req); + self->s3t[thread].curl_buffer.buffer_len = *size_req; + self->s3t[thread].buffer_len = *size_req; + } + s3t->curl_buffer.buffer_pos = 0; + s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE; + self->next_block_to_read++; + g_thread_pool_push(self->thread_pool_read, s3t, NULL); + } + } /* get the file*/ key = file_and_block_to_key(self, pself->file, pself->block); g_assert(key != NULL); - if (self->cached_key && (0 == strcmp(key, self->cached_key))) { - /* use the cached copy and clear the cache */ - buf = self->cached_buf; - buf_size = self->cached_size; - - self->cached_buf = NULL; - g_free(self->cached_key); - self->cached_key = NULL; - } else { - /* clear the cache and actually download the file */ - if (self->cached_buf) { - g_free(self->cached_buf); - self->cached_buf = NULL; - } - if (self->cached_key) { - g_free(self->cached_key); - self->cached_key = NULL; - } + while (!done) { + /* find which thread read the key */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t; + s3t = &self->s3t[thread]; + if (!s3t->idle && + s3t->done && + strcmp(key, (char *)s3t->filename) == 0) { + if (s3t->eof) { + /* return eof */ + g_free(key); + pself->is_eof = TRUE; + pself->in_file = FALSE; + device_set_error(pself, stralloc(_("EOF")), + DEVICE_STATUS_SUCCESS); + g_mutex_unlock(self->thread_idle_mutex); + return -1; + } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) { + /* return the error */ + device_set_error(pself, (char *)s3t->errmsg, s3t->errflags); + g_free(key); + g_mutex_unlock(self->thread_idle_mutex); + return -1; + + } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) { + /* return the buffer */ + g_mutex_unlock(self->thread_idle_mutex); + memcpy(data, s3t->curl_buffer.buffer, + s3t->curl_buffer.buffer_pos); + *size_req = s3t->curl_buffer.buffer_pos; + g_free(key); + s3t->idle = 1; + g_free((char *)s3t->filename); + pself->block++; + done = 1; + g_mutex_lock(self->thread_idle_mutex); + break; + } else { /* buffer not enough large */ + *size_req = s3t->curl_buffer.buffer_len; + g_free(key); + g_mutex_unlock(self->thread_idle_mutex); + return 0; + } + } + } + if (!done) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } + } - result = s3_read(self->s3, self->bucket, key, &buf, &buf_size, S3_DEVICE_MAX_BLOCK_SIZE); - if (!result) { - guint response_code; - s3_error_code_t s3_error_code; - s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + /* start a read ahead for the thread */ + for (thread = 0; thread < self->nb_threads_recovery; thread++) { + S3_by_thread *s3t = &self->s3t[thread]; + if (s3t->idle) { + key = file_and_block_to_key(self, pself->file, self->next_block_to_read); + s3t->filename = key; + s3t->done = 0; + s3t->idle = 0; + s3t->eof = FALSE; + s3t->errflags = DEVICE_STATUS_SUCCESS; + if (!self->s3t[thread].curl_buffer.buffer) { + self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req); + self->s3t[thread].curl_buffer.buffer_len = *size_req; + } + s3t->curl_buffer.buffer_pos = 0; + self->next_block_to_read++; + g_thread_pool_push(self->thread_pool_read, s3t, NULL); + } + } + g_mutex_unlock(self->thread_idle_mutex); - g_free(key); - key = NULL; + return *size_req; - /* if it's an expected error (not found), just return -1 */ - if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) { - pself->is_eof = TRUE; - pself->in_file = FALSE; - return -1; - } +} - /* otherwise, log it and return FALSE */ - fprintf(stderr, _("While reading data block from S3: %s\n"), - s3_strerror(self->s3)); - return -1; - } +static void +s3_thread_read_block( + gpointer thread_data, + gpointer data) +{ + S3_by_thread *s3t = (S3_by_thread *)thread_data; + Device *pself = (Device *)data; + S3Device *self = S3_DEVICE(pself); + gboolean result; + + result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func, + s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL); + + g_mutex_lock(self->thread_idle_mutex); + if (!result) { + guint response_code; + s3_error_code_t s3_error_code; + s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL); + /* if it's an expected error (not found), just return -1 */ + if (response_code == 404 && + (s3_error_code == S3_ERROR_None || + s3_error_code == S3_ERROR_Unknown || + s3_error_code == S3_ERROR_NoSuchKey || + s3_error_code == S3_ERROR_NoSuchEntity)) { + s3t->eof = TRUE; + } else { + + /* otherwise, log it and return FALSE */ + s3t->errflags = DEVICE_STATUS_VOLUME_ERROR; + s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"), + s3_strerror(s3t->s3)); + } } + s3t->done = 1; + g_cond_broadcast(self->thread_idle_cond); + g_mutex_unlock(self->thread_idle_mutex); - /* INVARIANT: cache is NULL */ - g_assert(self->cached_buf == NULL); - g_assert(self->cached_key == NULL); + return; +} - /* now see how the caller wants to deal with that */ - if (data == NULL || *size_req < 0 || buf_size > (guint)*size_req) { - /* A size query or short buffer -- load the cache and return the size*/ - self->cached_buf = buf; - self->cached_key = key; - self->cached_size = buf_size; +static gboolean +check_at_peom(S3Device *self, guint64 size) +{ + if(self->enforce_volume_limit && (self->volume_limit > 0)) { + guint64 newtotal = self->volume_bytes + size; + if(newtotal > self->volume_limit) { + return TRUE; + } + } + return FALSE; +} - *size_req = buf_size; - return 0; - } else { - /* ok, all checks are passed -- copy the data */ - *size_req = buf_size; - g_memmove(data, buf, buf_size); - g_free(key); - g_free(buf); +static gboolean +check_at_leom(S3Device *self, guint64 size) +{ + guint64 block_size = DEVICE(self)->block_size; + guint64 eom_warning_buffer = block_size * + (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads); - /* move on to the next block */ - pself->block++; + if(!self->leom) + return FALSE; + + if(self->enforce_volume_limit && (self->volume_limit > 0)) { + guint64 newtotal = self->volume_bytes + size + eom_warning_buffer; + if(newtotal > self->volume_limit) { + return TRUE; + } + } + return FALSE; +} - return buf_size; +static void +reset_thread( + S3Device *self) +{ + int thread; + int nb_done = 0; + + g_mutex_lock(self->thread_idle_mutex); + while(nb_done != self->nb_threads) { + nb_done = 0; + for (thread = 0; thread < self->nb_threads; thread++) { + if (self->s3t[thread].done == 1) + nb_done++; + } + if (nb_done != self->nb_threads) { + g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex); + } } + g_mutex_unlock(self->thread_idle_mutex); } -/* }}} */