Imported Upstream version 3.3.1
[debian/amanda] / device-src / s3-device.c
index 0fe3bb0b8abb13e3b702bfec54c011cd64ceca7b..7a987505c4e81ba70d40c8fb4f70d12aa13b97c3 100644 (file)
@@ -1,29 +1,30 @@
 /*
- * Copyright (c) 2005-2008 Zmanda Inc.  All Rights Reserved.
- * 
- * This library is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License version 2.1 as 
- * published by the Free Software Foundation.
- * 
- * This library is distributed in the hope that it will be useful, but
+ * Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
- * License for more details.
- * 
- * You should have received a copy of the GNU Lesser General Public License
- * along with this library; if not, write to the Free Software Foundation,
- * Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA.
- * 
- * Contact information: Zmanda Inc., 465 S Mathlida Ave, Suite 300
- * Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
+ * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
+ * for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
+ *
+ * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
+ * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
  */
 
-/* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store 
+/* An S3 device uses Amazon's S3 service (http://www.amazon.com/s3) to store
  * data.  It stores data in keys named with a user-specified prefix, inside a
- * user-specified bucket.  Data is stored in the form of numbered (large) 
- * blocks. 
+ * user-specified bucket.  Data is stored in the form of numbered (large)
+ * blocks.
  */
 
+#include "amanda.h"
 #include <string.h>
 #include <sys/types.h>
 #include <sys/stat.h>
@@ -32,7 +33,6 @@
 #include <regex.h>
 #include <time.h>
 #include "util.h"
-#include "amanda.h"
 #include "conffile.h"
 #include "device.h"
 #include "s3.h"
@@ -66,12 +66,25 @@ static GType        s3_device_get_type      (void);
  */
 typedef struct _S3MetadataFile S3MetadataFile;
 
+typedef struct _S3_by_thread S3_by_thread;
+struct _S3_by_thread {
+    S3Handle * volatile          s3;
+    CurlBuffer volatile          curl_buffer;
+    guint volatile               buffer_len;
+    int volatile                 idle;
+    int volatile                 eof;
+    int volatile                 done;
+    char volatile * volatile     filename;
+    DeviceStatusFlags volatile   errflags;     /* device_status */
+    char volatile * volatile     errmsg;       /* device error message */
+};
+
 typedef struct _S3Device S3Device;
 struct _S3Device {
     Device __parent__;
 
     /* The "easy" curl handle we use to access Amazon S3 */
-    S3Handle *s3;
+    S3_by_thread *s3t;
 
     /* S3 access information */
     char *bucket;
@@ -81,9 +94,18 @@ struct _S3Device {
     char *secret_key;
     char *access_key;
     char *user_token;
-    gboolean is_devpay;
+
+    /* The Openstack swift information. */
+    char *swift_account_id;
+    char *swift_access_key;
 
     char *bucket_location;
+    char *storage_class;
+    char *host;
+    char *service_path;
+    char *server_side_encryption;
+
+    char *ca_info;
 
     /* a cache for unsuccessful reads (where we get the file but the caller
      * doesn't have space for it or doesn't want it), where we expect the
@@ -98,6 +120,28 @@ struct _S3Device {
 
     /* Use SSL? */
     gboolean use_ssl;
+    gboolean openstack_swift_api;
+
+    /* Throttling */
+    guint64 max_send_speed;
+    guint64 max_recv_speed;
+
+    gboolean leom;
+    guint64 volume_bytes;
+    guint64 volume_limit;
+    gboolean enforce_volume_limit;
+    gboolean use_subdomain;
+
+    int          nb_threads;
+    int          nb_threads_backup;
+    int          nb_threads_recovery;
+    GThreadPool *thread_pool_delete;
+    GThreadPool *thread_pool_write;
+    GThreadPool *thread_pool_read;
+    GCond       *thread_idle_cond;
+    GMutex      *thread_idle_mutex;
+    int          next_block_to_read;
+    GSList      *keys;
 };
 
 /*
@@ -114,7 +158,6 @@ struct _S3DeviceClass {
  */
 
 #define S3_DEVICE_NAME "s3"
-#define DEVPAY_DEVICE_NAME "s3zmanda"
 
 /* Maximum key length as specified in the S3 documentation
  * (*excluding* null terminator) */
@@ -122,8 +165,9 @@ struct _S3DeviceClass {
 
 /* Note: for compatability, min can only be decreased and max increased */
 #define S3_DEVICE_MIN_BLOCK_SIZE 1024
-#define S3_DEVICE_MAX_BLOCK_SIZE (100*1024*1024)
+#define S3_DEVICE_MAX_BLOCK_SIZE (3*1024*1024*1024ULL)
 #define S3_DEVICE_DEFAULT_BLOCK_SIZE (10*1024*1024)
+#define EOM_EARLY_WARNING_ZONE_BLOCKS 4
 
 /* This goes in lieu of file number for metadata. */
 #define SPECIAL_INFIX "special-"
@@ -141,6 +185,18 @@ static DevicePropertyBase device_property_s3_secret_key;
 #define PROPERTY_S3_SECRET_KEY (device_property_s3_secret_key.ID)
 #define PROPERTY_S3_ACCESS_KEY (device_property_s3_access_key.ID)
 
+/* Authentication information for Openstack Swift. Both of these are strings. */
+static DevicePropertyBase device_property_swift_account_id;
+static DevicePropertyBase device_property_swift_access_key;
+#define PROPERTY_SWIFT_ACCOUNT_ID (device_property_swift_account_id.ID)
+#define PROPERTY_SWIFT_ACCESS_KEY (device_property_swift_access_key.ID)
+
+/* Host and path */
+static DevicePropertyBase device_property_s3_host;
+static DevicePropertyBase device_property_s3_service_path;
+#define PROPERTY_S3_HOST (device_property_s3_host.ID)
+#define PROPERTY_S3_SERVICE_PATH (device_property_s3_service_path.ID)
+
 /* Same, but for S3 with DevPay. */
 static DevicePropertyBase device_property_s3_user_token;
 #define PROPERTY_S3_USER_TOKEN (device_property_s3_user_token.ID)
@@ -149,10 +205,41 @@ static DevicePropertyBase device_property_s3_user_token;
 static DevicePropertyBase device_property_s3_bucket_location;
 #define PROPERTY_S3_BUCKET_LOCATION (device_property_s3_bucket_location.ID)
 
+/* Storage class */
+static DevicePropertyBase device_property_s3_storage_class;
+#define PROPERTY_S3_STORAGE_CLASS (device_property_s3_storage_class.ID)
+
+/* Storage class */
+static DevicePropertyBase device_property_s3_server_side_encryption;
+#define PROPERTY_S3_SERVER_SIDE_ENCRYPTION (device_property_s3_server_side_encryption.ID)
+
+/* Path to certificate authority certificate */
+static DevicePropertyBase device_property_ssl_ca_info;
+#define PROPERTY_SSL_CA_INFO (device_property_ssl_ca_info.ID)
+
+/* Whether to use openstack protocol. */
+static DevicePropertyBase device_property_openstack_swift_api;
+#define PROPERTY_OPENSTACK_SWIFT_API (device_property_openstack_swift_api.ID)
+
 /* Whether to use SSL with Amazon S3. */
 static DevicePropertyBase device_property_s3_ssl;
 #define PROPERTY_S3_SSL (device_property_s3_ssl.ID)
 
+/* Speed limits for sending and receiving */
+static DevicePropertyBase device_property_max_send_speed;
+static DevicePropertyBase device_property_max_recv_speed;
+#define PROPERTY_MAX_SEND_SPEED (device_property_max_send_speed.ID)
+#define PROPERTY_MAX_RECV_SPEED (device_property_max_recv_speed.ID)
+
+/* Whether to use subdomain */
+static DevicePropertyBase device_property_s3_subdomain;
+#define PROPERTY_S3_SUBDOMAIN (device_property_s3_subdomain.ID)
+
+/* Number of threads to use */
+static DevicePropertyBase device_property_nb_threads_backup;
+#define PROPERTY_NB_THREADS_BACKUP (device_property_nb_threads_backup.ID)
+static DevicePropertyBase device_property_nb_threads_recovery;
+#define PROPERTY_NB_THREADS_RECOVERY (device_property_nb_threads_recovery.ID)
 
 /*
  * prototypes
@@ -160,19 +247,19 @@ static DevicePropertyBase device_property_s3_ssl;
 
 void s3_device_register(void);
 
-/* 
+/*
  * utility functions */
 
 /* Given file and block numbers, return an S3 key.
- * 
+ *
  * @param self: the S3Device object
  * @param file: the file number
  * @param block: the block within that file
  * @returns: a newly allocated string containing an S3 key.
  */
-static char * 
-file_and_block_to_key(S3Device *self, 
-                      int file, 
+static char *
+file_and_block_to_key(S3Device *self,
+                      int file,
                       guint64 block);
 
 /* Given the name of a special file (such as 'tapestart'), generate
@@ -183,9 +270,9 @@ file_and_block_to_key(S3Device *self,
  * @param file: a file number to include; omitted if -1
  * @returns: a newly alocated string containing an S3 key.
  */
-static char * 
-special_file_to_key(S3Device *self, 
-                    char *special_name, 
+static char *
+special_file_to_key(S3Device *self,
+                    char *special_name,
                     int file);
 /* Write an amanda header file to S3.
  *
@@ -193,9 +280,9 @@ special_file_to_key(S3Device *self,
  * @param label: the volume label
  * @param timestamp: the volume timestamp
  */
-static gboolean 
-write_amanda_header(S3Device *self, 
-                    char *label, 
+static gboolean
+write_amanda_header(S3Device *self,
+                    char *label,
                     char * timestamp);
 
 /* "Fast forward" this device to the end by looking up the largest file number
@@ -203,15 +290,15 @@ write_amanda_header(S3Device *self,
  *
  * @param self: the S3Device object
  */
-static gboolean 
+static gboolean
 seek_to_end(S3Device *self);
 
-/* Find the number of the last file that contains any data (even just a header). 
+/* Find the number of the last file that contains any data (even just a header).
  *
  * @param self: the S3Device object
  * @returns: the last file, or -1 in event of an error
  */
-static int 
+static int
 find_last_file(S3Device *self);
 
 /* Delete all blocks in the given file, including the filestart block
@@ -219,21 +306,32 @@ find_last_file(S3Device *self);
  * @param self: the S3Device object
  * @param file: the file to delete
  */
-static gboolean 
-delete_file(S3Device *self, 
+static gboolean
+delete_file(S3Device *self,
             int file);
 
-/* Set up self->s3 as best as possible.
+
+/* Delete all files in the given device
+ *
+ * @param self: the S3Device object
+ */
+static gboolean
+delete_all_files(S3Device *self);
+
+/* Set up self->s3t as best as possible.
  *
- * The return value is TRUE iff self->s3 is useable.
+ * The return value is TRUE iff self->s3t is useable.
  *
  * @param self: the S3Device object
  * @returns: TRUE if the handle is set up
  */
-static gboolean 
+static gboolean
 setup_handle(S3Device * self);
 
-/* 
+static void
+s3_wait_thread_delete(S3Device *self);
+
+/*
  * class mechanics */
 
 static void
@@ -259,6 +357,14 @@ static gboolean s3_device_set_secret_key_fn(Device *self,
     DevicePropertyBase *base, GValue *val,
     PropertySurety surety, PropertySource source);
 
+static gboolean s3_device_set_swift_account_id_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_swift_access_key_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
 static gboolean s3_device_set_user_token_fn(Device *self,
     DevicePropertyBase *base, GValue *val,
     PropertySurety surety, PropertySource source);
@@ -267,15 +373,81 @@ static gboolean s3_device_set_bucket_location_fn(Device *self,
     DevicePropertyBase *base, GValue *val,
     PropertySurety surety, PropertySource source);
 
+static gboolean s3_device_set_storage_class_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_server_side_encryption_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_ca_info_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
 static gboolean s3_device_set_verbose_fn(Device *self,
     DevicePropertyBase *base, GValue *val,
     PropertySurety surety, PropertySource source);
 
+static gboolean s3_device_set_openstack_swift_api_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
 static gboolean s3_device_set_ssl_fn(Device *self,
     DevicePropertyBase *base, GValue *val,
     PropertySurety surety, PropertySource source);
 
-/* 
+static gboolean s3_device_set_max_send_speed_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_max_recv_speed_fn(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_nb_threads_backup(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_nb_threads_recovery(Device *self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_max_volume_usage_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean property_set_leom_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_use_subdomain_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_host_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static gboolean s3_device_set_service_path_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source);
+
+static void s3_thread_read_block(gpointer thread_data,
+                                gpointer data);
+static void s3_thread_write_block(gpointer thread_data,
+                                 gpointer data);
+static gboolean make_bucket(Device * pself);
+
+
+/* Wait that all threads are done */
+static void reset_thread(S3Device *self);
+
+/*
  * virtual functions */
 
 static void
@@ -284,51 +456,62 @@ s3_device_open_device(Device *pself, char *device_name,
 
 static DeviceStatusFlags s3_device_read_label(Device * self);
 
-static gboolean 
-s3_device_start(Device * self, 
-                DeviceAccessMode mode, 
-                char * label, 
+static gboolean
+s3_device_start(Device * self,
+                DeviceAccessMode mode,
+                char * label,
                 char * timestamp);
 
 static gboolean
 s3_device_finish(Device * self);
 
-static gboolean 
+static gboolean
 s3_device_start_file(Device * self,
                      dumpfile_t * jobInfo);
 
-static gboolean 
-s3_device_write_block(Device * self, 
-                      guint size, 
+static gboolean
+s3_device_write_block(Device * self,
+                      guint size,
                       gpointer data);
 
-static gboolean 
+static gboolean
 s3_device_finish_file(Device * self);
 
-static dumpfile_t* 
-s3_device_seek_file(Device *pself, 
+static dumpfile_t*
+s3_device_seek_file(Device *pself,
                     guint file);
 
-static gboolean 
-s3_device_seek_block(Device *pself, 
+static gboolean
+s3_device_seek_block(Device *pself,
                      guint64 block);
 
 static int
-s3_device_read_block(Device * pself, 
-                     gpointer data, 
+s3_device_read_block(Device * pself,
+                     gpointer data,
                      int *size_req);
 
-static gboolean 
-s3_device_recycle_file(Device *pself, 
+static gboolean
+s3_device_recycle_file(Device *pself,
                        guint file);
 
+static gboolean
+s3_device_erase(Device *pself);
+
+static gboolean
+check_at_leom(S3Device *self,
+                guint64 size);
+
+static gboolean
+check_at_peom(S3Device *self,
+                guint64 size);
+
 /*
  * Private functions
  */
 
 static char *
-file_and_block_to_key(S3Device *self, 
-                      int file, 
+file_and_block_to_key(S3Device *self,
+                      int file,
                       guint64 block)
 {
     char *s3_key = g_strdup_printf("%sf%08x-b%016llx.data",
@@ -338,8 +521,8 @@ file_and_block_to_key(S3Device *self,
 }
 
 static char *
-special_file_to_key(S3Device *self, 
-                    char *special_name, 
+special_file_to_key(S3Device *self,
+                    char *special_name,
                     int file)
 {
     if (file == -1)
@@ -349,41 +532,63 @@ special_file_to_key(S3Device *self,
 }
 
 static gboolean
-write_amanda_header(S3Device *self, 
-                    char *label, 
+write_amanda_header(S3Device *self,
+                    char *label,
                     char * timestamp)
 {
     CurlBuffer amanda_header = {NULL, 0, 0, 0};
     char * key = NULL;
-    gboolean header_fits, result;
+    gboolean result;
     dumpfile_t * dumpinfo = NULL;
     Device *d_self = DEVICE(self);
+    size_t header_size;
 
     /* build the header */
+    header_size = 0; /* no minimum size */
     dumpinfo = make_tapestart_header(DEVICE(self), label, timestamp);
-    amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo, 
-        /* casting guint* to int* */
-        (int*) &amanda_header.buffer_len, &header_fits);
-    if (!header_fits) {
+    amanda_header.buffer = device_build_amanda_header(DEVICE(self), dumpinfo,
+        &header_size);
+    if (amanda_header.buffer == NULL) {
        device_set_error(d_self,
            stralloc(_("Amanda tapestart header won't fit in a single block!")),
            DEVICE_STATUS_DEVICE_ERROR);
+       dumpfile_free(dumpinfo);
        g_free(amanda_header.buffer);
        return FALSE;
     }
 
+    if(check_at_leom(self, header_size))
+        d_self->is_eom = TRUE;
+
+    if(check_at_peom(self, header_size)) {
+        d_self->is_eom = TRUE;
+        device_set_error(d_self,
+            stralloc(_("No space left on device")),
+            DEVICE_STATUS_DEVICE_ERROR);
+        g_free(amanda_header.buffer);
+        return FALSE;
+    }
+
     /* write out the header and flush the uploads. */
     key = special_file_to_key(self, "tapestart", -1);
-    result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
+    g_assert(header_size < G_MAXUINT); /* for cast to guint */
+    amanda_header.buffer_len = (guint)header_size;
+    result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
                        &amanda_header, NULL, NULL);
     g_free(amanda_header.buffer);
     g_free(key);
 
     if (!result) {
        device_set_error(d_self,
-           vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While writing amanda header: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
+       dumpfile_free(dumpinfo);
+    } else {
+       dumpfile_free(d_self->volume_header);
+       d_self->volume_header = dumpinfo;
+        self->volume_bytes += header_size;
     }
+    d_self->header_block_size = header_size;
     return result;
 }
 
@@ -408,7 +613,7 @@ seek_to_end(S3Device *self) {
 static int key_to_file(guint prefix_len, const char * key) {
     int file;
     int i;
-    
+
     /* skip the prefix */
     if (strlen(key) <= prefix_len)
        return -1;
@@ -418,12 +623,12 @@ static int key_to_file(guint prefix_len, const char * key) {
     if (strncmp(key, SPECIAL_INFIX, strlen(SPECIAL_INFIX)) == 0) {
         return 0;
     }
-    
+
     /* check that key starts with 'f' */
     if (key[0] != 'f')
        return -1;
     key++;
-    
+
     /* check that key is of the form "%08x-" */
     for (i = 0; i < 8; i++) {
         if (!(key[i] >= '0' && key[i] <= '9') &&
@@ -440,11 +645,11 @@ static int key_to_file(guint prefix_len, const char * key) {
         g_warning(_("unparseable file number '%s'"), key);
         return -1;
     }
-    
+
     return file;
 }
 
-/* Find the number of the last file that contains any data (even just a header). 
+/* Find the number of the last file that contains any data (even just a header).
  * Returns -1 in event of an error
  */
 static int
@@ -456,10 +661,10 @@ find_last_file(S3Device *self) {
     Device *d_self = DEVICE(self);
 
     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
-    result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys);
+    result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-", &keys, NULL);
     if (!result) {
        device_set_error(d_self,
-           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
         return -1;
     }
@@ -475,7 +680,7 @@ find_last_file(S3Device *self) {
     return last_file;
 }
 
-/* Find the number of the file following the requested one, if any. 
+/* Find the number of the file following the requested one, if any.
  * Returns 0 if there is no such file or -1 in event of an error
  */
 static int
@@ -487,10 +692,11 @@ find_next_file(S3Device *self, int last_file) {
     Device *d_self = DEVICE(self);
 
     /* list all keys matching C{PREFIX*-*}, stripping the C{-*} */
-    result = s3_list_keys(self->s3, self->bucket, self->prefix, "-", &keys);
+    result = s3_list_keys(self->s3t[0].s3, self->bucket, self->prefix, "-",
+                         &keys, NULL);
     if (!result) {
        device_set_error(d_self,
-           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
         return -1;
     }
@@ -512,50 +718,153 @@ find_next_file(S3Device *self, int last_file) {
         }
     }
 
-    return last_file;
+    return next_file;
 }
 
 static gboolean
 delete_file(S3Device *self,
             int file)
 {
+    int thread = -1;
+
     gboolean result;
     GSList *keys;
-    char *my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
+    guint64 total_size = 0;
     Device *d_self = DEVICE(self);
-    
-    result = s3_list_keys(self->s3, self->bucket, my_prefix, NULL, &keys);
+    char *my_prefix;
+    if (file == -1) {
+       my_prefix = g_strdup_printf("%sf", self->prefix);
+    } else {
+       my_prefix = g_strdup_printf("%sf%08x-", self->prefix, file);
+    }
+
+    result = s3_list_keys(self->s3t[0].s3, self->bucket, my_prefix, NULL, &keys,
+                         &total_size);
     if (!result) {
        device_set_error(d_self,
-           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
         return FALSE;
     }
 
-    /* this will likely be a *lot* of keys */
-    for (; keys; keys = g_slist_remove(keys, keys->data)) {
-        if (self->verbose) g_debug(_("Deleting %s"), (char*)keys->data);
-        if (!s3_delete(self->s3, self->bucket, keys->data)) {
-           device_set_error(d_self,
-               vstrallocf(_("While deleting key '%s': %s"),
-                           (char*)keys->data, s3_strerror(self->s3)),
-               DEVICE_STATUS_DEVICE_ERROR);
-            g_slist_free(keys);
-            return FALSE;
-        }
+    g_mutex_lock(self->thread_idle_mutex);
+    if (!self->keys) {
+       self->keys = keys;
+    } else {
+       self->keys = g_slist_concat(self->keys, keys);
+    }
+
+    // start the threads
+    for (thread = 0; thread < self->nb_threads; thread++)  {
+       if (self->s3t[thread].idle == 1) {
+           /* Check if the thread is in error */
+           if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+               device_set_error(d_self,
+                                (char *)self->s3t[thread].errmsg,
+                                self->s3t[thread].errflags);
+               self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+               self->s3t[thread].errmsg = NULL;
+               g_mutex_unlock(self->thread_idle_mutex);
+               s3_wait_thread_delete(self);
+               return FALSE;
+           }
+           self->s3t[thread].idle = 0;
+           self->s3t[thread].done = 0;
+           g_thread_pool_push(self->thread_pool_delete, &self->s3t[thread],
+                              NULL);
+       }
     }
+    g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+    g_mutex_unlock(self->thread_idle_mutex);
+
+    self->volume_bytes = total_size;
+
+    s3_wait_thread_delete(self);
 
     return TRUE;
 }
 
+static void
+s3_thread_delete_block(
+    gpointer thread_data,
+    gpointer data)
+{
+    static int count = 0;
+    S3_by_thread *s3t = (S3_by_thread *)thread_data;
+    Device *pself = (Device *)data;
+    S3Device *self = S3_DEVICE(pself);
+    gboolean result = 1;
+    char *filename;
+
+    g_mutex_lock(self->thread_idle_mutex);
+    while (result && self->keys) {
+       filename = self->keys->data;
+       self->keys = g_slist_remove(self->keys, self->keys->data);
+       count++;
+       if (count >= 1000) {
+           g_debug("Deleting %s ...", filename);
+           count = 0;
+       }
+       g_mutex_unlock(self->thread_idle_mutex);
+       result = s3_delete(s3t->s3, (const char *)self->bucket, (const char *)filename);
+       if (!result) {
+           s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
+           s3t->errmsg = g_strdup_printf(_("While deleting key '%s': %s"),
+                                         filename, s3_strerror(s3t->s3));
+       }
+       g_free(filename);
+       g_mutex_lock(self->thread_idle_mutex);
+    }
+    s3t->idle = 1;
+    s3t->done = 1;
+    g_cond_broadcast(self->thread_idle_cond);
+    g_mutex_unlock(self->thread_idle_mutex);
+}
+
+static void
+s3_wait_thread_delete(S3Device *self)
+{
+    Device *d_self = (Device *)self;
+    int idle_thread = 0;
+    int thread;
+
+    g_mutex_lock(self->thread_idle_mutex);
+    while (idle_thread != self->nb_threads) {
+       idle_thread = 0;
+       for (thread = 0; thread < self->nb_threads; thread++)  {
+           if (self->s3t[thread].idle == 1) {
+               idle_thread++;
+           }
+           /* Check if the thread is in error */
+           if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+               device_set_error(d_self, (char *)self->s3t[thread].errmsg,
+                                            self->s3t[thread].errflags);
+               self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+               self->s3t[thread].errmsg = NULL;
+           }
+       }
+       if (idle_thread != self->nb_threads) {
+           g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+       }
+    }
+    g_mutex_unlock(self->thread_idle_mutex);
+}
+
+
+static gboolean
+delete_all_files(S3Device *self)
+{
+    return delete_file(self, -1);
+}
+
 /*
  * Class mechanics
  */
 
-void 
+void
 s3_device_register(void)
 {
-    static const char * device_prefix_list[] = { S3_DEVICE_NAME, DEVPAY_DEVICE_NAME, NULL };
+    static const char * device_prefix_list[] = { S3_DEVICE_NAME, NULL };
     g_assert(s3_init());
 
     /* set up our properties */
@@ -565,16 +874,54 @@ s3_device_register(void)
     device_property_fill_and_register(&device_property_s3_access_key,
                                       G_TYPE_STRING, "s3_access_key",
        "Access key ID to authenticate with Amazon S3");
+    device_property_fill_and_register(&device_property_swift_account_id,
+                                      G_TYPE_STRING, "swift_account_id",
+       "Account ID to authenticate with openstack swift");
+    device_property_fill_and_register(&device_property_swift_access_key,
+                                      G_TYPE_STRING, "swift_access_key",
+       "Access key to authenticate with openstack swift");
+    device_property_fill_and_register(&device_property_s3_host,
+                                      G_TYPE_STRING, "s3_host",
+       "hostname:port of the server");
+    device_property_fill_and_register(&device_property_s3_service_path,
+                                      G_TYPE_STRING, "s3_service_path",
+       "path to add in the url");
     device_property_fill_and_register(&device_property_s3_user_token,
                                       G_TYPE_STRING, "s3_user_token",
        "User token for authentication Amazon devpay requests");
     device_property_fill_and_register(&device_property_s3_bucket_location,
                                       G_TYPE_STRING, "s3_bucket_location",
        "Location constraint for buckets on Amazon S3");
+    device_property_fill_and_register(&device_property_s3_storage_class,
+                                      G_TYPE_STRING, "s3_storage_class",
+       "Storage class as specified by Amazon (STANDARD or REDUCED_REDUNDANCY)");
+    device_property_fill_and_register(&device_property_s3_server_side_encryption,
+                                      G_TYPE_STRING, "s3_server_side_encryption",
+       "Serve side encryption as specified by Amazon (AES256)");
+    device_property_fill_and_register(&device_property_ssl_ca_info,
+                                      G_TYPE_STRING, "ssl_ca_info",
+       "Path to certificate authority certificate");
+    device_property_fill_and_register(&device_property_openstack_swift_api,
+                                      G_TYPE_BOOLEAN, "openstack_swift_api",
+       "Whether to use openstack protocol");
     device_property_fill_and_register(&device_property_s3_ssl,
                                       G_TYPE_BOOLEAN, "s3_ssl",
        "Whether to use SSL with Amazon S3");
-
+    device_property_fill_and_register(&device_property_s3_subdomain,
+                                      G_TYPE_BOOLEAN, "s3_subdomain",
+       "Whether to use subdomain");
+    device_property_fill_and_register(&device_property_max_send_speed,
+                                      G_TYPE_UINT64, "max_send_speed",
+       "Maximum average upload speed (bytes/sec)");
+    device_property_fill_and_register(&device_property_max_recv_speed,
+                                      G_TYPE_UINT64, "max_recv_speed",
+       "Maximum average download speed (bytes/sec)");
+    device_property_fill_and_register(&device_property_nb_threads_backup,
+                                      G_TYPE_UINT64, "nb_threads_backup",
+       "Number of writer thread");
+    device_property_fill_and_register(&device_property_nb_threads_recovery,
+                                      G_TYPE_UINT64, "nb_threads_recovery",
+       "Number of reader thread");
 
     /* register the device itself */
     register_device(s3_device_factory, device_prefix_list);
@@ -584,7 +931,7 @@ static GType
 s3_device_get_type(void)
 {
     static GType type = 0;
-    
+
     if G_UNLIKELY(type == 0) {
         static const GTypeInfo info = {
             sizeof (S3DeviceClass),
@@ -598,7 +945,7 @@ s3_device_get_type(void)
             (GInstanceInitFunc) s3_device_init,
             NULL
         };
-        
+
         type = g_type_register_static (TYPE_DEVICE, "S3Device", &info,
                                        (GTypeFlags)0);
     }
@@ -606,12 +953,26 @@ s3_device_get_type(void)
     return type;
 }
 
-static void 
+static void
 s3_device_init(S3Device * self)
 {
     Device * dself = DEVICE(self);
     GValue response;
 
+    self->volume_bytes = 0;
+    self->volume_limit = 0;
+    self->leom = TRUE;
+    self->enforce_volume_limit = FALSE;
+    self->use_subdomain = FALSE;
+    self->nb_threads = 1;
+    self->nb_threads_backup = 1;
+    self->nb_threads_recovery = 1;
+    self->thread_pool_delete = NULL;
+    self->thread_pool_write = NULL;
+    self->thread_pool_read = NULL;
+    self->thread_idle_cond = NULL;
+    self->thread_idle_mutex = NULL;
+
     /* Register property values
      * Note: Some aren't added until s3_device_open_device()
      */
@@ -641,6 +1002,30 @@ s3_device_init(S3Device * self)
            &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
     g_value_unset(&response);
 
+    g_value_init(&response, G_TYPE_BOOLEAN);
+    g_value_set_boolean(&response, TRUE);
+    device_set_simple_property(dself, PROPERTY_FULL_DELETION,
+           &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+    g_value_unset(&response);
+
+    g_value_init(&response, G_TYPE_BOOLEAN);
+    g_value_set_boolean(&response, TRUE); /* well, there *is* no EOM on S3 .. */
+    device_set_simple_property(dself, PROPERTY_LEOM,
+           &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+    g_value_unset(&response);
+
+    g_value_init(&response, G_TYPE_BOOLEAN);
+    g_value_set_boolean(&response, FALSE);
+    device_set_simple_property(dself, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
+           &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+    g_value_unset(&response);
+
+    g_value_init(&response, G_TYPE_BOOLEAN);
+    g_value_set_boolean(&response, FALSE);
+    device_set_simple_property(dself, PROPERTY_S3_SUBDOMAIN,
+           &response, PROPERTY_SURETY_GOOD, PROPERTY_SOURCE_DETECTED);
+    g_value_unset(&response);
+
     g_value_init(&response, G_TYPE_BOOLEAN);
     g_value_set_boolean(&response, FALSE);
     device_set_simple_property(dself, PROPERTY_COMPRESSION,
@@ -655,7 +1040,7 @@ s3_device_init(S3Device * self)
 
 }
 
-static void 
+static void
 s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
 {
     GObjectClass *g_object_class = (GObjectClass*) c;
@@ -677,6 +1062,8 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
     device_class->read_block = s3_device_read_block;
     device_class->recycle_file = s3_device_recycle_file;
 
+    device_class->erase = s3_device_erase;
+
     g_object_class->finalize = s3_device_finalize;
 
     device_class_register_property(device_class, PROPERTY_S3_ACCESS_KEY,
@@ -689,6 +1076,26 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
            device_simple_property_get_fn,
            s3_device_set_secret_key_fn);
 
+    device_class_register_property(device_class, PROPERTY_SWIFT_ACCOUNT_ID,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_swift_account_id_fn);
+
+    device_class_register_property(device_class, PROPERTY_SWIFT_ACCESS_KEY,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_swift_access_key_fn);
+
+    device_class_register_property(device_class, PROPERTY_S3_HOST,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_host_fn);
+
+    device_class_register_property(device_class, PROPERTY_S3_SERVICE_PATH,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_service_path_fn);
+
     device_class_register_property(device_class, PROPERTY_S3_USER_TOKEN,
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
            device_simple_property_get_fn,
@@ -699,20 +1106,83 @@ s3_device_class_init(S3DeviceClass * c G_GNUC_UNUSED)
            device_simple_property_get_fn,
            s3_device_set_bucket_location_fn);
 
+    device_class_register_property(device_class, PROPERTY_S3_STORAGE_CLASS,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_storage_class_fn);
+
+    device_class_register_property(device_class, PROPERTY_S3_SERVER_SIDE_ENCRYPTION,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_server_side_encryption_fn);
+
+    device_class_register_property(device_class, PROPERTY_SSL_CA_INFO,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_ca_info_fn);
+
     device_class_register_property(device_class, PROPERTY_VERBOSE,
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
            device_simple_property_get_fn,
            s3_device_set_verbose_fn);
 
+    device_class_register_property(device_class, PROPERTY_OPENSTACK_SWIFT_API,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_openstack_swift_api_fn);
+
     device_class_register_property(device_class, PROPERTY_S3_SSL,
            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
            device_simple_property_get_fn,
            s3_device_set_ssl_fn);
 
+    device_class_register_property(device_class, PROPERTY_MAX_SEND_SPEED,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_max_send_speed_fn);
+
+    device_class_register_property(device_class, PROPERTY_MAX_RECV_SPEED,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_max_recv_speed_fn);
+
+    device_class_register_property(device_class, PROPERTY_NB_THREADS_BACKUP,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_nb_threads_backup);
+
+    device_class_register_property(device_class, PROPERTY_NB_THREADS_RECOVERY,
+           PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+           device_simple_property_get_fn,
+           s3_device_set_nb_threads_recovery);
+
     device_class_register_property(device_class, PROPERTY_COMPRESSION,
            PROPERTY_ACCESS_GET_MASK,
            device_simple_property_get_fn,
            NULL);
+
+    device_class_register_property(device_class, PROPERTY_LEOM,
+            PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_BEFORE_START,
+            device_simple_property_get_fn,
+            property_set_leom_fn);
+
+    device_class_register_property(device_class, PROPERTY_MAX_VOLUME_USAGE,
+            (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+                (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+            device_simple_property_get_fn,
+            s3_device_set_max_volume_usage_fn);
+
+    device_class_register_property(device_class, PROPERTY_ENFORCE_MAX_VOLUME_USAGE,
+            (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+                (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+            device_simple_property_get_fn,
+            s3_device_set_enforce_max_volume_usage_fn);
+
+    device_class_register_property(device_class, PROPERTY_S3_SUBDOMAIN,
+            (PROPERTY_ACCESS_GET_MASK | PROPERTY_ACCESS_SET_MASK) &
+                (~ PROPERTY_ACCESS_SET_INSIDE_FILE_WRITE),
+            device_simple_property_get_fn,
+            s3_device_set_use_subdomain_fn);
 }
 
 static gboolean
@@ -742,101 +1212,345 @@ s3_device_set_secret_key_fn(Device *p_self, DevicePropertyBase *base,
 }
 
 static gboolean
-s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
+s3_device_set_swift_account_id_fn(Device *p_self, DevicePropertyBase *base,
     GValue *val, PropertySurety surety, PropertySource source)
 {
     S3Device *self = S3_DEVICE(p_self);
 
-    if (!self->is_devpay) {
-       device_set_error(p_self, stralloc(_(
-                  "Can't set a user token unless DevPay is in use")),
-           DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
-    }
-
-    amfree(self->user_token);
-    self->user_token = g_value_dup_string(val);
+    amfree(self->swift_account_id);
+    self->swift_account_id = g_value_dup_string(val);
     device_clear_volume_details(p_self);
 
     return device_simple_property_set_fn(p_self, base, val, surety, source);
 }
 
 static gboolean
-s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
+s3_device_set_swift_access_key_fn(Device *p_self, DevicePropertyBase *base,
     GValue *val, PropertySurety surety, PropertySource source)
 {
     S3Device *self = S3_DEVICE(p_self);
 
-    if (self->use_ssl && !s3_curl_location_compat()) {
-       device_set_error(p_self, stralloc(_(
-               "Location constraint given for Amazon S3 bucket, "
-               "but libcurl is too old support wildcard certificates.")),
-           DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
-    }
+    amfree(self->swift_access_key);
+    self->swift_access_key = g_value_dup_string(val);
+    device_clear_volume_details(p_self);
 
-    if (!s3_bucket_location_compat(self->bucket)) {
-       device_set_error(p_self, g_strdup_printf(_(
-               "Location constraint given for Amazon S3 bucket, "
-               "but the bucket name (%s) is not usable as a subdomain."),
-               self->bucket),
-           DEVICE_STATUS_DEVICE_ERROR);
-       return FALSE;
-    }
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
 
-    amfree(self->bucket_location);
-    self->bucket_location = g_value_dup_string(val);
+static gboolean
+s3_device_set_host_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    amfree(self->host);
+    self->host = g_value_dup_string(val);
     device_clear_volume_details(p_self);
 
     return device_simple_property_set_fn(p_self, base, val, surety, source);
 }
 
 static gboolean
-s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
-    GValue *val, PropertySurety surety, PropertySource source)
+s3_device_set_service_path_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
 {
     S3Device *self = S3_DEVICE(p_self);
 
-    self->verbose = g_value_get_boolean(val);
-    /* Our S3 handle may not yet have been instantiated; if so, it will
-     * get the proper verbose setting when it is created */
-    if (self->s3)
-       s3_verbose(self->s3, self->verbose);
+    amfree(self->service_path);
+    self->service_path = g_value_dup_string(val);
+    device_clear_volume_details(p_self);
 
     return device_simple_property_set_fn(p_self, base, val, surety, source);
 }
 
 static gboolean
-s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
+s3_device_set_user_token_fn(Device *p_self, DevicePropertyBase *base,
     GValue *val, PropertySurety surety, PropertySource source)
 {
     S3Device *self = S3_DEVICE(p_self);
-    gboolean new_val;
+
+    amfree(self->user_token);
+    self->user_token = g_value_dup_string(val);
+    device_clear_volume_details(p_self);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_bucket_location_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    char *str_val = g_value_dup_string(val);
+
+    if (str_val[0] && self->use_ssl && !s3_curl_location_compat()) {
+       device_set_error(p_self, stralloc(_(
+               "Location constraint given for Amazon S3 bucket, "
+               "but libcurl is too old support wildcard certificates.")),
+           DEVICE_STATUS_DEVICE_ERROR);
+        goto fail;
+    }
+
+    if (str_val[0] && !s3_bucket_location_compat(self->bucket)) {
+       device_set_error(p_self, g_strdup_printf(_(
+               "Location constraint given for Amazon S3 bucket, "
+               "but the bucket name (%s) is not usable as a subdomain."),
+               self->bucket),
+           DEVICE_STATUS_DEVICE_ERROR);
+        goto fail;
+    }
+
+    amfree(self->bucket_location);
+    self->bucket_location = str_val;
+    device_clear_volume_details(p_self);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+fail:
+    g_free(str_val);
+    return FALSE;
+}
+
+static gboolean
+s3_device_set_storage_class_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    char *str_val = g_value_dup_string(val);
+
+    amfree(self->storage_class);
+    self->storage_class = str_val;
+    device_clear_volume_details(p_self);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_server_side_encryption_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    char *str_val = g_value_dup_string(val);
+
+    amfree(self->server_side_encryption);
+    self->server_side_encryption = str_val;
+    device_clear_volume_details(p_self);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_ca_info_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    amfree(self->ca_info);
+    self->ca_info = g_value_dup_string(val);
+    device_clear_volume_details(p_self);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_verbose_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    int       thread;
+
+    self->verbose = g_value_get_boolean(val);
+    /* Our S3 handle may not yet have been instantiated; if so, it will
+     * get the proper verbose setting when it is created */
+    if (self->s3t) {
+       for (thread = 0; thread < self->nb_threads; thread++) {
+           if (self->s3t[thread].s3)
+               s3_verbose(self->s3t[thread].s3, self->verbose);
+       }
+    }
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_openstack_swift_api_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    self->openstack_swift_api = g_value_get_boolean(val);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_ssl_fn(Device *p_self, DevicePropertyBase *base,
+    GValue *val, PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    gboolean new_val;
+    int      thread;
 
     new_val = g_value_get_boolean(val);
     /* Our S3 handle may not yet have been instantiated; if so, it will
      * get the proper use_ssl setting when it is created */
-    if (self->s3 && !s3_use_ssl(self->s3, new_val)) {
-       device_set_error(p_self, g_strdup_printf(_(
-                "Error setting S3 SSL/TLS use "
-                "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
-           DEVICE_STATUS_DEVICE_ERROR);
-        return FALSE;
+    if (self->s3t) {
+       for (thread = 0; thread < self->nb_threads; thread++) {
+           if (self->s3t[thread].s3 && !s3_use_ssl(self->s3t[thread].s3, new_val)) {
+               device_set_error(p_self, g_strdup_printf(_(
+                       "Error setting S3 SSL/TLS use "
+                       "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+       }
     }
     self->use_ssl = new_val;
 
     return device_simple_property_set_fn(p_self, base, val, surety, source);
 }
 
-static Device* 
+static gboolean
+s3_device_set_max_send_speed_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    guint64 new_val;
+    int     thread;
+
+    new_val = g_value_get_uint64(val);
+    if (self->s3t) {
+       for (thread = 0; thread < self->nb_threads; thread++) {
+           if (self->s3t[thread].s3 && !s3_set_max_send_speed(self->s3t[thread].s3, new_val)) {
+               device_set_error(p_self,
+                       g_strdup("Could not set S3 maximum send speed"),
+                       DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+       }
+    }
+    self->max_send_speed = new_val;
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_max_recv_speed_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    guint64 new_val;
+    int     thread;
+
+    new_val = g_value_get_uint64(val);
+    if (self->s3t) {
+       for (thread = 0; thread < self->nb_threads; thread++) {
+           if (self->s3t[thread].s3 &&
+               !s3_set_max_recv_speed(self->s3t[thread].s3, new_val)) {
+               device_set_error(p_self,
+                       g_strdup("Could not set S3 maximum recv speed"),
+                       DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+       }
+    }
+    self->max_recv_speed = new_val;
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_nb_threads_backup(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    guint64 new_val;
+
+    new_val = g_value_get_uint64(val);
+    self->nb_threads_backup = new_val;
+    if (self->nb_threads_backup > self->nb_threads) {
+       self->nb_threads = self->nb_threads_backup;
+    }
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_nb_threads_recovery(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+    guint64 new_val;
+
+    new_val = g_value_get_uint64(val);
+    self->nb_threads_recovery = new_val;
+    if (self->nb_threads_recovery > self->nb_threads) {
+       self->nb_threads = self->nb_threads_recovery;
+    }
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+s3_device_set_max_volume_usage_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    self->volume_limit = g_value_get_uint64(val);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+
+}
+
+static gboolean
+s3_device_set_enforce_max_volume_usage_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    self->enforce_volume_limit = g_value_get_boolean(val);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+
+}
+
+static gboolean
+s3_device_set_use_subdomain_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    self->use_subdomain = g_value_get_boolean(val);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+
+static gboolean
+property_set_leom_fn(Device *p_self,
+    DevicePropertyBase *base, GValue *val,
+    PropertySurety surety, PropertySource source)
+{
+    S3Device *self = S3_DEVICE(p_self);
+
+    self->leom = g_value_get_boolean(val);
+
+    return device_simple_property_set_fn(p_self, base, val, surety, source);
+}
+static Device*
 s3_device_factory(char * device_name, char * device_type, char * device_node)
 {
     Device *rval;
-    S3Device * s3_rval;
-    g_assert(0 == strcmp(device_type, S3_DEVICE_NAME) ||
-             0 == strcmp(device_type, DEVPAY_DEVICE_NAME));
+    g_assert(0 == strcmp(device_type, S3_DEVICE_NAME));
     rval = DEVICE(g_object_new(TYPE_S3_DEVICE, NULL));
-    s3_rval = (S3Device*)rval;
 
     device_open_device(rval, device_name, device_type, device_node);
     return rval;
@@ -860,7 +1574,7 @@ s3_device_open_device(Device *pself, char *device_name,
 
     /* Device name may be bucket/prefix, to support multiple volumes in a
      * single bucket. */
-    name_colon = index(device_node, '/');
+    name_colon = strchr(device_node, '/');
     if (name_colon == NULL) {
         self->bucket = g_strdup(device_node);
         self->prefix = g_strdup("");
@@ -868,8 +1582,6 @@ s3_device_open_device(Device *pself, char *device_name,
         self->bucket = g_strndup(device_node, name_colon - device_node);
         self->prefix = g_strdup(name_colon + 1);
     }
-    
-    self->is_devpay = !strcmp(device_type, DEVPAY_DEVICE_NAME);
 
     if (self->bucket == NULL || self->bucket[0] == '\0') {
        device_set_error(pself,
@@ -884,6 +1596,7 @@ s3_device_open_device(Device *pself, char *device_name,
 
     /* default values */
     self->verbose = FALSE;
+    self->openstack_swift_api = FALSE;
 
     /* use SSL if available */
     self->use_ssl = s3_curl_supports_ssl();
@@ -900,49 +1613,224 @@ s3_device_open_device(Device *pself, char *device_name,
 
 static void s3_device_finalize(GObject * obj_self) {
     S3Device *self = S3_DEVICE (obj_self);
+    int thread;
 
     if(G_OBJECT_CLASS(parent_class)->finalize)
         (* G_OBJECT_CLASS(parent_class)->finalize)(obj_self);
 
-    if(self->s3) s3_free(self->s3);
+    if (self->thread_pool_delete) {
+       g_thread_pool_free(self->thread_pool_delete, 1, 1);
+       self->thread_pool_delete = NULL;
+    }
+    if (self->thread_pool_write) {
+       g_thread_pool_free(self->thread_pool_write, 1, 1);
+       self->thread_pool_write = NULL;
+    }
+    if (self->thread_pool_read) {
+       g_thread_pool_free(self->thread_pool_read, 1, 1);
+       self->thread_pool_read = NULL;
+    }
+    if (self->thread_idle_mutex) {
+       g_mutex_free(self->thread_idle_mutex);
+       self->thread_idle_mutex = NULL;
+    }
+    if (self->thread_idle_cond) {
+       g_cond_free(self->thread_idle_cond);
+       self->thread_idle_cond = NULL;
+    }
+    if (self->s3t) {
+       for (thread = 0; thread < self->nb_threads; thread++) {
+            if(self->s3t[thread].s3) s3_free(self->s3t[thread].s3);
+       }
+       g_free(self->s3t);
+    }
     if(self->bucket) g_free(self->bucket);
     if(self->prefix) g_free(self->prefix);
     if(self->access_key) g_free(self->access_key);
     if(self->secret_key) g_free(self->secret_key);
+    if(self->swift_account_id) g_free(self->swift_account_id);
+    if(self->swift_access_key) g_free(self->swift_access_key);
+    if(self->host) g_free(self->host);
+    if(self->service_path) g_free(self->service_path);
     if(self->user_token) g_free(self->user_token);
     if(self->bucket_location) g_free(self->bucket_location);
+    if(self->storage_class) g_free(self->storage_class);
+    if(self->server_side_encryption) g_free(self->server_side_encryption);
+    if(self->ca_info) g_free(self->ca_info);
 }
 
 static gboolean setup_handle(S3Device * self) {
     Device *d_self = DEVICE(self);
-    if (self->s3 == NULL) {
-        if (self->access_key == NULL)
+    int thread;
+    guint response_code;
+    s3_error_code_t s3_error_code;
+    CURLcode curl_code;
+
+    if (self->s3t == NULL) {
+       self->s3t = g_new(S3_by_thread, self->nb_threads);
+       if (self->s3t == NULL) {
+           device_set_error(d_self,
+               stralloc(_("Can't allocate S3Handle array")),
+               DEVICE_STATUS_DEVICE_ERROR);
             return FALSE;
-       if (self->secret_key == NULL)
+       }
+
+       if (!self->openstack_swift_api) {
+           if (self->access_key == NULL || self->access_key[0] == '\0') {
+               device_set_error(d_self,
+                   g_strdup(_("No Amazon access key specified")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+
+           if (self->secret_key == NULL || self->secret_key[0] == '\0') {
+               device_set_error(d_self,
+                   g_strdup(_("No Amazon secret key specified")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+       } else {
+           if (self->swift_account_id == NULL ||
+               self->swift_account_id[0] == '\0') {
+               device_set_error(d_self,
+                   g_strdup(_("No Swift account id specified")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+            if (self->swift_access_key == NULL ||
+               self->swift_access_key[0] == '\0') {
+               device_set_error(d_self,
+                   g_strdup(_("No Swift access key specified")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               return FALSE;
+           }
+       }
+
+       if (!self->use_ssl && self->ca_info) {
+           amfree(self->ca_info);
+       }
+
+       self->thread_idle_cond = g_cond_new();
+       self->thread_idle_mutex = g_mutex_new();
+
+       for (thread = 0; thread < self->nb_threads; thread++) {
+           self->s3t[thread].idle = 1;
+           self->s3t[thread].done = 1;
+           self->s3t[thread].eof = FALSE;
+           self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+           self->s3t[thread].errmsg = NULL;
+           self->s3t[thread].filename = NULL;
+           self->s3t[thread].curl_buffer.buffer = NULL;
+           self->s3t[thread].curl_buffer.buffer_len = 0;
+            self->s3t[thread].s3 = s3_open(self->access_key, self->secret_key,
+                                          self->swift_account_id,
+                                          self->swift_access_key,
+                                          self->host, self->service_path,
+                                          self->use_subdomain,
+                                          self->user_token, self->bucket_location,
+                                          self->storage_class, self->ca_info,
+                                          self->server_side_encryption,
+                                          self->openstack_swift_api);
+            if (self->s3t[thread].s3 == NULL) {
+               device_set_error(d_self,
+                   stralloc(_("Internal error creating S3 handle")),
+                   DEVICE_STATUS_DEVICE_ERROR);
+               self->nb_threads = thread+1;
+                return FALSE;
+            } else if (self->openstack_swift_api) {
+               s3_error(self->s3t[0].s3, NULL, &response_code,
+                       &s3_error_code, NULL, &curl_code, NULL);
+               if (response_code != 200) {
+                   device_set_error(d_self,
+                       g_strdup_printf(_("Internal error creating S3 handle: %s"),
+                                       s3_strerror(self->s3t[0].s3)),
+                       DEVICE_STATUS_DEVICE_ERROR);
+                   self->nb_threads = thread+1;
+                   return FALSE;
+               }
+           }
+        }
+
+       g_debug("Create %d threads", self->nb_threads);
+       self->thread_pool_delete = g_thread_pool_new(s3_thread_delete_block,
+                                                    self, self->nb_threads, 0,
+                                                    NULL);
+       self->thread_pool_write = g_thread_pool_new(s3_thread_write_block, self,
+                                             self->nb_threads, 0, NULL);
+       self->thread_pool_read = g_thread_pool_new(s3_thread_read_block, self,
+                                             self->nb_threads, 0, NULL);
+    }
+
+    for (thread = 0; thread < self->nb_threads; thread++) {
+       s3_verbose(self->s3t[thread].s3, self->verbose);
+
+       if (!s3_use_ssl(self->s3t[thread].s3, self->use_ssl)) {
+           device_set_error(d_self, g_strdup_printf(_(
+                "Error setting S3 SSL/TLS use "
+                "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
+               DEVICE_STATUS_DEVICE_ERROR);
             return FALSE;
-       if (self->is_devpay && self->user_token == NULL)
+       }
+
+       if (self->max_send_speed &&
+           !s3_set_max_send_speed(self->s3t[thread].s3, self->max_send_speed)) {
+           device_set_error(d_self,
+               g_strdup("Could not set S3 maximum send speed"),
+               DEVICE_STATUS_DEVICE_ERROR);
             return FALSE;
+       }
 
-        self->s3 = s3_open(self->access_key, self->secret_key, self->user_token,
-            self->bucket_location);
-        if (self->s3 == NULL) {
+       if (self->max_recv_speed &&
+           !s3_set_max_recv_speed(self->s3t[thread].s3, self->max_recv_speed)) {
            device_set_error(d_self,
-               stralloc(_("Internal error creating S3 handle")),
+               g_strdup("Could not set S3 maximum recv speed"),
                DEVICE_STATUS_DEVICE_ERROR);
             return FALSE;
-        }
+       }
     }
 
-    s3_verbose(self->s3, self->verbose);
+    return TRUE;
+}
 
-    if (!s3_use_ssl(self->s3, self->use_ssl)) {
-       device_set_error(d_self, g_strdup_printf(_(
-                "Error setting S3 SSL/TLS use "
-                "(tried to enable SSL/TLS for S3, but curl doesn't support it?)")),
-           DEVICE_STATUS_DEVICE_ERROR);
-        return FALSE;
+static gboolean
+make_bucket(
+    Device * pself)
+{
+    S3Device *self = S3_DEVICE(pself);
+    guint response_code;
+    s3_error_code_t s3_error_code;
+    CURLcode curl_code;
+
+    if (s3_is_bucket_exists(self->s3t[0].s3, self->bucket)) {
+       return TRUE;
+    }
+
+    s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, &curl_code, NULL);
+
+    if (response_code == 0 && s3_error_code == 0 &&
+       (curl_code == CURLE_COULDNT_CONNECT ||
+        curl_code == CURLE_COULDNT_RESOLVE_HOST)) {
+       device_set_error(pself,
+           g_strdup_printf(_("While connecting to S3 bucket: %s"),
+                           s3_strerror(self->s3t[0].s3)),
+               DEVICE_STATUS_DEVICE_ERROR);
+       return FALSE;
     }
 
+    if (!s3_make_bucket(self->s3t[0].s3, self->bucket)) {
+        s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+
+        /* if it isn't an expected error (bucket already exists),
+         * return FALSE */
+        if (response_code != 409 ||
+            (s3_error_code != S3_ERROR_BucketAlreadyExists &&
+            s3_error_code != S3_ERROR_BucketAlreadyOwnedByYou)) {
+           device_set_error(pself,
+               g_strdup_printf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3t[0].s3)),
+               DEVICE_STATUS_DEVICE_ERROR);
+            return FALSE;
+        }
+    }
     return TRUE;
 }
 
@@ -952,30 +1840,40 @@ s3_device_read_label(Device *pself) {
     char *key;
     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
     dumpfile_t *amanda_header;
-
     /* note that this may be called from s3_device_start, when
      * self->access_mode is not ACCESS_NULL */
 
     amfree(pself->volume_label);
     amfree(pself->volume_time);
-    amfree(pself->volume_header);
+    dumpfile_free(pself->volume_header);
+    pself->volume_header = NULL;
 
     if (device_in_error(self)) return pself->status;
 
     if (!setup_handle(self)) {
-       device_set_error(pself, stralloc(_("Error setting up S3 interface")), DEVICE_STATUS_DEVICE_ERROR);
+        /* setup_handle already set our error message */
        return pself->status;
     }
+    reset_thread(self);
 
     key = special_file_to_key(self, "tapestart", -1);
-    if (!s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
+
+    if (!make_bucket(pself)) {
+       return pself->status;
+    }
+
+    if (!s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL)) {
         guint response_code;
         s3_error_code_t s3_error_code;
-        s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+        s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
 
         /* if it's an expected error (not found), just return FALSE */
-        if (response_code == 404 && 
-             (s3_error_code == S3_ERROR_NoSuchKey || s3_error_code == S3_ERROR_NoSuchBucket)) {
+        if (response_code == 404 &&
+             (s3_error_code == S3_ERROR_None ||
+              s3_error_code == S3_ERROR_Unknown ||
+             s3_error_code == S3_ERROR_NoSuchKey ||
+             s3_error_code == S3_ERROR_NoSuchEntity ||
+             s3_error_code == S3_ERROR_NoSuchBucket)) {
             g_debug(_("Amanda header not found while reading tapestart header (this is expected for empty tapes)"));
            device_set_error(pself,
                stralloc(_("Amanda header not found -- unlabeled volume?")),
@@ -987,16 +1885,22 @@ s3_device_read_label(Device *pself) {
 
         /* otherwise, log it and return */
        device_set_error(pself,
-           vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While trying to read tapestart header: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
         return pself->status;
     }
 
+    /* handle an empty file gracefully */
+    if (buf.buffer_len == 0) {
+       device_set_error(pself, stralloc(_("Empty header file")), DEVICE_STATUS_VOLUME_ERROR);
+        return pself->status;
+    }
+
+    pself->header_block_size = buf.buffer_len;
     g_assert(buf.buffer != NULL);
     amanda_header = g_new(dumpfile_t, 1);
     parse_file_header(buf.buffer, amanda_header, buf.buffer_pos);
     pself->volume_header = amanda_header;
-
     g_free(buf.buffer);
 
     if (amanda_header->type != F_TAPESTART) {
@@ -1013,41 +1917,30 @@ s3_device_read_label(Device *pself) {
     return pself->status;
 }
 
-static gboolean 
+static gboolean
 s3_device_start (Device * pself, DeviceAccessMode mode,
                  char * label, char * timestamp) {
     S3Device * self;
-    int file, last_file;
+    GSList *keys;
+    guint64 total_size = 0;
+    gboolean result;
 
     self = S3_DEVICE(pself);
 
     if (device_in_error(self)) return FALSE;
 
     if (!setup_handle(self)) {
-       device_set_error(pself,
-           stralloc(_("Error setting up S3 interface")),
-           DEVICE_STATUS_DEVICE_ERROR);
+        /* setup_handle already set our error message */
        return FALSE;
     }
 
+    reset_thread(self);
     pself->access_mode = mode;
     pself->in_file = FALSE;
 
     /* try creating the bucket, in case it doesn't exist */
-    if (mode != ACCESS_READ && !s3_make_bucket(self->s3, self->bucket)) {
-        guint response_code;
-        s3_error_code_t s3_error_code;
-        s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
-
-        /* if it isn't an expected error (bucket already exists),
-         * return FALSE */
-        if (response_code != 409 ||
-            s3_error_code != S3_ERROR_BucketAlreadyExists) {
-           device_set_error(pself,
-               vstrallocf(_("While creating new S3 bucket: %s"), s3_strerror(self->s3)),
-               DEVICE_STATUS_DEVICE_ERROR);
-            return FALSE;
-        }
+    if (!make_bucket(pself)) {
+       return FALSE;
     }
 
     /* take care of any dirty work for this mode */
@@ -1060,14 +1953,7 @@ s3_device_start (Device * pself, DeviceAccessMode mode,
             break;
 
         case ACCESS_WRITE:
-            /* delete all files */
-            last_file = find_last_file(self);
-            if (last_file < 0) return FALSE;
-            for (file = 0; file <= last_file; file++) {
-                if (!delete_file(self, file))
-                   /* delete_file already set our error message */
-                   return FALSE;
-            }
+            delete_all_files(self);
 
             /* write a new amanda header */
             if (!write_amanda_header(self, label, timestamp)) {
@@ -1085,7 +1971,17 @@ s3_device_start (Device * pself, DeviceAccessMode mode,
            if (pself->volume_label == NULL && s3_device_read_label(pself) != DEVICE_STATUS_SUCCESS) {
                /* s3_device_read_label already set our error message */
                return FALSE;
-           }
+           } else {
+                result = s3_list_keys(self->s3t[0].s3, self->bucket, NULL, NULL, &keys, &total_size);
+                if(!result) {
+                    device_set_error(pself,
+                                 vstrallocf(_("While listing S3 keys: %s"), s3_strerror(self->s3t[0].s3)),
+                                 DEVICE_STATUS_DEVICE_ERROR|DEVICE_STATUS_VOLUME_ERROR);
+                    return FALSE;
+                } else {
+                    self->volume_bytes = total_size;
+                }
+            }
             return seek_to_end(self);
             break;
 
@@ -1097,12 +1993,18 @@ s3_device_start (Device * pself, DeviceAccessMode mode,
 }
 
 static gboolean
-s3_device_finish (Device * pself) {
-    if (device_in_error(pself)) return FALSE;
+s3_device_finish (
+    Device * pself)
+{
+    S3Device *self = S3_DEVICE(pself);
+
+    reset_thread(self);
 
     /* we're not in a file anymore */
     pself->access_mode = ACCESS_NULL;
 
+    if (device_in_error(pself)) return FALSE;
+
     return TRUE;
 }
 
@@ -1113,76 +2015,202 @@ static gboolean
 s3_device_start_file (Device *pself, dumpfile_t *jobInfo) {
     S3Device *self = S3_DEVICE(pself);
     CurlBuffer amanda_header = {NULL, 0, 0, 0};
-    gboolean header_fits, result;
-    char *key;
+    gboolean result;
+    size_t header_size;
+    char  *key;
+    int    thread;
 
     if (device_in_error(self)) return FALSE;
 
+    reset_thread(self);
+    pself->is_eom = FALSE;
+
     /* Set the blocksize to zero, since there's no header to skip (it's stored
      * in a distinct file, rather than block zero) */
     jobInfo->blocksize = 0;
 
     /* Build the amanda header. */
+    header_size = 0; /* no minimum size */
     amanda_header.buffer = device_build_amanda_header(pself, jobInfo,
-        (int *) &amanda_header.buffer_len, &header_fits);
-    if (!header_fits) {
+        &header_size);
+    if (amanda_header.buffer == NULL) {
        device_set_error(pself,
            stralloc(_("Amanda file header won't fit in a single block!")),
            DEVICE_STATUS_DEVICE_ERROR);
        return FALSE;
     }
+    amanda_header.buffer_len = header_size;
 
+    if(check_at_leom(self, header_size))
+        pself->is_eom = TRUE;
+
+    if(check_at_peom(self, header_size)) {
+        pself->is_eom = TRUE;
+        device_set_error(pself,
+            stralloc(_("No space left on device")),
+            DEVICE_STATUS_DEVICE_ERROR);
+        g_free(amanda_header.buffer);
+        return FALSE;
+    }
     /* set the file and block numbers correctly */
     pself->file = (pself->file > 0)? pself->file+1 : 1;
     pself->block = 0;
     pself->in_file = TRUE;
-
     /* write it out as a special block (not the 0th) */
     key = special_file_to_key(self, "filestart", pself->file);
-    result = s3_upload(self->s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
+    result = s3_upload(self->s3t[0].s3, self->bucket, key, S3_BUFFER_READ_FUNCS,
                        &amanda_header, NULL, NULL);
     g_free(amanda_header.buffer);
     g_free(key);
     if (!result) {
        device_set_error(pself,
-           vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3)),
+           vstrallocf(_("While writing filestart header: %s"), s3_strerror(self->s3t[0].s3)),
            DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
         return FALSE;
     }
 
+    self->volume_bytes += header_size;
+    for (thread = 0; thread < self->nb_threads; thread++)  {
+       self->s3t[thread].idle = 1;
+    }
+
     return TRUE;
 }
 
 static gboolean
 s3_device_write_block (Device * pself, guint size, gpointer data) {
-    gboolean result;
     char *filename;
     S3Device * self = S3_DEVICE(pself);
-    CurlBuffer to_write = {data, size, 0, 0};
+    int idle_thread = 0;
+    int thread = -1;
+    int first_idle = -1;
 
     g_assert (self != NULL);
     g_assert (data != NULL);
     if (device_in_error(self)) return FALSE;
-    
-    filename = file_and_block_to_key(self, pself->file, pself->block);
 
-    result = s3_upload(self->s3, self->bucket, filename, S3_BUFFER_READ_FUNCS,
-        &to_write, NULL, NULL);
-    g_free(filename);
-    if (!result) {
-       device_set_error(pself,
-           vstrallocf(_("While writing data block to S3: %s"), s3_strerror(self->s3)),
-           DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR);
+    if(check_at_leom(self, size))
+        pself->is_eom = TRUE;
+
+    if(check_at_peom(self, size)) {
+        pself->is_eom = TRUE;
+        device_set_error(pself,
+            stralloc(_("No space left on device")),
+            DEVICE_STATUS_DEVICE_ERROR);
         return FALSE;
     }
 
-    pself->block++;
+    filename = file_and_block_to_key(self, pself->file, pself->block);
+
+    g_mutex_lock(self->thread_idle_mutex);
+    while (!idle_thread) {
+       idle_thread = 0;
+       for (thread = 0; thread < self->nb_threads_backup; thread++)  {
+           if (self->s3t[thread].idle == 1) {
+               idle_thread++;
+               /* Check if the thread is in error */
+               if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+                   device_set_error(pself, (char *)self->s3t[thread].errmsg,
+                                    self->s3t[thread].errflags);
+                   self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+                   self->s3t[thread].errmsg = NULL;
+                   g_mutex_unlock(self->thread_idle_mutex);
+                   return FALSE;
+               }
+               if (first_idle == -1) {
+                   first_idle = thread;
+                   break;
+               }
+           }
+       }
+       if (!idle_thread) {
+           g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+       }
+    }
+    thread = first_idle;
+
+    self->s3t[thread].idle = 0;
+    self->s3t[thread].done = 0;
+    if (self->s3t[thread].curl_buffer.buffer &&
+       self->s3t[thread].curl_buffer.buffer_len < size) {
+       g_free((char *)self->s3t[thread].curl_buffer.buffer);
+       self->s3t[thread].curl_buffer.buffer = NULL;
+       self->s3t[thread].curl_buffer.buffer_len = 0;
+       self->s3t[thread].buffer_len = 0;
+    }
+    if (self->s3t[thread].curl_buffer.buffer == NULL) {
+       self->s3t[thread].curl_buffer.buffer = g_malloc(size);
+       self->s3t[thread].curl_buffer.buffer_len = size;
+       self->s3t[thread].buffer_len = size;
+    }
+    memcpy((char *)self->s3t[thread].curl_buffer.buffer, data, size);
+    self->s3t[thread].curl_buffer.buffer_pos = 0;
+    self->s3t[thread].curl_buffer.buffer_len = size;
+    self->s3t[thread].curl_buffer.max_buffer_size = 0;
+    self->s3t[thread].filename = filename;
+    g_thread_pool_push(self->thread_pool_write, &self->s3t[thread], NULL);
+    g_mutex_unlock(self->thread_idle_mutex);
 
+    pself->block++;
+    self->volume_bytes += size;
     return TRUE;
 }
 
+static void
+s3_thread_write_block(
+    gpointer thread_data,
+    gpointer data)
+{
+    S3_by_thread *s3t = (S3_by_thread *)thread_data;
+    Device *pself = (Device *)data;
+    S3Device *self = S3_DEVICE(pself);
+    gboolean result;
+
+    result = s3_upload(s3t->s3, self->bucket, (char *)s3t->filename,
+                      S3_BUFFER_READ_FUNCS, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+    g_free((void *)s3t->filename);
+    s3t->filename = NULL;
+    if (!result) {
+       s3t->errflags = DEVICE_STATUS_DEVICE_ERROR | DEVICE_STATUS_VOLUME_ERROR;
+       s3t->errmsg = g_strdup_printf(_("While writing data block to S3: %s"), s3_strerror(s3t->s3));
+    }
+    g_mutex_lock(self->thread_idle_mutex);
+    s3t->idle = 1;
+    s3t->done = 1;
+    s3t->curl_buffer.buffer_len = s3t->buffer_len;
+    g_cond_broadcast(self->thread_idle_cond);
+    g_mutex_unlock(self->thread_idle_mutex);
+}
+
 static gboolean
 s3_device_finish_file (Device * pself) {
+    S3Device *self = S3_DEVICE(pself);
+
+    /* Check all threads are done */
+    int idle_thread = 0;
+    int thread;
+
+    g_mutex_lock(self->thread_idle_mutex);
+    while (idle_thread != self->nb_threads) {
+       idle_thread = 0;
+       for (thread = 0; thread < self->nb_threads; thread++)  {
+           if (self->s3t[thread].idle == 1) {
+               idle_thread++;
+           }
+           /* check thread status */
+           if (self->s3t[thread].errflags != DEVICE_STATUS_SUCCESS) {
+               device_set_error(pself, (char *)self->s3t[thread].errmsg,
+                                self->s3t[thread].errflags);
+               self->s3t[thread].errflags = DEVICE_STATUS_SUCCESS;
+               self->s3t[thread].errmsg = NULL;
+           }
+       }
+       if (idle_thread != self->nb_threads) {
+           g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+       }
+    }
+    g_mutex_unlock(self->thread_idle_mutex);
+
     if (device_in_error(pself)) return FALSE;
 
     /* we're not in a file anymore */
@@ -1196,10 +2224,67 @@ s3_device_recycle_file(Device *pself, guint file) {
     S3Device *self = S3_DEVICE(pself);
     if (device_in_error(self)) return FALSE;
 
-    return delete_file(self, file);
+    reset_thread(self);
+    delete_file(self, file);
+    s3_wait_thread_delete(self);
+    return !device_in_error(self);
     /* delete_file already set our error message if necessary */
 }
 
+static gboolean
+s3_device_erase(Device *pself) {
+    S3Device *self = S3_DEVICE(pself);
+    char *key = NULL;
+    const char *errmsg = NULL;
+    guint response_code;
+    s3_error_code_t s3_error_code;
+
+    if (!setup_handle(self)) {
+        /* error set by setup_handle */
+        return FALSE;
+    }
+
+    reset_thread(self);
+    key = special_file_to_key(self, "tapestart", -1);
+    if (!s3_delete(self->s3t[0].s3, self->bucket, key)) {
+        s3_error(self->s3t[0].s3, &errmsg, NULL, NULL, NULL, NULL, NULL);
+       device_set_error(pself,
+           stralloc(errmsg),
+           DEVICE_STATUS_DEVICE_ERROR);
+        return FALSE;
+    }
+    g_free(key);
+
+    dumpfile_free(pself->volume_header);
+    pself->volume_header = NULL;
+
+    if (!delete_all_files(self))
+        return FALSE;
+
+    device_set_error(pself, g_strdup("Unlabeled volume"),
+                    DEVICE_STATUS_VOLUME_UNLABELED);
+
+    if (!s3_delete_bucket(self->s3t[0].s3, self->bucket)) {
+        s3_error(self->s3t[0].s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
+
+        /*
+         * ignore the error if the bucket isn't empty (there may be data from elsewhere)
+         * or the bucket not existing (already deleted perhaps?)
+         */
+        if (!(
+                (response_code == 409 && s3_error_code == S3_ERROR_BucketNotEmpty) ||
+                (response_code == 404 && s3_error_code == S3_ERROR_NoSuchBucket))) {
+
+            device_set_error(pself,
+               stralloc(errmsg),
+               DEVICE_STATUS_DEVICE_ERROR);
+            return FALSE;
+        }
+    }
+    self->volume_bytes = 0;
+    return TRUE;
+}
+
 /* functions for reading */
 
 static dumpfile_t*
@@ -1210,27 +2295,34 @@ s3_device_seek_file(Device *pself, guint file) {
     CurlBuffer buf = {NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE};
     dumpfile_t *amanda_header;
     const char *errmsg = NULL;
+    int thread;
 
     if (device_in_error(self)) return NULL;
 
+    reset_thread(self);
+
     pself->file = file;
     pself->is_eof = FALSE;
     pself->in_file = FALSE;
     pself->block = 0;
+    self->next_block_to_read = 0;
 
     /* read it in */
     key = special_file_to_key(self, "filestart", pself->file);
-    result = s3_read(self->s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
+    result = s3_read(self->s3t[0].s3, self->bucket, key, S3_BUFFER_WRITE_FUNCS,
         &buf, NULL, NULL);
     g_free(key);
+
     if (!result) {
         guint response_code;
         s3_error_code_t s3_error_code;
-        s3_error(self->s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
+        s3_error(self->s3t[0].s3, &errmsg, &response_code, &s3_error_code, NULL, NULL, NULL);
 
         /* if it's an expected error (not found), check what to do. */
-        if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) {
+        if (response_code == 404 &&
+            (s3_error_code == S3_ERROR_None ||
+            s3_error_code == S3_ERROR_NoSuchKey ||
+            s3_error_code == S3_ERROR_NoSuchEntity)) {
             int next_file;
             next_file = find_next_file(self, pself->file);
             if (next_file > 0) {
@@ -1239,7 +2331,7 @@ s3_device_seek_file(Device *pself, guint file) {
             } else if (next_file == 0) {
                 /* No next file. Check if we are one past the end. */
                 key = special_file_to_key(self, "filestart", pself->file - 1);
-                result = s3_read(self->s3, self->bucket, key,
+                result = s3_read(self->s3t[0].s3, self->bucket, key,
                     S3_BUFFER_WRITE_FUNCS, &buf, NULL, NULL);
                 g_free(key);
                 if (result) {
@@ -1260,7 +2352,7 @@ s3_device_seek_file(Device *pself, guint file) {
             return NULL;
         }
     }
-   
+
     /* and make a dumpfile_t out of it */
     g_assert(buf.buffer != NULL);
     amanda_header = g_new(dumpfile_t, 1);
@@ -1283,152 +2375,230 @@ s3_device_seek_file(Device *pself, guint file) {
     }
 
     pself->in_file = TRUE;
+    for (thread = 0; thread < self->nb_threads; thread++)  {
+       self->s3t[thread].idle = 1;
+       self->s3t[thread].eof = FALSE;
+    }
     return amanda_header;
 }
 
 static gboolean
 s3_device_seek_block(Device *pself, guint64 block) {
+    S3Device * self = S3_DEVICE(pself);
     if (device_in_error(pself)) return FALSE;
 
+    reset_thread(self);
     pself->block = block;
+    self->next_block_to_read = block;
     return TRUE;
 }
 
-typedef struct s3_read_block_data {
-    gpointer data;
-    int size_req;
-    int size_written;
-
-    CurlBuffer curl;
-} s3_read_block_data;
-
-/* wrapper around s3_buffer_write_func to write as much data as possible to
- * the user's buffer, and switch to a dynamically allocated buffer if that
- * isn't large enough */
-static size_t
-s3_read_block_write_func(void *ptr, size_t size, size_t nmemb, void *stream)
-{
-    s3_read_block_data *dat = stream;
-    guint new_bytes, bytes_needed;
-
-    /* if data is NULL, call through to s3_buffer_write_func */
-    if (!dat->data) {
-       return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl));
-    }
-
-    new_bytes = (guint) size * nmemb;
-    bytes_needed = dat->size_written + new_bytes;
-
-    if (bytes_needed > (guint)dat->size_written) {
-       /* this read will overflow the user's buffer, so malloc ourselves
-        * a new buffer and keep reading */
-       dat->curl.buffer = g_malloc(bytes_needed);
-       dat->curl.buffer_len = bytes_needed;
-       dat->curl.buffer_pos = dat->size_written;
-       memcpy(dat->curl.buffer, dat->data, dat->size_written);
-       dat->data = NULL; /* signal that the user's buffer is too small */
-       return s3_buffer_write_func(ptr, size, nmemb, (void *)(&dat->curl));
-    }
-
-    memcpy(dat->data + dat->size_written, ptr, bytes_needed);
-    return new_bytes;
-}
-
 static int
 s3_device_read_block (Device * pself, gpointer data, int *size_req) {
     S3Device * self = S3_DEVICE(pself);
     char *key;
-    s3_read_block_data dat = {NULL, 0, 0, { NULL, 0, 0, S3_DEVICE_MAX_BLOCK_SIZE} };
-    gboolean result;
+    int thread;
+    int done = 0;
 
     g_assert (self != NULL);
     if (device_in_error(self)) return -1;
 
+    g_mutex_lock(self->thread_idle_mutex);
+    /* start a read ahead for each thread */
+    for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+       S3_by_thread *s3t = &self->s3t[thread];
+       if (s3t->idle) {
+           key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
+           s3t->filename = key;
+           s3t->done = 0;
+           s3t->idle = 0;
+           s3t->eof = FALSE;
+           s3t->errflags = DEVICE_STATUS_SUCCESS;
+           if (self->s3t[thread].curl_buffer.buffer &&
+               (int)self->s3t[thread].curl_buffer.buffer_len < *size_req) {
+               g_free(self->s3t[thread].curl_buffer.buffer);
+               self->s3t[thread].curl_buffer.buffer = NULL;
+               self->s3t[thread].curl_buffer.buffer_len = 0;
+               self->s3t[thread].buffer_len = 0;
+           }
+           if (!self->s3t[thread].curl_buffer.buffer) {
+               self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
+               self->s3t[thread].curl_buffer.buffer_len = *size_req;
+               self->s3t[thread].buffer_len = *size_req;
+           }
+           s3t->curl_buffer.buffer_pos = 0;
+           s3t->curl_buffer.max_buffer_size = S3_DEVICE_MAX_BLOCK_SIZE;
+           self->next_block_to_read++;
+           g_thread_pool_push(self->thread_pool_read, s3t, NULL);
+       }
+    }
+
     /* get the file*/
     key = file_and_block_to_key(self, pself->file, pself->block);
     g_assert(key != NULL);
-    if (self->cached_key && (0 == strcmp(key, self->cached_key))) {
-       if (*size_req >= self->cached_size) {
-           /* use the cached copy and clear the cache */
-           memcpy(data, self->cached_buf, self->cached_size);
-           *size_req = self->cached_size;
-
-           g_free(key);
-           g_free(self->cached_key);
-           self->cached_key = NULL;
-           g_free(self->cached_buf);
-           self->cached_buf = NULL;
-
-           pself->block++;
-           return *size_req;
-       } else {
-           *size_req = self->cached_size;
-           g_free(key);
-           return 0;
+    while (!done) {
+       /* find which thread read the key */
+       for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+           S3_by_thread *s3t;
+           s3t = &self->s3t[thread];
+           if (!s3t->idle &&
+               s3t->done &&
+               strcmp(key, (char *)s3t->filename) == 0) {
+               if (s3t->eof) {
+                   /* return eof */
+                   g_free(key);
+                   pself->is_eof = TRUE;
+                   pself->in_file = FALSE;
+                   device_set_error(pself, stralloc(_("EOF")),
+                                    DEVICE_STATUS_SUCCESS);
+                   g_mutex_unlock(self->thread_idle_mutex);
+                   return -1;
+               } else if (s3t->errflags != DEVICE_STATUS_SUCCESS) {
+                   /* return the error */
+                   device_set_error(pself, (char *)s3t->errmsg, s3t->errflags);
+                   g_free(key);
+                   g_mutex_unlock(self->thread_idle_mutex);
+                   return -1;
+
+               } else if ((guint)*size_req >= s3t->curl_buffer.buffer_pos) {
+                   /* return the buffer */
+                   g_mutex_unlock(self->thread_idle_mutex);
+                   memcpy(data, s3t->curl_buffer.buffer,
+                                s3t->curl_buffer.buffer_pos);
+                   *size_req = s3t->curl_buffer.buffer_pos;
+                   g_free(key);
+                   s3t->idle = 1;
+                   g_free((char *)s3t->filename);
+                   pself->block++;
+                   done = 1;
+                   g_mutex_lock(self->thread_idle_mutex);
+                   break;
+               } else { /* buffer not enough large */
+                   *size_req = s3t->curl_buffer.buffer_len;
+                   g_free(key);
+                   g_mutex_unlock(self->thread_idle_mutex);
+                   return 0;
+               }
+           }
+       }
+       if (!done) {
+           g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
        }
     }
 
-    /* clear the cache, as it's useless to us */
-    if (self->cached_key) {
-       g_free(self->cached_key);
-       self->cached_key = NULL;
-
-       g_free(self->cached_buf);
-       self->cached_buf = NULL;
+    /* start a read ahead for the thread */
+    for (thread = 0; thread < self->nb_threads_recovery; thread++) {
+       S3_by_thread *s3t = &self->s3t[thread];
+       if (s3t->idle) {
+           key = file_and_block_to_key(self, pself->file, self->next_block_to_read);
+           s3t->filename = key;
+           s3t->done = 0;
+           s3t->idle = 0;
+           s3t->eof = FALSE;
+           s3t->errflags = DEVICE_STATUS_SUCCESS;
+           if (!self->s3t[thread].curl_buffer.buffer) {
+               self->s3t[thread].curl_buffer.buffer = g_malloc(*size_req);
+               self->s3t[thread].curl_buffer.buffer_len = *size_req;
+           }
+           s3t->curl_buffer.buffer_pos = 0;
+           self->next_block_to_read++;
+           g_thread_pool_push(self->thread_pool_read, s3t, NULL);
+       }
     }
+    g_mutex_unlock(self->thread_idle_mutex);
 
-    /* set up dat for the write_func callback */
-    if (!data || *size_req <= 0) {
-       dat.data = NULL;
-       dat.size_req = 0;
-    } else {
-       dat.data = data;
-       dat.size_req = *size_req;
-    }
+    return *size_req;
 
-    result = s3_read(self->s3, self->bucket, key, s3_read_block_write_func,
-        s3_buffer_reset_func, &dat, NULL, NULL);
+}
+
+static void
+s3_thread_read_block(
+    gpointer thread_data,
+    gpointer data)
+{
+    S3_by_thread *s3t = (S3_by_thread *)thread_data;
+    Device *pself = (Device *)data;
+    S3Device *self = S3_DEVICE(pself);
+    gboolean result;
+
+    result = s3_read(s3t->s3, self->bucket, (char *)s3t->filename, s3_buffer_write_func,
+        s3_buffer_reset_func, (CurlBuffer *)&s3t->curl_buffer, NULL, NULL);
+
+    g_mutex_lock(self->thread_idle_mutex);
     if (!result) {
        guint response_code;
        s3_error_code_t s3_error_code;
-       s3_error(self->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
-
-       g_free(key);
-       key = NULL;
-
+       s3_error(s3t->s3, NULL, &response_code, &s3_error_code, NULL, NULL, NULL);
        /* if it's an expected error (not found), just return -1 */
-       if (response_code == 404 && s3_error_code == S3_ERROR_NoSuchKey) {
-           pself->is_eof = TRUE;
-           pself->in_file = FALSE;
-           device_set_error(pself,
-               stralloc(_("EOF")),
-               DEVICE_STATUS_SUCCESS);
-           return -1;
+       if (response_code == 404 &&
+            (s3_error_code == S3_ERROR_None ||
+            s3_error_code == S3_ERROR_Unknown ||
+            s3_error_code == S3_ERROR_NoSuchKey ||
+            s3_error_code == S3_ERROR_NoSuchEntity)) {
+           s3t->eof = TRUE;
+       } else {
+
+           /* otherwise, log it and return FALSE */
+           s3t->errflags = DEVICE_STATUS_VOLUME_ERROR;
+           s3t->errmsg = g_strdup_printf(_("While reading data block from S3: %s"),
+                                         s3_strerror(s3t->s3));
        }
+    }
+    s3t->done = 1;
+    g_cond_broadcast(self->thread_idle_cond);
+    g_mutex_unlock(self->thread_idle_mutex);
 
-       /* otherwise, log it and return FALSE */
-       device_set_error(pself,
-           vstrallocf(_("While reading data block from S3: %s"), s3_strerror(self->s3)),
-           DEVICE_STATUS_VOLUME_ERROR);
-       return -1;
+    return;
+}
+
+static gboolean
+check_at_peom(S3Device *self, guint64 size)
+{
+    if(self->enforce_volume_limit && (self->volume_limit > 0)) {
+        guint64 newtotal = self->volume_bytes + size;
+        if(newtotal > self->volume_limit) {
+            return TRUE;
+        }
     }
+    return FALSE;
+}
+
+static gboolean
+check_at_leom(S3Device *self, guint64 size)
+{
+    guint64 block_size = DEVICE(self)->block_size;
+    guint64 eom_warning_buffer = block_size *
+               (EOM_EARLY_WARNING_ZONE_BLOCKS + self->nb_threads);
 
-    if (dat.data == NULL) {
-       /* data was larger than the available space, so cache it and return
-        * the actual size */
-        self->cached_buf = dat.curl.buffer;
-        self->cached_size = dat.curl.buffer_pos;
-        self->cached_key = key;
-       key = NULL;
+    if(!self->leom)
+        return FALSE;
 
-        *size_req = dat.curl.buffer_pos;
-        return 0;
+    if(self->enforce_volume_limit && (self->volume_limit > 0)) {
+        guint64 newtotal = self->volume_bytes + size + eom_warning_buffer;
+        if(newtotal > self->volume_limit) {
+           return TRUE;
+        }
     }
+    return FALSE;
+}
 
-    /* ok, the read went directly to the user's buffer, so we need only
-     * set and return the size */
-    pself->block++;
-    g_free(key);
-    *size_req = dat.size_req;
-    return dat.size_req;
+static void
+reset_thread(
+    S3Device *self)
+{
+    int thread;
+    int nb_done = 0;
+
+    g_mutex_lock(self->thread_idle_mutex);
+    while(nb_done != self->nb_threads) {
+       nb_done = 0;
+       for (thread = 0; thread < self->nb_threads; thread++)  {
+           if (self->s3t[thread].done == 1)
+               nb_done++;
+       }
+       if (nb_done != self->nb_threads) {
+           g_cond_wait(self->thread_idle_cond, self->thread_idle_mutex);
+       }
+    }
+    g_mutex_unlock(self->thread_idle_mutex);
 }