Imported Upstream version 2.5.0
[debian/amanda] / server-src / driver.c
index 345f3f931d2ba14d5e07e58b205c13d4fca2d26e..c9f53e45fc1a86f87ac46b9c2423f57f76d2670a 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 1991-2000 University of Maryland at College Park
+ * Copyright (c) 1991-1998 University of Maryland at College Park
  * All Rights Reserved.
  *
  * Permission to use, copy, modify, distribute, and sell this software and its
@@ -24,7 +24,7 @@
  * file named AUTHORS, in the root directory of this distribution.
  */
 /*
- * $Id: driver.c,v 1.58.2.31.2.8.2.20.2.16 2005/09/20 21:31:52 jrjackson Exp $
+ * $Id: driver.c,v 1.164 2006/03/22 15:07:08 martinea Exp $
  *
  * controlling process for the Amanda backup system
  */
  *     tape.  Probably not effective though, should do this in planner.
  */
 
+/*#define HOLD_DEBUG*/
+
 #include "amanda.h"
 #include "clock.h"
 #include "conffile.h"
 #include "diskfile.h"
+#include "event.h"
 #include "holding.h"
 #include "infofile.h"
 #include "logfile.h"
 #include "driverio.h"
 #include "server_util.h"
 
-disklist_t waitq, runq, tapeq, roomq;
-int pending_aborts, inside_dump_to_tape;
-disk_t *taper_disk;
-int degraded_mode;
-unsigned long reserved_space;
-unsigned long total_disksize;
-char *dumper_program;
-int  inparallel;
-int nodump = 0;
-unsigned long tape_length, tape_left = 0;
-int conf_taperalgo;
-am_host_t *flushhost = NULL;
-
-int client_constrained P((disk_t *dp));
-int sort_by_priority_reversed P((disk_t *a, disk_t *b));
-int sort_by_time P((disk_t *a, disk_t *b));
-int start_some_dumps P((disklist_t *rq));
-void dump_schedule P((disklist_t *qp, char *str));
-void start_degraded_mode P((disklist_t *queuep));
-void handle_taper_result P((void));
-dumper_t *idle_dumper P((void));
-int some_dumps_in_progress P((void));
-int num_busy_dumpers P((void));
-dumper_t *lookup_dumper P((int fd));
-void handle_dumper_result P((int fd));
-void read_flush P((disklist_t *tapeqp));
-void read_schedule P((disklist_t *waitqp, disklist_t *runqp));
-int free_kps P((interface_t *ip));
-void interface_state P((char *time_str));
-void allocate_bandwidth P((interface_t *ip, int kps));
-void deallocate_bandwidth P((interface_t *ip, int kps));
-unsigned long free_space P((void));
-assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle, assignedhd_t *preferred));
-char *diskname2filename P((char *dname));
-int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp));
+static disklist_t waitq, runq, tapeq, roomq;
+static int pending_aborts;
+static disk_t *taper_disk;
+static int degraded_mode;
+static unsigned long reserved_space;
+static unsigned long total_disksize;
+static char *dumper_program;
+static char *chunker_program;
+static int  inparallel;
+static int nodump = 0;
+static unsigned long tape_length, tape_left = 0;
+static int current_tape = 1;
+static int conf_taperalgo;
+static int conf_runtapes;
+static time_t sleep_time;
+static int idle_reason;
+static char *datestamp;
+static char *timestamp;
+static am_host_t *flushhost = NULL;
+static int need_degraded=0;
+
+static event_handle_t *dumpers_ev_time = NULL;
+static event_handle_t *schedule_ev_read = NULL;
+
+static void allocate_bandwidth P((interface_t *ip, int kps));
+static int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp));
 static void adjust_diskspace P((disk_t *diskp, cmd_t cmd));
 static void delete_diskspace P((disk_t *diskp));
-assignedhd_t **build_diskspace P((char *destname));
-void holdingdisk_state P((char *time_str));
-int dump_to_tape P((disk_t *dp));
-int queue_length P((disklist_t q));
-void short_dump_state P((void));
-void dump_state P((char *str));
-void startaflush P((void));
+static assignedhd_t **build_diskspace P((char *destname));
+static int client_constrained P((disk_t *dp));
+static void deallocate_bandwidth P((interface_t *ip, int kps));
+static void dump_schedule P((disklist_t *qp, char *str));
+static int dump_to_tape P((disk_t *dp));
+static assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle,
+                                       assignedhd_t *preferred));
+static int free_kps P((interface_t *ip));
+static unsigned long free_space P((void));
+static void dumper_result P((disk_t *dp));
+static void handle_dumper_result P((void *));
+static void handle_chunker_result P((void *));
+static void handle_dumpers_time P((void *));
+static void handle_taper_result P((void *));
+static void holdingdisk_state P((char *time_str));
+static dumper_t *idle_dumper P((void));
+static void interface_state P((char *time_str));
+static int num_busy_dumpers P((void));
+static int queue_length P((disklist_t q));
+static disklist_t read_flush P((void));
+static void read_schedule P((void *cookie));
+static void short_dump_state P((void));
+static void startaflush P((void));
+static void start_degraded_mode P((disklist_t *queuep));
+static void start_some_dumps P((disklist_t *rq));
+static void continue_port_dumps();
+static void update_failed_dump_to_tape P((disk_t *));
+#if 0
+static void dump_state P((const char *str));
+#endif
 int main P((int main_argc, char **main_argv));
 
-static int idle_reason;
-char *datestamp;
-char *timestamp;
-
-char *idle_strings[] = {
+static const char *idle_strings[] = {
 #define NOT_IDLE               0
     "not-idle",
-#define IDLE_START_WAIT                1
-    "start-wait",
-#define IDLE_NO_DUMPERS                2
+#define IDLE_NO_DUMPERS                1
     "no-dumpers",
+#define IDLE_START_WAIT                2
+    "start-wait",
 #define IDLE_NO_HOLD           3
     "no-hold",
 #define IDLE_CLIENT_CONSTRAINED        4
@@ -117,19 +130,14 @@ char *idle_strings[] = {
     "taper-wait",
 };
 
-#define SLEEP_MAX              (24*3600)
-struct timeval sleep_time = { SLEEP_MAX, 0 };
-/* enabled if any disks are in start-wait: */
-int any_delayed_disk = 0;
-
-int main(main_argc, main_argv)
+int
+main(main_argc, main_argv)
      int main_argc;
      char **main_argv;
 {
-    disklist_t *origqp;
+    disklist_t origq;
     disk_t *diskp;
-    fd_set selectset;
-    int fd, dsk;
+    int dsk;
     dumper_t *dumper;
     char *newdir = NULL;
     generic_fs_stats_t fs;
@@ -154,6 +162,7 @@ int main(main_argc, main_argv)
 
     set_pname("driver");
 
+    /* Don't die when child closes pipe */
     signal(SIGPIPE, SIG_IGN);
 
     malloc_size_1 = malloc_inuse(&malloc_hist_1);
@@ -162,7 +171,6 @@ int main(main_argc, main_argv)
     set_logerror(logerror);
 
     startclock();
-    FD_ZERO(&readset);
 
     printf("%s: pid %ld executable %s version %s\n",
           get_pname(), (long) getpid(), main_argv[0], version());
@@ -204,9 +212,12 @@ int main(main_argc, main_argv)
     taper_program = vstralloc(libexecdir, "/", "taper", versionsuffix(), NULL);
     dumper_program = vstralloc(libexecdir, "/", "dumper", versionsuffix(),
                               NULL);
+    chunker_program = vstralloc(libexecdir, "/", "chunker", versionsuffix(),
+                              NULL);
 
     conf_taperalgo = getconf_int(CNF_TAPERALGO);
     conf_tapetype = getconf_str(CNF_TAPETYPE);
+    conf_runtapes = getconf_int(CNF_RUNTAPES);
     tape = lookup_tapetype(conf_tapetype);
     tape_length = tape->length;
     printf("driver: tape size %ld\n", tape_length);
@@ -214,8 +225,10 @@ int main(main_argc, main_argv)
     /* taper takes a while to get going, so start it up right away */
 
     init_driverio();
-    startup_tape_process(taper_program);
-    taper_cmd(START_TAPER, datestamp, NULL, 0, NULL);
+    if(conf_runtapes > 0) {
+       startup_tape_process(taper_program);
+       taper_cmd(START_TAPER, datestamp, NULL, 0, NULL);
+    }
 
     /* start initializing: read in databases */
 
@@ -225,9 +238,8 @@ int main(main_argc, main_argv)
     } else {
        conf_diskfile = stralloc2(config_dir, conf_diskfile);
     }
-    if((origqp = read_diskfile(conf_diskfile)) == NULL) {
+    if (read_diskfile(conf_diskfile, &origq) < 0)
        error("could not load disklist \"%s\"", conf_diskfile);
-    }
     amfree(conf_diskfile);
 
     /* set up any configuration-dependent variables */
@@ -276,7 +288,7 @@ int main(main_argc, main_argv)
        newdir = newvstralloc(newdir,
                              hdp->diskdir, "/", timestamp,
                              NULL);
-        if(!mkholdingdir(newdir)) {
+       if(!mkholdingdir(newdir)) {
            hdp->disksize = 0L;
        }
        total_disksize += hdp->disksize;
@@ -292,7 +304,6 @@ int main(main_argc, main_argv)
     if(inparallel > MAX_DUMPERS) inparallel = MAX_DUMPERS;
 
     /* fire up the dumpers now while we are waiting */
-
     if(!nodump) startup_dump_processes(dumper_program, inparallel);
 
     /*
@@ -303,12 +314,12 @@ int main(main_argc, main_argv)
      * in parallel with the planner.
      */
 
-    waitq = *origqp;
-    tapeq.head = tapeq.tail = NULL;
-    roomq.head = roomq.tail = NULL;
-    runq.head = runq.tail = NULL;
+    runq.head = NULL;
+    runq.tail = NULL;
+    waitq = origq;
+    tapeq = read_flush();
 
-    read_flush(&tapeq);
+    roomq.head = roomq.tail = NULL;
 
     log_add(L_STATS, "startup time %s", walltime_str(curclock()));
 
@@ -320,87 +331,35 @@ int main(main_argc, main_argv)
           getconf_str(CNF_DUMPORDER));
     fflush(stdout);
 
-    /* Let's see if the tape is ready */
+    /* ok, planner is done, now lets see if the tape is ready */
 
-    cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+    if(conf_runtapes > 0) {
+       cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
 
-    if(cmd != TAPER_OK) {
-       /* no tape, go into degraded mode: dump to holding disk */
-       start_degraded_mode(&runq);
-       FD_CLR(taper,&readset);
+       if(cmd != TAPER_OK) {
+           /* no tape, go into degraded mode: dump to holding disk */
+           need_degraded=1;
+       }
+    }
+    else {
+       need_degraded=1;
     }
-
-    short_dump_state();                                        /* for amstatus */
 
     tape_left = tape_length;
     taper_busy = 0;
     taper_disk = NULL;
+    taper_ev_read = NULL;
+    if(!need_degraded) startaflush();
 
-    /* Start autoflush while waiting for dump schedule */
-    if(!nodump) {
-       /* Start any autoflush tape writes */
-       if (!empty(tapeq)) {
-           startaflush();
-           short_dump_state();                         /* for amstatus */
-
-           /* Process taper results until the schedule arrives */
-           while (1) {
-               FD_ZERO(&selectset);
-               FD_SET(0, &selectset);
-               FD_SET(taper, &selectset);
-
-               if(select(taper+1, (SELECT_ARG_TYPE *)(&selectset), NULL, NULL,
-                         &sleep_time) == -1)
-                   error("select: %s", strerror(errno));
-               if (FD_ISSET(0, &selectset)) break;     /* schedule arrived */
-               if (FD_ISSET(taper, &selectset)) handle_taper_result();
-               short_dump_state();                     /* for amstatus */
-           }
-           
-       }
-
-       /* Read the dump schedule */
-       read_schedule(&waitq, &runq);
-    }
-
-    /* Start any needed flushes */
-    startaflush();
-
-    while(start_some_dumps(&runq) || some_dumps_in_progress() ||
-         any_delayed_disk) {
-       short_dump_state();
-
-       /* wait for results */
-
-       memcpy(&selectset, &readset, sizeof(fd_set));
-       if(select(maxfd+1, (SELECT_ARG_TYPE *)(&selectset),
-                 NULL, NULL, &sleep_time) == -1)
-           error("select: %s", strerror(errno));
-
-       /* handle any results that have come in */
-
-       for(fd = 0; fd <= maxfd; fd++) {
-           /*
-            * The first pass through the following loop, we have
-            * data ready for areads (called by getresult, called by
-            * handle_.*_result).  But that may read more than one record,
-            * so we need to keep processing as long as areads has data.
-            * We will get control back after each record and the buffer
-            * will go empty (indicated by areads_dataready(fd) == 0)
-            * after the last one available has been processed.
-            */
-           while(FD_ISSET(fd, &selectset) || areads_dataready(fd) > 0) {
-               if(fd == taper) handle_taper_result();
-               else handle_dumper_result(fd);
-               FD_CLR(fd, &selectset);
-           }
-       }
+    if(!nodump)
+       schedule_ev_read = event_register(0, EV_READFD, read_schedule, NULL);
 
-    }
+    short_dump_state();
+    event_loop(0);
 
     /* handle any remaining dumps by dumping directly to tape, if possible */
 
-    while(!empty(runq)) {
+    while(!empty(runq) && taper > 0) {
        diskp = dequeue_disk(&runq);
        if(!degraded_mode) {
            int rc = dump_to_tape(diskp);
@@ -419,8 +378,8 @@ int main(main_argc, main_argv)
        }
        else
            log_add(L_FAIL, "%s %s %s %d [%s]",
-                   diskp->host->hostname, diskp->name,
-                   sched(diskp)->datestamp, sched(diskp)->level,
+               diskp->host->hostname, diskp->name, sched(diskp)->datestamp,
+               sched(diskp)->level,
                diskp->no_hold ?
                    "can't dump no-hold disk in degraded mode" :
                    "no more holding disk space");
@@ -474,7 +433,7 @@ int main(main_argc, main_argv)
            who = stralloc("taper");
        }
        if(what != NULL && who == NULL) {
-           ap_snprintf(number, sizeof(number), "%ld", (long)pid);
+           snprintf(number, sizeof(number), "%ld", (long)pid);
            who = stralloc2("unknown pid ", number);
        }
        if(who && what) {
@@ -494,6 +453,7 @@ int main(main_argc, main_argv)
     }
     amfree(newdir);
 
+    check_unfree_serial();
     printf("driver: FINISHED time %s\n", walltime_str(curclock()));
     fflush(stdout);
     log_add(L_FINISH,"date %s time %s", datestamp, walltime_str(curclock()));
@@ -514,21 +474,26 @@ int main(main_argc, main_argv)
     return 0;
 }
 
-void startaflush() {
+static void
+startaflush()
+{
     disk_t *dp = NULL;
     disk_t *fit = NULL;
     char *datestamp;
-
+    unsigned int extra_tapes = 0;
     if(!degraded_mode && !taper_busy && !empty(tapeq)) {
+       
        datestamp = sched(tapeq.head)->datestamp;
        switch(conf_taperalgo) {
        case ALGO_FIRST:
                dp = dequeue_disk(&tapeq);
                break;
-       case ALGO_FIRSTFIT: 
+       case ALGO_FIRSTFIT:
                fit = tapeq.head;
                while (fit != NULL) {
-                   if(sched(fit)->act_size <= tape_left &&
+                   extra_tapes = (fit->tape_splitsize > 0) ? 
+                                               conf_runtapes - current_tape : 0;
+                   if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) &&
                       strcmp(sched(fit)->datestamp, datestamp) <= 0) {
                        dp = fit;
                        fit = NULL;
@@ -539,7 +504,7 @@ void startaflush() {
                }
                if(dp) remove_disk(&tapeq, dp);
                break;
-       case ALGO_LARGEST: 
+       case ALGO_LARGEST:
                fit = dp = tapeq.head;
                while (fit != NULL) {
                    if(sched(fit)->act_size > sched(dp)->act_size &&
@@ -550,10 +515,12 @@ void startaflush() {
                }
                if(dp) remove_disk(&tapeq, dp);
                break;
-       case ALGO_LARGESTFIT: 
+       case ALGO_LARGESTFIT:
                fit = tapeq.head;
                while (fit != NULL) {
-                   if(sched(fit)->act_size <= tape_left &&
+                   extra_tapes = (fit->tape_splitsize > 0) ? 
+                                               conf_runtapes - current_tape : 0;
+                   if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) &&
                       (!dp || sched(fit)->act_size > sched(dp)->act_size) &&
                       strcmp(sched(fit)->datestamp, datestamp) <= 0) {
                        dp = fit;
@@ -562,7 +529,7 @@ void startaflush() {
                }
                if(dp) remove_disk(&tapeq, dp);
                break;
-       case ALGO_SMALLEST: 
+       case ALGO_SMALLEST:
                break;
        case ALGO_LAST:
                dp = tapeq.tail;
@@ -580,27 +547,40 @@ void startaflush() {
                   strcmp(sched(fit)->datestamp, datestamp) <= 0) {
                    dp = fit;
                }
-               fit = fit->next;
+               fit = fit->next;
            }
            if(dp) remove_disk(&tapeq, dp);
        }
-       taper_disk = dp;
-       taper_busy = 1;
-       taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, 
+       if(taper_ev_read == NULL) {
+           taper_ev_read = event_register(taper, EV_READFD,
+                                          handle_taper_result, NULL);
+       }
+       if (dp != NULL) {
+           taper_disk = dp;
+           taper_busy = 1;
+           taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level,
                  sched(dp)->datestamp);
-       fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n",
-               taperalgo2str(conf_taperalgo), dp->host->hostname,
-               dp->name, sched(taper_disk)->act_size, tape_left);
-       if(sched(dp)->act_size <= tape_left)
-           tape_left -= sched(dp)->act_size;
-       else
-           tape_left = 0;
+           fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n",
+                   taperalgo2str(conf_taperalgo), dp->host->hostname,
+                   dp->name, sched(taper_disk)->act_size, tape_left);
+           if(sched(dp)->act_size <= tape_left)
+               tape_left -= sched(dp)->act_size;
+           else
+               tape_left = 0;
+       } else {
+           error("FATAL: Taper marked busy and no work found.");
+           /*NOTREACHED*/
+       }
+    } else if(!taper_busy && taper_ev_read != NULL) {
+       event_release(taper_ev_read);
+       taper_ev_read = NULL;
     }
 }
 
 
-int client_constrained(dp)
-disk_t *dp;
+static int
+client_constrained(dp)
+    disk_t *dp;
 {
     disk_t *dp2;
 
@@ -624,86 +604,109 @@ disk_t *dp;
     return 0;
 }
 
-int start_some_dumps(rq)
-disklist_t *rq;
+static void
+start_some_dumps(rq)
+    disklist_t *rq;
 {
-    int total, cur_idle;
-    disk_t *diskp, *diskp_accept;
-    dumper_t *dumper;
+    int cur_idle;
+    disk_t *diskp, *delayed_diskp, *diskp_accept;
     assignedhd_t **holdp=NULL, **holdp_accept;
-    time_t now = time(NULL);
+    const time_t now = time(NULL);
+    cmd_t cmd;
+    int result_argc;
+    char *result_argv[MAX_ARGS+1];
+    chunker_t *chunker;
+    dumper_t *dumper;
+    char dumptype;
+    char *dumporder;
 
-    total = 0;
     idle_reason = IDLE_NO_DUMPERS;
-    sleep_time.tv_sec = SLEEP_MAX;
-    sleep_time.tv_usec = 0;
-    any_delayed_disk = 0;
+    sleep_time = 0;
 
-    if(rq->head == NULL) {
-       idle_reason = 0;
-       return 0;
+    if(dumpers_ev_time != NULL) {
+       event_release(dumpers_ev_time);
+       dumpers_ev_time = NULL;
     }
 
-    /*
-     * A potential problem with starting from the bottom of the dump time
-     * distribution is that a slave host will have both one of the shortest
-     * and one of the longest disks, so starting its shortest disk first will
-     * tie up the host and eliminate its longest disk from consideration the
-     * first pass through.  This could cause a big delay in starting that long
-     * disk, which could drag out the whole night's dumps.
-     *
-     * While starting from the top of the dump time distribution solves the
-     * above problem, this turns out to be a bad idea, because the big dumps
-     * will almost certainly pack the holding disk completely, leaving no
-     * room for even one small dump to start.  This ends up shutting out the
-     * small-end dumpers completely (they stay idle).
-     *
-     * The introduction of multiple simultaneous dumps to one host alleviates
-     * the biggest&smallest dumps problem: both can be started at the
-     * beginning.
-     */
-    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
-       if(dumper->busy || dumper->down) continue;
-       /* found an idle dumper, now find a disk for it */
-       diskp = rq->head;
+    for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
+
+       if( dumper->busy ) {
+           continue;
+       }
+
+       if (dumper->ev_read != NULL) {
+/*         assert(dumper->ev_read == NULL);*/
+           event_release(dumper->ev_read);
+           dumper->ev_read = NULL;
+       }
+
+       /*
+        * A potential problem with starting from the bottom of the dump time
+        * distribution is that a slave host will have both one of the shortest
+        * and one of the longest disks, so starting its shortest disk first will
+        * tie up the host and eliminate its longest disk from consideration the
+        * first pass through.  This could cause a big delay in starting that long
+        * disk, which could drag out the whole night's dumps.
+        *
+        * While starting from the top of the dump time distribution solves the
+        * above problem, this turns out to be a bad idea, because the big dumps
+        * will almost certainly pack the holding disk completely, leaving no
+        * room for even one small dump to start.  This ends up shutting out the
+        * small-end dumpers completely (they stay idle).
+        *
+        * The introduction of multiple simultaneous dumps to one host alleviates
+        * the biggest&smallest dumps problem: both can be started at the
+        * beginning.
+        */
+
        diskp_accept = NULL;
        holdp_accept = NULL;
-
-       if(idle_reason == IDLE_NO_DUMPERS)
-           idle_reason = NOT_IDLE;
+       delayed_diskp = NULL;
 
        cur_idle = NOT_IDLE;
 
-       while(diskp) {
+       dumporder = getconf_str(CNF_DUMPORDER);
+       if(strlen(dumporder) > (dumper-dmptable)) {
+           dumptype = dumporder[dumper-dmptable];
+       }
+       else {
+           if(dumper-dmptable < 3)
+               dumptype = 't';
+           else
+               dumptype = 'T';
+       }
+
+       for(diskp = rq->head; diskp != NULL; diskp = diskp->next) {
            assert(diskp->host != NULL && sched(diskp) != NULL);
 
            /* round estimate to next multiple of DISK_BLOCK_KB */
            sched(diskp)->est_size = am_round(sched(diskp)->est_size,
                                              DISK_BLOCK_KB);
 
-           if(diskp->host->start_t > now) {
+           if (diskp->host->start_t > now) {
                cur_idle = max(cur_idle, IDLE_START_WAIT);
-               sleep_time.tv_sec = min(diskp->host->start_t - now, 
-                                       sleep_time.tv_sec);
-               any_delayed_disk = 1;
-           }
-           else if(diskp->start_t > now) {
+               if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) {
+                   delayed_diskp = diskp;
+                   sleep_time = diskp->host->start_t;
+               }
+           else if(diskp->start_t > now) {
                cur_idle = max(cur_idle, IDLE_START_WAIT);
-               sleep_time.tv_sec = min(diskp->start_t - now, 
-                                       sleep_time.tv_sec);
-               any_delayed_disk = 1;
-           }
-           else if(diskp->host->netif->curusage > 0 &&
-                   sched(diskp)->est_kps > free_kps(diskp->host->netif))
+               if (delayed_diskp == NULL || sleep_time > diskp->start_t) {
+                   delayed_diskp = diskp;
+                   sleep_time = diskp->start_t;
+               }
+           } else if (diskp->host->netif->curusage > 0 &&
+                      sched(diskp)->est_kps > free_kps(diskp->host->netif)) {
                cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH);
-           else if(sched(diskp)->no_space)
+           } else if(sched(diskp)->no_space) {
                cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
-           else if((holdp = find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL)
+           } else if ((holdp =
+               find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL) {
                cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
-           else if(diskp->no_hold) {
+           } else if (diskp->no_hold) {
                free_assignedhd(holdp);
                cur_idle = max(cur_idle, IDLE_NO_HOLD);
-           } else if(client_constrained(diskp)) {
+           } else if (client_constrained(diskp)) {
                free_assignedhd(holdp);
                cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED);
            } else {
@@ -711,17 +714,6 @@ disklist_t *rq;
                /* disk fits, dump it */
                int accept = !diskp_accept;
                if(!accept) {
-                   char dumptype;
-                   char *dumporder = getconf_str(CNF_DUMPORDER);
-                   if(strlen(dumporder) <= (dumper-dmptable)) {
-                       if(dumper-dmptable < 3)
-                           dumptype = 't';
-                       else
-                           dumptype = 'T';
-                   }
-                   else {
-                       dumptype = dumporder[dumper-dmptable];
-                   }
                    switch(dumptype) {
                      case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
                                break;
@@ -735,7 +727,7 @@ disklist_t *rq;
                                break;
                      case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps);
                                break;
-                     default:  log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n",
+                     default:  log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n",
                                        dumptype);
                                accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
                                break;
@@ -755,60 +747,94 @@ disklist_t *rq;
                    free_assignedhd(holdp);
                }
            }
-           diskp = diskp->next;
        }
 
        diskp = diskp_accept;
        holdp = holdp_accept;
-       if(diskp) {
-           cur_idle = NOT_IDLE;
+
+       idle_reason = max(idle_reason, cur_idle);
+
+       /*
+        * If we have no disk at this point, and there are disks that
+        * are delayed, then schedule a time event to call this dumper
+        * with the disk with the shortest delay.
+        */
+       if (diskp == NULL && delayed_diskp != NULL) {
+           assert(sleep_time > now);
+           sleep_time -= now;
+           dumpers_ev_time = event_register(sleep_time, EV_TIME,
+               handle_dumpers_time, &runq);
+           return;
+       } else if (diskp != NULL) {
            sched(diskp)->act_size = 0;
            allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
            sched(diskp)->activehd = assign_holdingdisk(holdp, diskp);
            amfree(holdp);
-           diskp->host->inprogress += 1;       /* host is now busy */
+           sched(diskp)->destname = newstralloc(sched(diskp)->destname,
+                                                sched(diskp)->holdp[0]->destname);
+           diskp->host->inprogress++;  /* host is now busy */
            diskp->inprogress = 1;
            sched(diskp)->dumper = dumper;
-           sched(diskp)->timestamp = time((time_t *)0);
+           sched(diskp)->timestamp = now;
 
+           dumper->ev_read = event_register(dumper->fd, EV_READFD,
+               handle_dumper_result, dumper);
            dumper->busy = 1;           /* dumper is now busy */
            dumper->dp = diskp;         /* link disk to dumper */
-           total++;
            remove_disk(rq, diskp);             /* take it off the run queue */
-           dumper_cmd(dumper, FILE_DUMP, diskp);
-           diskp->host->start_t = time(NULL) + 15;
+
+           sched(diskp)->origsize = -1;
+           sched(diskp)->dumpsize = -1;
+           sched(diskp)->dumptime = -1;
+           sched(diskp)->tapetime = -1;
+           chunker = dumper->chunker;
+           chunker->result = LAST_TOK;
+           dumper->result = LAST_TOK;
+           startup_chunk_process(chunker,chunker_program);
+           chunker->dumper = dumper;
+           chunker_cmd(chunker, PORT_WRITE, diskp);
+           cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
+           if(cmd != PORT) {
+               printf("driver: did not get PORT from %s for %s:%s\n",
+                      chunker->name, diskp->host->hostname, diskp->name);
+               fflush(stdout);
+               return ;        /* fatal problem */
+           }
+           chunker->ev_read = event_register(chunker->fd, EV_READFD,
+                   handle_chunker_result, chunker);
+           dumper->output_port = atoi(result_argv[2]);
+
+           dumper_cmd(dumper, PORT_DUMP, diskp);
+
+           diskp->host->start_t = now + 15;
+       } else if (/* cur_idle != NOT_IDLE && */
+           (num_busy_dumpers() > 0 || taper_busy)) {
+           /*
+            * We are constrained.
+            */
        }
-       idle_reason = max(idle_reason, cur_idle);
     }
-    return total;
-}
-
-int sort_by_priority_reversed(a, b)
-disk_t *a, *b;
-{
-    if(sched(b)->priority - sched(a)->priority != 0)
-       return sched(b)->priority - sched(a)->priority;
-    else
-       return sort_by_time(a, b);
 }
 
-int sort_by_time(a, b)
-disk_t *a, *b;
+/*
+ * This gets called when a dumper is delayed for some reason.  It may
+ * be because a disk has a delayed start, or amanda is constrained
+ * by network or disk limits.
+ */
+static void
+handle_dumpers_time(cookie)
+    void *cookie;
 {
-    long diff;
-
-    if ((diff = sched(a)->est_time - sched(b)->est_time) < 0) {
-       return -1;
-    } else if (diff > 0) {
-       return 1;
-    } else {
-       return 0;
-    }
+    disklist_t *runq = cookie;
+    event_release(dumpers_ev_time);
+    dumpers_ev_time = NULL; 
+    start_some_dumps(runq);
 }
 
-void dump_schedule(qp, str)
-disklist_t *qp;
-char *str;
+static void
+dump_schedule(qp, str)
+    disklist_t *qp;
+    char *str;
 {
     disk_t *dp;
 
@@ -822,14 +848,19 @@ char *str;
     printf("--------\n");
 }
 
-
-void start_degraded_mode(queuep)
-disklist_t *queuep;
+static void
+start_degraded_mode(queuep)
+    disklist_t *queuep;
 {
     disk_t *dp;
     disklist_t newq;
     unsigned long est_full_size;
 
+    if (taper_ev_read != NULL) {
+       event_release(taper_ev_read);
+       taper_ev_read = NULL;
+    }
+
     newq.head = newq.tail = 0;
 
     dump_schedule(queuep, "before start degraded mode");
@@ -840,11 +871,11 @@ disklist_t *queuep;
 
        if(sched(dp)->level != 0)
            /* go ahead and do the disk as-is */
-           insert_disk(&newq, dp, sort_by_priority_reversed);
+           enqueue_disk(&newq, dp);
        else {
            if (reserved_space + est_full_size + sched(dp)->est_size
                <= total_disksize) {
-               insert_disk(&newq, dp, sort_by_priority_reversed);
+               enqueue_disk(&newq, dp);
                est_full_size += sched(dp)->est_size;
            }
            else if(sched(dp)->degr_level != -1) {
@@ -853,12 +884,12 @@ disklist_t *queuep;
                sched(dp)->est_size = sched(dp)->degr_size;
                sched(dp)->est_time = sched(dp)->degr_time;
                sched(dp)->est_kps  = sched(dp)->degr_kps;
-               insert_disk(&newq, dp, sort_by_priority_reversed);
+               enqueue_disk(&newq, dp);
            }
            else {
-               log_add(L_FAIL, "%s %s %s %d [can't switch to incremental dump]",
-                       dp->host->hostname, dp->name,
-                       sched(dp)->datestamp, sched(dp)->level);
+               log_add(L_FAIL,"%s %s %s %d [can't switch to incremental dump]",
+                       dp->host->hostname, dp->name, sched(dp)->datestamp,
+                       sched(dp)->level);
            }
        }
     }
@@ -869,12 +900,13 @@ disklist_t *queuep;
     dump_schedule(queuep, "after start degraded mode");
 }
 
-void continue_dumps()
+
+static void continue_port_dumps()
 {
-disk_t *dp, *ndp;
-assignedhd_t **h;
-int active_dumpers=0, busy_dumpers=0, i;
-dumper_t *dumper;
+    disk_t *dp, *ndp;
+    assignedhd_t **h;
+    int active_dumpers=0, busy_dumpers=0, i;
+    dumper_t *dumper;
 
     /* First we try to grant diskspace to some dumps waiting for it. */
     for( dp = roomq.head; dp; dp = ndp ) {
@@ -882,13 +914,14 @@ dumper_t *dumper;
        /* find last holdingdisk used by this dump */
        for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ );
        /* find more space */
-       h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, &active_dumpers, h[i] );
+       h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
+                           &active_dumpers, h[i] );
        if( h ) {
            for(dumper = dmptable; dumper < dmptable + inparallel &&
                                   dumper->dp != dp; dumper++);
            assert( dumper < dmptable + inparallel );
            sched(dp)->activehd = assign_holdingdisk( h, dp );
-           dumper_cmd( dumper, CONTINUE, dp );
+           chunker_cmd( dumper->chunker, CONTINUE, dp );
            amfree(h);
            remove_disk( &roomq, dp );
        }
@@ -909,17 +942,18 @@ dumper_t *dumper;
      * it will be dumped directly to tape. Actually, case c is a special
      * manifestation of case b) where only one dumper is busy.
      */
-    for( dp=NULL, dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
+    for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
        if( dumper->busy ) {
            busy_dumpers++;
            if( !find_disk(&roomq, dumper->dp) ) {
                active_dumpers++;
-           } else if( !dp || sched(dp)->est_size > sched(dumper->dp)->est_size ) {
+           } else if( !dp || 
+                      sched(dp)->est_size > sched(dumper->dp)->est_size ) {
                dp = dumper->dp;
            }
        }
     }
-    if( !active_dumpers && busy_dumpers > 0 && 
+    if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) && 
         ((!taper_busy && empty(tapeq)) || degraded_mode) &&
        pending_aborts == 0 ) { /* not case a */
        if( busy_dumpers == 1 ) { /* case c */
@@ -930,411 +964,584 @@ dumper_t *dumper;
         * We abort that dump, hopefully not wasting too much time retrying it.
         */
        remove_disk( &roomq, dp );
+       chunker_cmd( sched(dp)->dumper->chunker, ABORT, NULL );
        dumper_cmd( sched(dp)->dumper, ABORT, NULL );
        pending_aborts++;
     }
 }
 
-void handle_taper_result()
+
+static void
+handle_taper_result(void *cookie)
 {
     disk_t *dp;
     int filenum;
     cmd_t cmd;
     int result_argc;
     char *result_argv[MAX_ARGS+1];
+    int avail_tapes = 0;
+    
+    assert(cookie == NULL);
+    
+    do {
+        
+       short_dump_state();
+        
+       cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+        
+       switch(cmd) {
+            
+       case PARTIAL:
+       case DONE:      /* DONE <handle> <label> <tape file> <err mess> */
+           if(result_argc != 5) {
+               error("error: [taper DONE result_argc != 5: %d", result_argc);
+           }
+            
+           dp = serial2disk(result_argv[2]);
+           free_serial(result_argv[2]);
+            
+           filenum = atoi(result_argv[4]);
+           if(cmd == DONE) {
+               update_info_taper(dp, result_argv[3], filenum,
+                                  sched(dp)->level);
+           }
+            
+           delete_diskspace(dp);
+            
+           printf("driver: finished-cmd time %s taper wrote %s:%s\n",
+                  walltime_str(curclock()), dp->host->hostname, dp->name);
+           fflush(stdout);
+            
+           amfree(sched(dp)->destname);
+           amfree(sched(dp)->dumpdate);
+           amfree(sched(dp)->degr_dumpdate);
+           amfree(sched(dp)->datestamp);
+           amfree(dp->up);
+            
+           taper_busy = 0;
+           taper_disk = NULL;
+           startaflush();
+            
+           /* continue with those dumps waiting for diskspace */
+           continue_port_dumps();
+           break;
+            
+       case TRYAGAIN:  /* TRY-AGAIN <handle> <err mess> */
+           if (result_argc < 2) {
+               error("error [taper TRYAGAIN result_argc < 2: %d]",
+                     result_argc);
+           }
+           dp = serial2disk(result_argv[2]);
+           free_serial(result_argv[2]);
+           printf("driver: taper-tryagain time %s disk %s:%s\n",
+                  walltime_str(curclock()), dp->host->hostname, dp->name);
+           fflush(stdout);
+            
+           /* See how many tapes we have left, but we alwyays
+              retry once (why?) */
+           current_tape++;
+           if(dp->tape_splitsize > 0)
+               avail_tapes = conf_runtapes - current_tape;
+           else
+               avail_tapes = 0;
+            
+           if(sched(dp)->attempted > avail_tapes) {
+               log_add(L_FAIL, "%s %s %s %d [too many taper retries]",
+                        dp->host->hostname, dp->name, sched(dp)->datestamp,
+                        sched(dp)->level);
+               printf("driver: taper failed %s %s %s, too many taper retry\n",
+                       result_argv[2], dp->host->hostname, dp->name);
+           }
+           else {
+               /* Re-insert into taper queue. */
+               sched(dp)->attempted++;
+               headqueue_disk(&tapeq, dp);
+           }
+            
+           tape_left = tape_length;
+            
+           /* run next thing from queue */
+            
+           taper_busy = 0;
+           taper_disk = NULL;
+           startaflush();
+           continue_port_dumps();
+           break;
+            
+        case SPLIT_CONTINUE:  /* SPLIT_CONTINUE <handle> <new_label> */
+            if (result_argc != 3) {
+                error("error [taper SPLIT_CONTINUE result_argc != 3: %d]",
+                      result_argc);
+            }
+            
+            break;
+        case SPLIT_NEEDNEXT:  /* SPLIT-NEEDNEXT <handle> <kb written> */
+            if (result_argc != 3) {
+                error("error [taper SPLIT_NEEDNEXT result_argc != 3: %d]",
+                      result_argc);
+            }
+            
+            /* Update our tape counter and reset tape_left */
+            current_tape++;
+            tape_left = tape_length;
+            
+            /* Reduce the size of the dump by amount written and reduce
+               tape_left by the amount left over */
+            dp = serial2disk(result_argv[2]);
+            sched(dp)->act_size -= atoi(result_argv[3]);
+            if (sched(dp)->act_size < tape_left)
+                tape_left -= sched(dp)->act_size;
+            else
+                tape_length = 0;
+            
+            break;
+            
+        case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
+            dp = serial2disk(result_argv[2]);
+            free_serial(result_argv[2]);
+            printf("driver: finished-cmd time %s taper wrote %s:%s\n",
+                   walltime_str(curclock()), dp->host->hostname, dp->name);
+            fflush(stdout);
+            log_add(L_WARNING, "Taper  error: %s", result_argv[3]);
+            /* FALLSTHROUGH */
+
+        case BOGUS:
+            if (cmd == BOGUS) {
+               log_add(L_WARNING, "Taper protocol error");
+            }
+            /*
+             * Since we've gotten a taper error, we can't send anything more
+             * to the taper.  Go into degraded mode to try to get everthing
+             * onto disk.  Later, these dumps can be flushed to a new tape.
+             * The tape queue is zapped so that it appears empty in future
+             * checks. If there are dumps waiting for diskspace to be freed,
+             * cancel one.
+             */
+            if(!nodump) {
+                log_add(L_WARNING,
+                        "going into degraded mode because of taper component error.");
+                start_degraded_mode(&runq);
+            }
+            tapeq.head = tapeq.tail = NULL;
+            taper_busy = 0;
+            taper_disk = NULL;
+            if(taper_ev_read != NULL) {
+                event_release(taper_ev_read);
+                taper_ev_read = NULL;
+            }
+            if(cmd != TAPE_ERROR) aclose(taper);
+            continue_port_dumps();
+            break;
+
+       default:
+            error("driver received unexpected token (%s) from taper",
+                  cmdstr[cmd]);
+       }
+       /*
+        * Wakeup any dumpers that are sleeping because of network
+        * or disk constraints.
+        */
+       start_some_dumps(&runq);
+        
+    } while(areads_dataready(taper));
+}
 
-    cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+static dumper_t *
+idle_dumper()
+{
+    dumper_t *dumper;
 
-    switch(cmd) {
+    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
+       if(!dumper->busy && !dumper->down) return dumper;
 
-    case DONE: /* DONE <handle> <label> <tape file> <err mess> */
-       if(result_argc != 5) {
-           error("error: [taper DONE result_argc != 5: %d", result_argc);
-       }
+    return NULL;
+}
 
-       dp = serial2disk(result_argv[2]);
-       free_serial(result_argv[2]);
+static int
+num_busy_dumpers()
+{
+    dumper_t *dumper;
+    int n;
 
-       filenum = atoi(result_argv[4]);
-       update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
+    n = 0;
+    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
+       if(dumper->busy) n += 1;
 
-       delete_diskspace(dp);
+    return n;
+}
 
-       printf("driver: finished-cmd time %s taper wrote %s:%s\n",
-              walltime_str(curclock()), dp->host->hostname, dp->name);
-       fflush(stdout);
 
-       amfree(sched(dp)->dumpdate);
-       amfree(sched(dp)->degr_dumpdate);
-       amfree(sched(dp)->datestamp);
-       amfree(dp->up);
+static void
+dumper_result(dp)
+    disk_t *dp;
+{
+    dumper_t *dumper;
+    chunker_t *chunker;
+    assignedhd_t **h=NULL;
+    int activehd, i, dummy;
+    long size;
+    int is_partial;
 
-       taper_busy = 0;
-       taper_disk = NULL;
-       startaflush();
-       continue_dumps(); /* continue with those dumps waiting for diskspace */
-       break;
+    dumper = sched(dp)->dumper;
+    chunker = dumper->chunker;
 
-    case TRYAGAIN:  /* TRY-AGAIN <handle> <err mess> */
-       if (result_argc < 2) {
-           error("error [taper TRYAGAIN result_argc < 2: %d]", result_argc);
-       }
-       dp = serial2disk(result_argv[2]);
-       free_serial(result_argv[2]);
-       printf("driver: taper-tryagain time %s disk %s:%s\n",
-              walltime_str(curclock()), dp->host->hostname, dp->name);
-       fflush(stdout);
+    free_serial_dp(dp);
 
-       /* re-insert into taper queue */
+    h = sched(dp)->holdp;
+    activehd = sched(dp)->activehd;
 
-       if(sched(dp)->attempted) {
-           log_add(L_FAIL, "%s %s %d %s [too many taper retries]",
-                   dp->host->hostname, dp->name, sched(dp)->level,
-                   sched(dp)->datestamp);
-           printf("driver: taper failed %s %s %s, too many taper retry\n", result_argv[2], dp->host->hostname, dp->name);
-       }
-       else {
-           sched(dp)->attempted++;
-           headqueue_disk(&tapeq, dp);
-       }
+    if(dumper->result == DONE && chunker->result == DONE) {
+       update_info_dumper(dp, sched(dp)->origsize,
+                          sched(dp)->dumpsize, sched(dp)->dumptime);
+    }
 
-       tape_left = tape_length;
+    deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
 
-       /* run next thing from queue */
-       taper_busy = 0;
-       taper_disk = NULL;
-       startaflush();
-       continue_dumps(); /* continue with those dumps waiting for diskspace */
+    is_partial = dumper->result != DONE || chunker->result != DONE;
+    rename_tmp_holding(sched(dp)->destname, !is_partial);
 
-       break;
+    dummy = 0;
+    for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
+       dummy += h[i]->used;
+    }
 
-    case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
-       dp = serial2disk(result_argv[2]);
-       free_serial(result_argv[2]);
-       printf("driver: finished-cmd time %s taper wrote %s:%s\n",
-              walltime_str(curclock()), dp->host->hostname, dp->name);
-       fflush(stdout);
-       /* Note: fall through code... */
+    size = size_holding_files(sched(dp)->destname, 0);
+    h[activehd]->used = size - dummy;
+    holdalloc(h[activehd]->disk)->allocated_dumpers--;
+    adjust_diskspace(dp, DONE);
 
-    case BOGUS:
-       /*
-        * Since we've gotten a tape error, we can't send anything more
-        * to the taper.  Go into degraded mode to try to get everthing
-        * onto disk.  Later, these dumps can be flushed to a new tape.
-        * The tape queue is zapped so that it appears empty in future
-        * checks. If there are dumps waiting for diskspace to be freed,
-        * cancel one.
-        */
-       if(!nodump) {
-           log_add(L_WARNING,
-                   "going into degraded mode because of tape error.");
-       }
-       start_degraded_mode(&runq);
-       taper_busy = 0;
-       taper_disk = NULL;
-       tapeq.head = tapeq.tail = NULL;
-       FD_CLR(taper,&readset);
-       if(cmd != TAPE_ERROR) aclose(taper);
-       continue_dumps();
-       break;
-    default:
-       error("driver received unexpected token (%d) from taper", cmd);
+    sched(dp)->attempted += 1;
+
+    if((dumper->result != DONE || chunker->result != DONE) &&
+       sched(dp)->attempted <= 1) {
+       delete_diskspace(dp);
+       enqueue_disk(&runq, dp);
+    }
+    else if(size > DISK_BLOCK_KB) {
+       sched(dp)->attempted = 0;
+       enqueue_disk(&tapeq, dp);
+       startaflush();
+    }
+    else {
+       delete_diskspace(dp);
     }
+
+    dumper->busy = 0;
+    dp->host->inprogress -= 1;
+    dp->inprogress = 0;
+
+    waitpid(chunker->pid, NULL, 0 );
+    aclose(chunker->fd);
+    chunker->fd = -1;
+    chunker->down = 1;
+    
+    dp = NULL;
+    continue_port_dumps();
+    /*
+     * Wakeup any dumpers that are sleeping because of network
+     * or disk constraints.
+     */
+    start_some_dumps(&runq);
 }
 
 
-dumper_t *idle_dumper()
+static void
+handle_dumper_result(cookie)
+    void *cookie;
 {
-    dumper_t *dumper;
+    /*static int pending_aborts = 0;*/
+    dumper_t *dumper = cookie;
+    disk_t *dp, *sdp;
+    cmd_t cmd;
+    int result_argc;
+    char *result_argv[MAX_ARGS+1];
 
-    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
-       if(!dumper->busy && !dumper->down) return dumper;
+    assert(dumper != NULL);
+    dp = dumper->dp;
+    assert(dp != NULL && sched(dp) != NULL);
 
-    return NULL;
-}
+    do {
 
-int some_dumps_in_progress()
-{
-    dumper_t *dumper;
+       short_dump_state();
 
-    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
-       if(dumper->busy) return 1;
+       cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
 
-    return taper_busy;
-}
+       if(cmd != BOGUS) {
+           /* result_argv[2] always contains the serial number */
+           sdp = serial2disk(result_argv[2]);
+           assert(sdp == dp);
+       }
 
-int num_busy_dumpers()
-{
-    dumper_t *dumper;
-    int n;
+       switch(cmd) {
 
-    n = 0;
-    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
-       if(dumper->busy) n += 1;
+       case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <errstr> */
+           if(result_argc != 6) {
+               error("error [dumper DONE result_argc != 6: %d]", result_argc);
+           }
 
-    return n;
-}
+           /*free_serial(result_argv[2]);*/
 
-dumper_t *lookup_dumper(fd)
-int fd;
-{
-    dumper_t *dumper;
+           sched(dp)->origsize = (long)atof(result_argv[3]);
+           sched(dp)->dumptime = (long)atof(result_argv[5]);
 
-    for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
-       if(dumper->outfd == fd) return dumper;
+           printf("driver: finished-cmd time %s %s dumped %s:%s\n",
+                  walltime_str(curclock()), dumper->name,
+                  dp->host->hostname, dp->name);
+           fflush(stdout);
 
-    return NULL;
+           dumper->result = cmd;
+
+           break;
+
+       case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
+           /*
+            * Requeue this disk, and fall through to the FAILED
+            * case for cleanup.
+            */
+           if(sched(dp)->attempted) {
+               log_add(L_FAIL, "%s %s %s %d [too many dumper retry: %s]",
+                   dp->host->hostname, dp->name, sched(dp)->datestamp,
+                   sched(dp)->level, result_argv[3]);
+               printf("driver: dump failed %s %s %s, too many dumper retry: %s\n",
+                       result_argv[2], dp->host->hostname, dp->name,
+                       result_argv[3]);
+           }
+           /* FALLTHROUGH */
+       case FAILED: /* FAILED <handle> <errstr> */
+           /*free_serial(result_argv[2]);*/
+           dumper->result = cmd;
+           break;
+
+       case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
+           /*
+            * We sent an ABORT from the NO-ROOM case because this dump
+            * wasn't going to fit onto the holding disk.  We now need to
+            * clean up the remains of this image, and try to finish
+            * other dumps that are waiting on disk space.
+            */
+           assert(pending_aborts);
+           /*free_serial(result_argv[2]);*/
+           dumper->result = cmd;
+           break;
+
+       case BOGUS:
+           /* either EOF or garbage from dumper.  Turn it off */
+           log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
+                   dumper->name, (long)dumper->pid);
+           event_release(dumper->ev_read);
+           dumper->ev_read = NULL;
+           aclose(dumper->fd);
+           dumper->busy = 0;
+           dumper->down = 1;   /* mark it down so it isn't used again */
+           if(dp) {
+               /* if it was dumping something, zap it and try again */
+               if(sched(dp)->attempted) {
+               log_add(L_FAIL, "%s %s %s %d [%s died]",
+                       dp->host->hostname, dp->name, sched(dp)->datestamp,
+                       sched(dp)->level, dumper->name);
+               }
+               else {
+               log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
+                       dumper->name, dp->host->hostname, dp->name,
+                       sched(dp)->level);
+               }
+           }
+           dumper->result = cmd;
+           break;
+
+       default:
+           assert(0);
+       }
+       /* send the dumper result to the chunker */
+       if(dumper->chunker->down == 0 && dumper->chunker->fd != -1) {
+           if(cmd == DONE) {
+               chunker_cmd(dumper->chunker, DONE, dp);
+           }
+           else {
+               chunker_cmd(dumper->chunker, FAILED, dp);
+           }
+       }
+
+       if(dumper->result != LAST_TOK && dumper->chunker->result != LAST_TOK)
+           dumper_result(dp);
+
+    } while(areads_dataready(dumper->fd));
 }
 
 
-void handle_dumper_result(fd)
-     int fd;
+static void
+handle_chunker_result(cookie)
+    void *cookie;
 {
+    /*static int pending_aborts = 0;*/
+    chunker_t *chunker = cookie;
     assignedhd_t **h=NULL;
     dumper_t *dumper;
     disk_t *dp, *sdp;
-    long origsize;
-    long dumpsize;
-    long dumptime;
     cmd_t cmd;
     int result_argc;
     char *result_argv[MAX_ARGS+1];
-    int i, dummy;
+    int dummy;
     int activehd = -1;
 
-    dumper = lookup_dumper(fd);
+
+    assert(chunker != NULL);
+    dumper = chunker->dumper;
+    assert(dumper != NULL);
     dp = dumper->dp;
-    assert(dp && sched(dp) && sched(dp)->destname);
+    assert(dp != NULL);
+    assert(sched(dp) != NULL);
+    assert(sched(dp)->destname != NULL);
+    assert(dp != NULL && sched(dp) != NULL && sched(dp)->destname);
 
     if(dp && sched(dp) && sched(dp)->holdp) {
        h = sched(dp)->holdp;
        activehd = sched(dp)->activehd;
     }
 
-    cmd = getresult(fd, 1, &result_argc, result_argv, MAX_ARGS+1);
+    do {
 
-    if(cmd != BOGUS) {
-       sdp = serial2disk(result_argv[2]); /* result_argv[2] always contains the serial number */
-       assert(sdp == dp);
-    }
+       short_dump_state();
 
-    switch(cmd) {
+       cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
 
-    case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <err str> */
-       if(result_argc != 6) {
-           error("error [dumper DONE result_argc != 6: %d]", result_argc);
+       if(cmd != BOGUS) {
+           /* result_argv[2] always contains the serial number */
+           sdp = serial2disk(result_argv[2]);
+           assert(sdp == dp);
        }
 
-       free_serial(result_argv[2]);
-
-       origsize = (long)atof(result_argv[3]);
-       dumpsize = (long)atof(result_argv[4]);
-       dumptime = (long)atof(result_argv[5]);
-       update_info_dumper(dp, origsize, dumpsize, dumptime);
+       switch(cmd) {
 
-       /* adjust holdp[active]->used using the real dumpsize and all other
-        * holdp[i]->used as an estimate.
-        */
+       case PARTIAL: /* PARTIAL <handle> <dumpsize> <errstr> */
+       case DONE: /* DONE <handle> <dumpsize> <errstr> */
+           if(result_argc != 4) {
+               error("error [chunker %s result_argc != 4: %d]", cmdstr[cmd],
+                     result_argc);
+           }
+           /*free_serial(result_argv[2]);*/
 
-       dummy = 0;
-       for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
-           dummy += h[i]->used;
-       }
+           sched(dp)->dumpsize = (long)atof(result_argv[3]);
 
-       rename_tmp_holding(sched(dp)->destname, 1);
-       assert( h && activehd >= 0 );
-       h[activehd]->used = size_holding_files(sched(dp)->destname) - dummy;
-       deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-       holdalloc(h[activehd]->disk)->allocated_dumpers--;
-       adjust_diskspace(dp, DONE);
-       dumper->busy = 0;
-       dp->host->inprogress -= 1;
-       dp->inprogress = 0;
-       sched(dp)->attempted = 0;
-       printf("driver: finished-cmd time %s %s dumped %s:%s\n",
-              walltime_str(curclock()), dumper->name,
-              dp->host->hostname, dp->name);
-       fflush(stdout);
+           printf("driver: finished-cmd time %s %s chunked %s:%s\n",
+                  walltime_str(curclock()), chunker->name,
+                  dp->host->hostname, dp->name);
+           fflush(stdout);
 
-       enqueue_disk(&tapeq, dp);
-       dp = NULL;
+           event_release(chunker->ev_read);
 
-       startaflush();
-       continue_dumps();
+           chunker->result = cmd;
 
-       break;
+           break;
 
-    case TRYAGAIN: /* TRY-AGAIN <handle> <err str> */
-    case FATAL_TRYAGAIN:
-       free_serial(result_argv[2]);
+       case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
+           assert(0);
+           event_release(chunker->ev_read);
 
-       rename_tmp_holding(sched(dp)->destname, 0);
-       deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-       assert( h && activehd >= 0 );
-       holdalloc(h[activehd]->disk)->allocated_dumpers--;
-       /* Because we don't know how much was written to disk the
-        * following functions *must* be called together!
-        */
-       adjust_diskspace(dp, DONE);
-       delete_diskspace(dp);
-       dumper->busy = 0;
-       dp->host->inprogress -= 1;
-       dp->inprogress = 0;
-
-       if(sched(dp)->attempted) {
-           log_add(L_FAIL, "%s %s %d %s [too many dumper retry]",
-                   dp->host->hostname, dp->name,
-                   sched(dp)->level, sched(dp)->datestamp);
-           printf("driver: dump failed %s %s %s, too many dumper retry\n", result_argv[2], dp->host->hostname, dp->name);
-       } else {
-           sched(dp)->attempted++;
-           enqueue_disk(&runq, dp);
-       }
-       continue_dumps();
+           break;
+       case FAILED: /* FAILED <handle> <errstr> */
+           /*free_serial(result_argv[2]);*/
 
-       if(cmd == FATAL_TRYAGAIN) {
-           /* dumper is confused, start another */
-           log_add(L_WARNING, "%s (pid %ld) confused, restarting it.",
-                   dumper->name, (long)dumper->pid);
-           FD_CLR(fd,&readset);
-           aclose(fd);
-           startup_dump_process(dumper, dumper_program);
-       }
-       /* sleep in case the dumper failed because of a temporary network
-          problem, as NIS or NFS... */
-       sleep(15);
-       break;
+           event_release(chunker->ev_read);
 
-    case FAILED: /* FAILED <handle> <errstr> */
-       free_serial(result_argv[2]);
+           chunker->result = cmd;
 
-       rename_tmp_holding(sched(dp)->destname, 0);
-       deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-       assert( h && activehd >= 0 );
-       holdalloc(h[activehd]->disk)->allocated_dumpers--;
-       /* Because we don't know how much was written to disk the
-        * following functions *must* be called together!
-        */
-       adjust_diskspace(dp, DONE);
-       delete_diskspace(dp);
-       dumper->busy = 0;
-       dp->host->inprogress -= 1;
-       dp->inprogress = 0;
-       continue_dumps();
-
-       /* no need to log this, dumper will do it */
-       /* sleep in case the dumper failed because of a temporary network
-          problem, as NIS or NFS... */
-       sleep(15);
-       break;
+           break;
 
-    case NO_ROOM: /* NO-ROOM <handle> <missing_size> */
-       assert( h && activehd >= 0 );
-       h[activehd]->used -= atoi(result_argv[3]);
-       h[activehd]->reserved -= atoi(result_argv[3]);
-       holdalloc(h[activehd]->disk)->allocated_space -= atoi(result_argv[3]);
-       h[activehd]->disk->disksize -= atoi(result_argv[3]);
-       break;
+       case NO_ROOM: /* NO-ROOM <handle> <missing_size> */
+           assert( h && activehd >= 0 );
+           h[activehd]->used -= atoi(result_argv[3]);
+           h[activehd]->reserved -= atoi(result_argv[3]);
+           holdalloc(h[activehd]->disk)->allocated_space -= atoi(result_argv[3]);
+           h[activehd]->disk->disksize -= atoi(result_argv[3]);
+           break;
 
-    case RQ_MORE_DISK: /* RQ-MORE-DISK <handle> */
-       assert( h && activehd >= 0 );
-       holdalloc(h[activehd]->disk)->allocated_dumpers--;
-       h[activehd]->used = h[activehd]->reserved;
-       if( h[++activehd] ) { /* There's still some allocated space left. Tell
-                              * the dumper about it. */
-           sched(dp)->activehd++;
-           dumper_cmd( dumper, CONTINUE, dp );
-       } else { /* !h[++activehd] - must allocate more space */
-           sched(dp)->act_size = sched(dp)->est_size; /* not quite true */
-           sched(dp)->est_size = sched(dp)->act_size * 21 / 20; /* +5% */
-           sched(dp)->est_size = am_round(sched(dp)->est_size, DISK_BLOCK_KB);
-           h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
-                               &dummy,
-                               h[activehd-1] );
-           if( !h ) {
-    /*     cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); */
-               /* No diskspace available. The reason for this will be
-                * determined in continue_dumps(). */
-               enqueue_disk( &roomq, dp );
-               continue_dumps();
-           } else {
-               /* OK, allocate space for disk and have dumper continue */
-               sched(dp)->activehd = assign_holdingdisk( h, dp );
-               dumper_cmd( dumper, CONTINUE, dp );
-               amfree(h);
+       case RQ_MORE_DISK: /* RQ-MORE-DISK <handle> */
+           assert( h && activehd >= 0 );
+           holdalloc(h[activehd]->disk)->allocated_dumpers--;
+           h[activehd]->used = h[activehd]->reserved;
+           if( h[++activehd] ) { /* There's still some allocated space left.
+                                  * Tell the dumper about it. */
+               sched(dp)->activehd++;
+               chunker_cmd( chunker, CONTINUE, dp );
+           } else { /* !h[++activehd] - must allocate more space */
+               sched(dp)->act_size = sched(dp)->est_size; /* not quite true */
+               sched(dp)->est_size = sched(dp)->act_size * 21 / 20; /* +5% */
+               sched(dp)->est_size = am_round(sched(dp)->est_size, DISK_BLOCK_KB);
+               h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
+                                   &dummy,
+                                   h[activehd-1] );
+               if( !h ) {
+                   /* No diskspace available. The reason for this will be
+                    * determined in continue_port_dumps(). */
+                   enqueue_disk( &roomq, dp );
+                   continue_port_dumps();
+               } else {
+                   /* OK, allocate space for disk and have chunker continue */
+                   sched(dp)->activehd = assign_holdingdisk( h, dp );
+                   chunker_cmd( chunker, CONTINUE, dp );
+                   amfree(h);
+               }
            }
-       }
-       break;
+           break;
 
-    case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
-       assert(pending_aborts);
-       free_serial(result_argv[2]);
+       case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
+           /*
+            * We sent an ABORT from the NO-ROOM case because this dump
+            * wasn't going to fit onto the holding disk.  We now need to
+            * clean up the remains of this image, and try to finish
+            * other dumps that are waiting on disk space.
+            */
+           /*assert(pending_aborts);*/
 
-       rename_tmp_holding(sched(dp)->destname, 0);
-       deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-       /* Because we don't know how much was written to disk the
-        * following functions *must* be called together!
-        */
-       adjust_diskspace(dp, DONE);
-       delete_diskspace(dp);
-       sched(dp)->attempted++;
-       enqueue_disk(&runq, dp);        /* we'll try again later */
-       dumper->busy = 0;
-       dp->host->inprogress -= 1;
-       dp->inprogress = 0;
-       dp = NULL;
-       pending_aborts--;
-       continue_dumps();
-       break;
+           /*free_serial(result_argv[2]);*/
 
-    case BOGUS:
-       /* either EOF or garbage from dumper.  Turn it off */
-       log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
-               dumper->name, (long)dumper->pid);
-       FD_CLR(fd,&readset);
-       aclose(fd);
-       dumper->busy = 0;
-       dumper->down = 1;       /* mark it down so it isn't used again */
-       if(dp) {
-           /* if it was dumping something, zap it and try again */
-           rename_tmp_holding(sched(dp)->destname, 0);
-           deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-           assert( h && activehd >= 0 );
-           holdalloc(h[activehd]->disk)->allocated_dumpers--;
-           /* Because we don't know how much was written to disk the
-            * following functions *must* be called together!
-            */
-           adjust_diskspace(dp, DONE);
-           delete_diskspace(dp);
-           dp->host->inprogress -= 1;
-           dp->inprogress = 0;
-           if(sched(dp)->attempted) {
-               log_add(L_FAIL, "%s %s %d %s [%s died]",
-                       dp->host->hostname, dp->name,
-                       sched(dp)->level, sched(dp)->datestamp, dumper->name);
-           }
-           else {
-               log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
-                       dumper->name, dp->host->hostname, dp->name,
-                       sched(dp)->level);
-               sched(dp)->attempted++;
-               enqueue_disk(&runq, dp);
+           event_release(chunker->ev_read);
+
+           chunker->result = cmd;
+
+           break;
+
+       case BOGUS:
+           /* either EOF or garbage from chunker.  Turn it off */
+           log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
+                   chunker->name, (long)chunker->pid);
+
+           if(dp) {
+               /* if it was dumping something, zap it and try again */
+               assert( h && activehd >= 0 );
+               if(sched(dp)->attempted) {
+                   log_add(L_FAIL, "%s %s %s %d [%s died]",
+                           dp->host->hostname, dp->name, sched(dp)->datestamp,
+                           sched(dp)->level, chunker->name);
+               }
+               else {
+                   log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
+                           chunker->name, dp->host->hostname, dp->name,
+                           sched(dp)->level);
+               }
+               dp = NULL;
            }
-           dp = NULL;
-           continue_dumps();
+
+           event_release(chunker->ev_read);
+
+           chunker->result = cmd;
+
+           break;
+
+       default:
+           assert(0);
        }
-       break;
 
-    default:
-       assert(0);
-    }
+       if(chunker->result != LAST_TOK && chunker->dumper->result != LAST_TOK)
+           dumper_result(dp);
 
-    return;
+    } while(areads_dataready(chunker->fd));
 }
 
 
-void read_flush(tapeqp)
-disklist_t *tapeqp;
+static disklist_t
+read_flush()
 {
     sched_t *sp;
     disk_t *dp;
@@ -1348,9 +1555,9 @@ disklist_t *tapeqp;
     char *command;
     char *s;
     int ch;
-    long flush_size = 0;
+    disklist_t tq;
 
-    /* read schedule from stdin */
+    tq.head = tq.tail = NULL;
 
     for(line = 0; (inpline = agets(stdin)) != NULL; free(inpline)) {
        line++;
@@ -1358,9 +1565,9 @@ disklist_t *tapeqp;
        s = inpline;
        ch = *s++;
 
-       skip_whitespace(s, ch);                 /* find the command */
+       skip_whitespace(s, ch);                 /* find the command */
        if(ch == '\0') {
-           error("Aflush line %d: syntax error", line);
+           error("flush line %d: syntax error (no command)", line);
            continue;
        }
        command = s - 1;
@@ -1372,13 +1579,13 @@ disklist_t *tapeqp;
        }
 
        if(strcmp(command,"FLUSH") != 0) {
-           error("Bflush line %d: syntax error", line);
+           error("flush line %d: syntax error (%s != FLUSH)", line, command);
            continue;
        }
 
        skip_whitespace(s, ch);                 /* find the hostname */
        if(ch == '\0') {
-           error("Cflush line %d: syntax error", line);
+           error("flush line %d: syntax error (no hostname)", line);
            continue;
        }
        hostname = s - 1;
@@ -1387,7 +1594,7 @@ disklist_t *tapeqp;
 
        skip_whitespace(s, ch);                 /* find the diskname */
        if(ch == '\0') {
-           error("Cflush line %d: syntax error", line);
+           error("flush line %d: syntax error (no diskname)", line);
            continue;
        }
        diskname = s - 1;
@@ -1396,7 +1603,7 @@ disklist_t *tapeqp;
 
        skip_whitespace(s, ch);                 /* find the datestamp */
        if(ch == '\0') {
-           error("Cflush line %d: syntax error", line);
+           error("flush line %d: syntax error (no datestamp)", line);
            continue;
        }
        datestamp = s - 1;
@@ -1405,14 +1612,14 @@ disklist_t *tapeqp;
 
        skip_whitespace(s, ch);                 /* find the level number */
        if(ch == '\0' || sscanf(s - 1, "%d", &level) != 1) {
-           error("Cflush line %d: syntax error", line);
+           error("flush line %d: syntax error (bad level)", line);
            continue;
        }
        skip_integer(s, ch);
 
        skip_whitespace(s, ch);                 /* find the filename */
        if(ch == '\0') {
-           error("Cflush line %d: syntax error", line);
+           error("flush line %d: syntax error (no filename)", line);
            continue;
        }
        destname = s - 1;
@@ -1475,28 +1682,28 @@ disklist_t *tapeqp;
        sp->degr_level = -1;
        sp->est_kps = 10;
        sp->attempted = 0;
-       sp->act_size = size_holding_files(destname);
-       /*sp->holdp = NULL; JLM: must be build*/
+       sp->act_size = size_holding_files(destname, 0);
        sp->holdp = build_diskspace(destname);
-        if(sp->holdp == NULL) continue;
+       if(sp->holdp == NULL) continue;
        sp->dumper = NULL;
        sp->timestamp = (time_t)0;
 
        dp1->up = (char *)sp;
 
-       enqueue_disk(tapeqp, dp1);
-       flush_size += sp->act_size;
+       enqueue_disk(&tq, dp1);
     }
-    printf("driver: flush size %ld\n", flush_size);
     amfree(inpline);
-}
 
+    return tq;
+}
 
-void read_schedule(waitqp, runqp)
-disklist_t *waitqp, *runqp;
+static void
+read_schedule(cookie)
+    void *cookie;
 {
     sched_t *sp;
     disk_t *dp;
+    disklist_t rq;
     int level, line, priority;
     char *dumpdate, *degr_dumpdate;
     int degr_level;
@@ -1506,6 +1713,11 @@ disklist_t *waitqp, *runqp;
     char *command;
     char *s;
     int ch;
+    long flush_size = 0;
+
+    rq.head = rq.tail = NULL;
+
+    event_release(schedule_ev_read);
 
     /* read schedule from stdin */
 
@@ -1686,16 +1898,21 @@ disklist_t *waitqp, *runqp;
        if(dp->host->features == NULL) {
            dp->host->features = am_string_to_feature(features);
        }
-       remove_disk(waitqp, dp);
-       insert_disk(&runq, dp, sort_by_time);
+       remove_disk(&waitq, dp);
+       enqueue_disk(&runq, dp);
+       flush_size += sp->act_size;
     }
+    printf("driver: flush size %ld\n", flush_size);
     amfree(inpline);
     if(line == 0)
        log_add(L_WARNING, "WARNING: got empty schedule from planner");
+    if(need_degraded==1) start_degraded_mode(&runq);
+    start_some_dumps(&runq);
 }
 
-int free_kps(ip)
-interface_t *ip;
+static int
+free_kps(ip)
+    interface_t *ip;
 {
     int res;
 
@@ -1716,8 +1933,9 @@ interface_t *ip;
     return res;
 }
 
-void interface_state(time_str)
-char *time_str;
+static void
+interface_state(time_str)
+    char *time_str;
 {
     interface_t *ip;
 
@@ -1729,23 +1947,26 @@ char *time_str;
     printf("\n");
 }
 
-void allocate_bandwidth(ip, kps)
-interface_t *ip;
-int kps;
+static void
+allocate_bandwidth(ip, kps)
+    interface_t *ip;
+    int kps;
 {
     ip->curusage += kps;
 }
 
-void deallocate_bandwidth(ip, kps)
-interface_t *ip;
-int kps;
+static void
+deallocate_bandwidth(ip, kps)
+    interface_t *ip;
+    int kps;
 {
     assert(kps <= ip->curusage);
     ip->curusage -= kps;
 }
 
 /* ------------ */
-unsigned long free_space()
+static unsigned long
+free_space()
 {
     holdingdisk_t *hdp;
     unsigned long total_free;
@@ -1760,22 +1981,21 @@ unsigned long free_space()
     return total_free;
 }
 
-assignedhd_t **find_diskspace(size, cur_idle, pref)
-unsigned long size;
-int *cur_idle;
-assignedhd_t *pref;
-/* Rewrite by Peter Conrad <conrad@opus5.de>, June '99:
- *  - enable splitting a dump across several holding disks
- *  - allocate only as much as size tells us, dumpers may request more later
- * We return an array of pointers to assignedhd_t. The array contains at
+static assignedhd_t **
+find_diskspace(size, cur_idle, pref)
+    unsigned long size;
+    int *cur_idle;
+    assignedhd_t *pref;
+/* We return an array of pointers to assignedhd_t. The array contains at
  * most one entry per holding disk. The list of pointers is terminated by
  * a NULL pointer. Each entry contains a pointer to a holdingdisk and
  * how much diskspace to use on that disk. Later on, assign_holdingdisk
  * will allocate the given amount of space.
  * If there is not enough room on the holdingdisks, NULL is returned.
  */
+
 {
-assignedhd_t **result = NULL;
+    assignedhd_t **result = NULL;
     holdingdisk_t *minp, *hdp;
     int i=0, num_holdingdisks=0; /* are we allowed to use the global thing? */
     int j, minj;
@@ -1785,7 +2005,7 @@ assignedhd_t **result = NULL;
     size = am_round(size, DISK_BLOCK_KB);
 
 #ifdef HOLD_DEBUG
-    printf("find diskspace: want %lu K\n", size );
+    printf("%s: want %lu K\n", debug_prefix_time(": find_diskspace"), size);
     fflush(stdout);
 #endif
 
@@ -1811,7 +2031,7 @@ assignedhd_t **result = NULL;
                break;
            }
            else if( holdalloc(hdp)->allocated_space <= hdp->disksize - 2*DISK_BLOCK_KB &&
-               !used[j] && 
+               !used[j] &&
                (!minp ||
                 holdalloc(hdp)->allocated_dumpers < holdalloc(minp)->allocated_dumpers ||
                 (holdalloc(hdp)->allocated_dumpers == holdalloc(minp)->allocated_dumpers &&
@@ -1820,6 +2040,7 @@ assignedhd_t **result = NULL;
                minj = j;
            }
        }
+
        pref = NULL;
        if( !minp ) { break; } /* all holding disks are full */
        used[minj] = 1;
@@ -1837,7 +2058,9 @@ assignedhd_t **result = NULL;
        halloc = dalloc + (((dalloc-1)/minp->chunksize)+1) * DISK_BLOCK_KB;
 
 #ifdef HOLD_DEBUG
-       fprintf(stdout,"find diskspace: size %ld hf %ld df %ld da %ld ha %ld\n",                size, hfree, dfree, dalloc, halloc);
+       printf("%s: find diskspace: size %ld hf %ld df %ld da %ld ha %ld\n",
+              debug_prefix_time(": find_diskspace"),
+              size, hfree, dfree, dalloc, halloc);
        fflush(stdout);
 #endif
        size -= dalloc;
@@ -1852,21 +2075,20 @@ assignedhd_t **result = NULL;
     amfree(used);
 
     if( size ) { /* not enough space available */
-#ifdef HOLD_DEBUG
        printf("find diskspace: not enough diskspace. Left with %lu K\n", size);
        fflush(stdout);
-#endif
        free_assignedhd(result);
        result = NULL;
     }
 
 #ifdef HOLD_DEBUG
     for( i = 0; result && result[i]; i++ ) {
-    printf("find diskspace: selected %s free %ld reserved %ld dumpers %d\n",
-           result[i]->disk->diskdir,
-           result[i]->disk->disksize - holdalloc(result[i]->disk)->allocated_space,
-          result[i]->reserved,
-           holdalloc(result[i]->disk)->allocated_dumpers);
+       printf("%s: find diskspace: selected %s free %ld reserved %ld dumpers %d\n",
+               debug_prefix_time(": find_diskspace"),
+               result[i]->disk->diskdir,
+               result[i]->disk->disksize - holdalloc(result[i]->disk)->allocated_space,
+               result[i]->reserved,
+               holdalloc(result[i]->disk)->allocated_dumpers);
     }
     fflush(stdout);
 #endif
@@ -1874,21 +2096,18 @@ assignedhd_t **result = NULL;
     return result;
 }
 
-int assign_holdingdisk(holdp, diskp)
-assignedhd_t **holdp;
-disk_t *diskp;
+static int
+assign_holdingdisk(holdp, diskp)
+    assignedhd_t **holdp;
+    disk_t *diskp;
 {
-/* Modified by Peter Conrad <conrad@opus5.de>, June '99
- * Modifications for splitting dumps across holding disks:
- * sched(diskp)->holdp now contains an array of pointers to assignedhd_t.
- */
     int i, j, c, l=0;
     unsigned long size;
     char *sfn = sanitise_filename(diskp->name);
     char lvl[64];
     assignedhd_t **new_holdp;
 
-    ap_snprintf( lvl, sizeof(lvl), "%d", sched(diskp)->level );
+    snprintf( lvl, sizeof(lvl), "%d", sched(diskp)->level );
 
     size = am_round(sched(diskp)->est_size - sched(diskp)->act_size,
                    DISK_BLOCK_KB);
@@ -1914,7 +2133,8 @@ disk_t *diskp;
            holdalloc(holdp[0]->disk)->allocated_space += holdp[0]->reserved;
            size = (holdp[0]->reserved>size) ? 0 : size-holdp[0]->reserved;
 #ifdef HOLD_DEBUG
-           printf("merging holding disk %s to disk %s:%s, add %lu for reserved %lu, left %lu\n",
+           printf("%s: merging holding disk %s to disk %s:%s, add %lu for reserved %lu, left %lu\n",
+                  debug_prefix_time(": assign_holdingdisk"),
                   sched(diskp)->holdp[j-1]->disk->diskdir,
                   diskp->host->hostname, diskp->name,
                   holdp[0]->reserved, sched(diskp)->holdp[j-1]->reserved,
@@ -1939,41 +2159,34 @@ disk_t *diskp;
        holdalloc(holdp[i]->disk)->allocated_space += holdp[i]->reserved;
        size = (holdp[i]->reserved>size) ? 0 : size-holdp[i]->reserved;
 #ifdef HOLD_DEBUG
-        printf("assigning holding disk %s to disk %s:%s, reserved %lu, left %lu\n",
-                holdp[i]->disk->diskdir, diskp->host->hostname, diskp->name,
-                holdp[i]->reserved, size );
-        fflush(stdout);
+       printf("%s: %d assigning holding disk %s to disk %s:%s, reserved %lu, left %lu\n",
+               debug_prefix_time(": assign_holdingdisk"),
+               i, holdp[i]->disk->diskdir, diskp->host->hostname, diskp->name,
+               holdp[i]->reserved, size );
+       fflush(stdout);
 #endif
        holdp[i] = NULL; /* so it doesn't get free()d... */
     }
     sched(diskp)->holdp[j] = NULL;
-    sched(diskp)->destname = newstralloc(sched(diskp)->destname,sched(diskp)->holdp[0]->destname);
     amfree(sfn);
 
     return l;
 }
 
-static void adjust_diskspace(diskp, cmd)
-disk_t *diskp;
-cmd_t cmd;
+static void
+adjust_diskspace(diskp, cmd)
+    disk_t *diskp;
+    cmd_t cmd;
 {
-/* Re-write by Peter Conrad <conrad@opus5.de>, March '99
- * Modifications for splitting dumps across holding disks:
- * Dumpers no longer write more than they've allocated, therefore an
- * adjustment may only free some allocated space.
- * 08/99: Jean-Louis suggested that dumpers tell us how much they've written.
- * We just believe them and don't stat all the files but rely on the used
- * field.
- */
-
     assignedhd_t **holdp;
     unsigned long total=0;
     long diff;
     int i;
 
 #ifdef HOLD_DEBUG
-    printf("adjust: %s:%s %s\n", diskp->host->hostname, diskp->name,
-           sched(diskp)->destname );
+    printf("%s: %s:%s %s\n",
+          debug_prefix_time(": adjust_diskspace"),
+          diskp->host->hostname, diskp->name, sched(diskp)->destname);
     fflush(stdout);
 #endif
 
@@ -1985,32 +2198,33 @@ cmd_t cmd;
        diff = holdp[i]->used - holdp[i]->reserved;
        total += holdp[i]->used;
        holdalloc(holdp[i]->disk)->allocated_space += diff;
+
 #ifdef HOLD_DEBUG
-       printf("adjust: hdisk %s done, reserved %ld used %ld diff %ld alloc %ld dumpers %d\n",
+       printf("%s: hdisk %s done, reserved %ld used %ld diff %ld alloc %ld dumpers %d\n",
+               debug_prefix_time(": adjust_diskspace"),
                holdp[i]->disk->name, holdp[i]->reserved, holdp[i]->used, diff,
                holdalloc(holdp[i]->disk)->allocated_space,
                holdalloc(holdp[i]->disk)->allocated_dumpers );
-               fflush(stdout);
+       fflush(stdout);
 #endif
        holdp[i]->reserved += diff;
     }
 
     sched(diskp)->act_size = total;
+
 #ifdef HOLD_DEBUG
-    printf("adjust: after: disk %s:%s used %ld\n", diskp->host->hostname,
-          diskp->name, sched(diskp)->act_size );
+    printf("%s: after: disk %s:%s used %ld\n",
+          debug_prefix_time(": adjust_diskspace"),
+          diskp->host->hostname, diskp->name, sched(diskp)->act_size );
     fflush(stdout);
 #endif
+
 }
 
-static void delete_diskspace(diskp)
-disk_t *diskp;
+static void
+delete_diskspace(diskp)
+    disk_t *diskp;
 {
-/* Re-write by Peter Conrad <conrad@opus5.de>, March '99
- * Modifications for splitting dumps across holding disks:
- * After implementing Jean-Louis' suggestion (see above) this looks much
- * simpler... again, we rely on assignedhd_s containing correct info
- */
     assignedhd_t **holdp;
     int i;
 
@@ -2019,24 +2233,22 @@ disk_t *diskp;
     assert(holdp);
 
     for( i = 0; holdp[i]; i++ ) { /* for each disk */
-        /* find all files of this dump on that disk, and subtract their
-         * reserved sizes from the disk's allocated space
-         */
+       /* find all files of this dump on that disk, and subtract their
+        * reserved sizes from the disk's allocated space
+        */
        holdalloc(holdp[i]->disk)->allocated_space -= holdp[i]->used;
     }
 
-    unlink_holding_files(holdp[0]->destname); /* no need for the entire list, 
-                                                        because unlink_holding_files
-                                                will walk through all files
-                                                        using cont_filename */
-
+    unlink_holding_files(holdp[0]->destname);  /* no need for the entire list,
+                                                * because unlink_holding_files
+                                                * will walk through all files
+                                                * using cont_filename */
     free_assignedhd(sched(diskp)->holdp);
     sched(diskp)->holdp = NULL;
     sched(diskp)->act_size = 0;
-    amfree(sched(diskp)->destname);
 }
 
-assignedhd_t **build_diskspace(destname)
+static assignedhd_t **build_diskspace(destname)
 char *destname;
 {
     int i, j;
@@ -2085,8 +2297,9 @@ char *destname;
                    filename, strerror(errno));
            return NULL;
        }
-       buflen = fullread(fd, buffer, sizeof(buffer));
-       parse_file_header(buffer, &file, buflen);
+       if ((buflen = fullread(fd, buffer, sizeof(buffer))) > 0) {;
+               parse_file_header(buffer, &file, buflen);
+       }
        close(fd);
        filename = file.cont_filename;
     }
@@ -2108,9 +2321,9 @@ char *destname;
     return result;
 }
 
-
-void holdingdisk_state(time_str)
-char *time_str;
+static void
+holdingdisk_state(time_str)
+    char *time_str;
 {
     holdingdisk_t *hdp;
     int dsk;
@@ -2126,9 +2339,14 @@ char *time_str;
     printf("\n");
 }
 
-static void update_failed_dump_to_tape(dp)
-disk_t *dp;
+static void
+update_failed_dump_to_tape(dp)
+    disk_t *dp;
 {
+/* JLM
+ * should simply set no_bump
+ */
+
     time_t save_timestamp = sched(dp)->timestamp;
     /* setting timestamp to 0 removes the current level from the
      * database, so that we ensure that it will not be bumped to the
@@ -2141,8 +2359,9 @@ disk_t *dp;
 }
 
 /* ------------------- */
-int dump_to_tape(dp)
-     disk_t *dp;
+static int
+dump_to_tape(dp)
+    disk_t *dp;
 {
     dumper_t *dumper;
     int failed = 0;
@@ -2150,13 +2369,12 @@ int dump_to_tape(dp)
     long origsize = 0;
     long dumpsize = 0;
     long dumptime = 0;
+    float tapetime = 0;
     cmd_t cmd;
     int result_argc;
     char *result_argv[MAX_ARGS+1];
     int dumper_tryagain = 0;
 
-    inside_dump_to_tape = 1;   /* for simulator */
-
     printf("driver: dumping %s:%s directly to tape\n",
           dp->host->hostname, dp->name);
     fflush(stdout);
@@ -2170,7 +2388,6 @@ int dump_to_tape(dp)
        fflush(stdout);
        log_add(L_WARNING, "no idle dumpers for %s:%s.\n",
                dp->host->hostname, dp->name);
-       inside_dump_to_tape = 0;
        return 2;       /* fatal problem */
     }
 
@@ -2182,11 +2399,10 @@ int dump_to_tape(dp)
        printf("driver: did not get PORT from taper for %s:%s\n",
                dp->host->hostname, dp->name);
        fflush(stdout);
-       inside_dump_to_tape = 0;
        return 2;       /* fatal problem */
     }
     /* copy port number */
-    sched(dp)->destname = newvstralloc(sched(dp)->destname, result_argv[2], NULL );
+    dumper->output_port = atoi(result_argv[2]);
 
     /* tell the dumper to dump to a port */
 
@@ -2200,13 +2416,13 @@ int dump_to_tape(dp)
     dp->inprogress = 1;
     sched(dp)->timestamp = time((time_t *)0);
     allocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
-    idle_reason = 0;
+    idle_reason = NOT_IDLE;
 
     short_dump_state();
 
     /* wait for result from dumper */
 
-    cmd = getresult(dumper->outfd, 1, &result_argc, result_argv, MAX_ARGS+1);
+    cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
 
     if(cmd != BOGUS)
        free_serial(result_argv[2]);
@@ -2220,21 +2436,21 @@ int dump_to_tape(dp)
        failed = 1;     /* dump failed, must still finish up with taper */
        break;
 
-    case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <err str> */
+    case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <errstr> */
        /* everything went fine */
        origsize = (long)atof(result_argv[3]);
-       dumpsize = (long)atof(result_argv[4]);
+       /*dumpsize = (long)atof(result_argv[4]);*/
        dumptime = (long)atof(result_argv[5]);
        break;
 
     case NO_ROOM: /* NO-ROOM <handle> */
        dumper_cmd(dumper, ABORT, dp);
-       cmd = getresult(dumper->outfd, 1, &result_argc, result_argv, MAX_ARGS+1);
+       cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
        if(cmd != BOGUS)
            free_serial(result_argv[2]);
        assert(cmd == ABORT_FINISHED);
 
-    case TRYAGAIN: /* TRY-AGAIN <handle> <err str> */
+    case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
     default:
        /* dump failed, but we must still finish up with taper */
        /* problem with dump, possibly nonfatal, retry one time */
@@ -2257,9 +2473,12 @@ int dump_to_tape(dp)
      * "no space on device", etc., since taper closed the port first.
      */
 
+    continue_port_dump:
+
     cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
 
     switch(cmd) {
+    case PARTIAL:
     case DONE: /* DONE <handle> <label> <tape file> <err mess> */
        if(result_argc != 5) {
            error("error [dump to tape DONE result_argc != 5: %d]", result_argc);
@@ -2270,17 +2489,23 @@ int dump_to_tape(dp)
 
        free_serial(result_argv[2]);
 
-       /* every thing went fine */
-       update_info_dumper(dp, origsize, dumpsize, dumptime);
-       filenum = atoi(result_argv[4]);
-       update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
-       /* note that update_info_dumper() must be run before
-          update_info_taper(), since update_info_dumper overwrites
-          tape information.  */
+       sscanf(result_argv[5],"[sec %f kb %ld ", &tapetime, &dumpsize);
+
+       if(cmd == DONE) {
+           /* every thing went fine */
+           update_info_dumper(dp, origsize, dumpsize, dumptime);
+           filenum = atoi(result_argv[4]);
+           update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
+           /* note that update_info_dumper() must be run before
+              update_info_taper(), since update_info_dumper overwrites
+              tape information.  */
+       }
 
        break;
 
     case TRYAGAIN: /* TRY-AGAIN <handle> <err mess> */
+       tape_left = tape_length;
+       current_tape++;
        if(dumper_tryagain == 0) {
            sched(dp)->attempted++;
            if(sched(dp)->attempted > failed)
@@ -2292,10 +2517,21 @@ int dump_to_tape(dp)
     failed_dumper:
        update_failed_dump_to_tape(dp);
        free_serial(result_argv[2]);
-       tape_left = tape_length;
        break;
 
-
+    case SPLIT_CONTINUE:  /* SPLIT_CONTINUE <handle> <new_label> */
+        if (result_argc != 3) {
+            error("error [taper SPLIT_CONTINUE result_argc != 3: %d]", result_argc);
+        }
+        fprintf(stderr, "driver: Got SPLIT_CONTINUE %s %s\n", result_argv[2], result_argv[3]);
+        goto continue_port_dump;
+        break;
+    case SPLIT_NEEDNEXT:
+        fprintf(stderr, "driver: Got SPLIT_NEEDNEXT %s %s\n", result_argv[2], result_argv[3]);
+
+        goto continue_port_dump;
+        break;
     case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
     case BOGUS:
     default:
@@ -2312,12 +2548,12 @@ int dump_to_tape(dp)
     dp->inprogress = 0;
     deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
 
-    inside_dump_to_tape = 0;
     return failed;
 }
 
-int queue_length(q)
-disklist_t q;
+static int
+queue_length(q)
+    disklist_t q;
 {
     disk_t *p;
     int len;
@@ -2326,8 +2562,8 @@ disklist_t q;
     return len;
 }
 
-
-void short_dump_state()
+static void
+short_dump_state()
 {
     int i, nidle;
     char *wall_time;
@@ -2346,15 +2582,17 @@ void short_dump_state()
     printf(" qlen tapeq: %d", queue_length(tapeq));
     printf(" runq: %d", queue_length(runq));
     printf(" roomq: %d", queue_length(roomq));
-    printf(" wakeup: %d", (int)sleep_time.tv_sec);
+    printf(" wakeup: %d", (int)sleep_time);
     printf(" driver-idle: %s\n", idle_strings[idle_reason]);
     interface_state(wall_time);
     holdingdisk_state(wall_time);
     fflush(stdout);
 }
 
-void dump_state(str)
-char *str;
+#if 0
+static void
+dump_state(str)
+    const char *str;
 {
     int i;
     disk_t *dp;
@@ -2383,3 +2621,4 @@ char *str;
     printf("================\n");
     fflush(stdout);
 }
+#endif