X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=server-src%2Fdriver.c;h=55f1789be0b1aebccc2702bb337756f4bd09229a;hb=94c03cae686e4196a345d72452fda2a5203768ce;hp=345f3f931d2ba14d5e07e58b205c13d4fca2d26e;hpb=2df780bff19c457b0debb7adc29972a0bc2a5dc2;p=debian%2Famanda diff --git a/server-src/driver.c b/server-src/driver.c index 345f3f9..55f1789 100644 --- a/server-src/driver.c +++ b/server-src/driver.c @@ -1,6 +1,6 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 1991-2000 University of Maryland at College Park + * Copyright (c) 1991-1998 University of Maryland at College Park * All Rights Reserved. * * Permission to use, copy, modify, distribute, and sell this software and its @@ -24,7 +24,7 @@ * file named AUTHORS, in the root directory of this distribution. */ /* - * $Id: driver.c,v 1.58.2.31.2.8.2.20.2.16 2005/09/20 21:31:52 jrjackson Exp $ + * $Id: driver.c,v 1.165.2.2 2006/04/23 23:04:33 martinea Exp $ * * controlling process for the Amanda backup system */ @@ -34,10 +34,13 @@ * tape. Probably not effective though, should do this in planner. */ +/*#define HOLD_DEBUG*/ + #include "amanda.h" #include "clock.h" #include "conffile.h" #include "diskfile.h" +#include "event.h" #include "holding.h" #include "infofile.h" #include "logfile.h" @@ -46,63 +49,73 @@ #include "driverio.h" #include "server_util.h" -disklist_t waitq, runq, tapeq, roomq; -int pending_aborts, inside_dump_to_tape; -disk_t *taper_disk; -int degraded_mode; -unsigned long reserved_space; -unsigned long total_disksize; -char *dumper_program; -int inparallel; -int nodump = 0; -unsigned long tape_length, tape_left = 0; -int conf_taperalgo; -am_host_t *flushhost = NULL; - -int client_constrained P((disk_t *dp)); -int sort_by_priority_reversed P((disk_t *a, disk_t *b)); -int sort_by_time P((disk_t *a, disk_t *b)); -int start_some_dumps P((disklist_t *rq)); -void dump_schedule P((disklist_t *qp, char *str)); -void start_degraded_mode P((disklist_t *queuep)); -void handle_taper_result P((void)); -dumper_t *idle_dumper P((void)); -int some_dumps_in_progress P((void)); -int num_busy_dumpers P((void)); -dumper_t *lookup_dumper P((int fd)); -void handle_dumper_result P((int fd)); -void read_flush P((disklist_t *tapeqp)); -void read_schedule P((disklist_t *waitqp, disklist_t *runqp)); -int free_kps P((interface_t *ip)); -void interface_state P((char *time_str)); -void allocate_bandwidth P((interface_t *ip, int kps)); -void deallocate_bandwidth P((interface_t *ip, int kps)); -unsigned long free_space P((void)); -assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle, assignedhd_t *preferred)); -char *diskname2filename P((char *dname)); -int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp)); +static disklist_t waitq, runq, tapeq, roomq; +static int pending_aborts; +static disk_t *taper_disk; +static int degraded_mode; +static unsigned long reserved_space; +static unsigned long total_disksize; +static char *dumper_program; +static char *chunker_program; +static int inparallel; +static int nodump = 0; +static unsigned long tape_length, tape_left = 0; +static int current_tape = 1; +static int conf_taperalgo; +static int conf_runtapes; +static time_t sleep_time; +static int idle_reason; +static char *datestamp; +static char *timestamp; +static am_host_t *flushhost = NULL; +static int need_degraded=0; + +static event_handle_t *dumpers_ev_time = NULL; +static event_handle_t *schedule_ev_read = NULL; + +static void allocate_bandwidth P((interface_t *ip, int kps)); +static int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp)); static void adjust_diskspace P((disk_t *diskp, cmd_t cmd)); static void delete_diskspace P((disk_t *diskp)); -assignedhd_t **build_diskspace P((char *destname)); -void holdingdisk_state P((char *time_str)); -int dump_to_tape P((disk_t *dp)); -int queue_length P((disklist_t q)); -void short_dump_state P((void)); -void dump_state P((char *str)); -void startaflush P((void)); +static assignedhd_t **build_diskspace P((char *destname)); +static int client_constrained P((disk_t *dp)); +static void deallocate_bandwidth P((interface_t *ip, int kps)); +static void dump_schedule P((disklist_t *qp, char *str)); +static int dump_to_tape P((disk_t *dp)); +static assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle, + assignedhd_t *preferred)); +static int free_kps P((interface_t *ip)); +static unsigned long free_space P((void)); +static void dumper_result P((disk_t *dp)); +static void handle_dumper_result P((void *)); +static void handle_chunker_result P((void *)); +static void handle_dumpers_time P((void *)); +static void handle_taper_result P((void *)); +static void holdingdisk_state P((char *time_str)); +static dumper_t *idle_dumper P((void)); +static void interface_state P((char *time_str)); +static int num_busy_dumpers P((void)); +static int queue_length P((disklist_t q)); +static disklist_t read_flush P((void)); +static void read_schedule P((void *cookie)); +static void short_dump_state P((void)); +static void startaflush P((void)); +static void start_degraded_mode P((disklist_t *queuep)); +static void start_some_dumps P((disklist_t *rq)); +static void continue_port_dumps(); +static void update_failed_dump_to_tape P((disk_t *)); +#if 0 +static void dump_state P((const char *str)); +#endif int main P((int main_argc, char **main_argv)); -static int idle_reason; -char *datestamp; -char *timestamp; - -char *idle_strings[] = { +static const char *idle_strings[] = { #define NOT_IDLE 0 "not-idle", -#define IDLE_START_WAIT 1 - "start-wait", -#define IDLE_NO_DUMPERS 2 +#define IDLE_NO_DUMPERS 1 "no-dumpers", +#define IDLE_START_WAIT 2 + "start-wait", #define IDLE_NO_HOLD 3 "no-hold", #define IDLE_CLIENT_CONSTRAINED 4 @@ -117,19 +130,14 @@ char *idle_strings[] = { "taper-wait", }; -#define SLEEP_MAX (24*3600) -struct timeval sleep_time = { SLEEP_MAX, 0 }; -/* enabled if any disks are in start-wait: */ -int any_delayed_disk = 0; - -int main(main_argc, main_argv) +int +main(main_argc, main_argv) int main_argc; char **main_argv; { - disklist_t *origqp; + disklist_t origq; disk_t *diskp; - fd_set selectset; - int fd, dsk; + int dsk; dumper_t *dumper; char *newdir = NULL; generic_fs_stats_t fs; @@ -154,6 +162,7 @@ int main(main_argc, main_argv) set_pname("driver"); + /* Don't die when child closes pipe */ signal(SIGPIPE, SIG_IGN); malloc_size_1 = malloc_inuse(&malloc_hist_1); @@ -162,7 +171,6 @@ int main(main_argc, main_argv) set_logerror(logerror); startclock(); - FD_ZERO(&readset); printf("%s: pid %ld executable %s version %s\n", get_pname(), (long) getpid(), main_argv[0], version()); @@ -204,9 +212,12 @@ int main(main_argc, main_argv) taper_program = vstralloc(libexecdir, "/", "taper", versionsuffix(), NULL); dumper_program = vstralloc(libexecdir, "/", "dumper", versionsuffix(), NULL); + chunker_program = vstralloc(libexecdir, "/", "chunker", versionsuffix(), + NULL); conf_taperalgo = getconf_int(CNF_TAPERALGO); conf_tapetype = getconf_str(CNF_TAPETYPE); + conf_runtapes = getconf_int(CNF_RUNTAPES); tape = lookup_tapetype(conf_tapetype); tape_length = tape->length; printf("driver: tape size %ld\n", tape_length); @@ -214,8 +225,10 @@ int main(main_argc, main_argv) /* taper takes a while to get going, so start it up right away */ init_driverio(); - startup_tape_process(taper_program); - taper_cmd(START_TAPER, datestamp, NULL, 0, NULL); + if(conf_runtapes > 0) { + startup_tape_process(taper_program); + taper_cmd(START_TAPER, datestamp, NULL, 0, NULL); + } /* start initializing: read in databases */ @@ -225,9 +238,8 @@ int main(main_argc, main_argv) } else { conf_diskfile = stralloc2(config_dir, conf_diskfile); } - if((origqp = read_diskfile(conf_diskfile)) == NULL) { + if (read_diskfile(conf_diskfile, &origq) < 0) error("could not load disklist \"%s\"", conf_diskfile); - } amfree(conf_diskfile); /* set up any configuration-dependent variables */ @@ -276,7 +288,7 @@ int main(main_argc, main_argv) newdir = newvstralloc(newdir, hdp->diskdir, "/", timestamp, NULL); - if(!mkholdingdir(newdir)) { + if(!mkholdingdir(newdir)) { hdp->disksize = 0L; } total_disksize += hdp->disksize; @@ -292,7 +304,6 @@ int main(main_argc, main_argv) if(inparallel > MAX_DUMPERS) inparallel = MAX_DUMPERS; /* fire up the dumpers now while we are waiting */ - if(!nodump) startup_dump_processes(dumper_program, inparallel); /* @@ -303,12 +314,12 @@ int main(main_argc, main_argv) * in parallel with the planner. */ - waitq = *origqp; - tapeq.head = tapeq.tail = NULL; - roomq.head = roomq.tail = NULL; - runq.head = runq.tail = NULL; + runq.head = NULL; + runq.tail = NULL; + waitq = origq; + tapeq = read_flush(); - read_flush(&tapeq); + roomq.head = roomq.tail = NULL; log_add(L_STATS, "startup time %s", walltime_str(curclock())); @@ -320,87 +331,35 @@ int main(main_argc, main_argv) getconf_str(CNF_DUMPORDER)); fflush(stdout); - /* Let's see if the tape is ready */ + /* ok, planner is done, now lets see if the tape is ready */ - cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1); + if(conf_runtapes > 0) { + cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1); - if(cmd != TAPER_OK) { - /* no tape, go into degraded mode: dump to holding disk */ - start_degraded_mode(&runq); - FD_CLR(taper,&readset); + if(cmd != TAPER_OK) { + /* no tape, go into degraded mode: dump to holding disk */ + need_degraded=1; + } + } + else { + need_degraded=1; } - - short_dump_state(); /* for amstatus */ tape_left = tape_length; taper_busy = 0; taper_disk = NULL; + taper_ev_read = NULL; + if(!need_degraded) startaflush(); - /* Start autoflush while waiting for dump schedule */ - if(!nodump) { - /* Start any autoflush tape writes */ - if (!empty(tapeq)) { - startaflush(); - short_dump_state(); /* for amstatus */ - - /* Process taper results until the schedule arrives */ - while (1) { - FD_ZERO(&selectset); - FD_SET(0, &selectset); - FD_SET(taper, &selectset); - - if(select(taper+1, (SELECT_ARG_TYPE *)(&selectset), NULL, NULL, - &sleep_time) == -1) - error("select: %s", strerror(errno)); - if (FD_ISSET(0, &selectset)) break; /* schedule arrived */ - if (FD_ISSET(taper, &selectset)) handle_taper_result(); - short_dump_state(); /* for amstatus */ - } - - } - - /* Read the dump schedule */ - read_schedule(&waitq, &runq); - } - - /* Start any needed flushes */ - startaflush(); - - while(start_some_dumps(&runq) || some_dumps_in_progress() || - any_delayed_disk) { - short_dump_state(); - - /* wait for results */ - - memcpy(&selectset, &readset, sizeof(fd_set)); - if(select(maxfd+1, (SELECT_ARG_TYPE *)(&selectset), - NULL, NULL, &sleep_time) == -1) - error("select: %s", strerror(errno)); - - /* handle any results that have come in */ - - for(fd = 0; fd <= maxfd; fd++) { - /* - * The first pass through the following loop, we have - * data ready for areads (called by getresult, called by - * handle_.*_result). But that may read more than one record, - * so we need to keep processing as long as areads has data. - * We will get control back after each record and the buffer - * will go empty (indicated by areads_dataready(fd) == 0) - * after the last one available has been processed. - */ - while(FD_ISSET(fd, &selectset) || areads_dataready(fd) > 0) { - if(fd == taper) handle_taper_result(); - else handle_dumper_result(fd); - FD_CLR(fd, &selectset); - } - } + if(!nodump) + schedule_ev_read = event_register(0, EV_READFD, read_schedule, NULL); - } + short_dump_state(); + event_loop(0); /* handle any remaining dumps by dumping directly to tape, if possible */ - while(!empty(runq)) { + while(!empty(runq) && taper > 0) { diskp = dequeue_disk(&runq); if(!degraded_mode) { int rc = dump_to_tape(diskp); @@ -419,8 +378,8 @@ int main(main_argc, main_argv) } else log_add(L_FAIL, "%s %s %s %d [%s]", - diskp->host->hostname, diskp->name, - sched(diskp)->datestamp, sched(diskp)->level, + diskp->host->hostname, diskp->name, sched(diskp)->datestamp, + sched(diskp)->level, diskp->no_hold ? "can't dump no-hold disk in degraded mode" : "no more holding disk space"); @@ -474,7 +433,7 @@ int main(main_argc, main_argv) who = stralloc("taper"); } if(what != NULL && who == NULL) { - ap_snprintf(number, sizeof(number), "%ld", (long)pid); + snprintf(number, sizeof(number), "%ld", (long)pid); who = stralloc2("unknown pid ", number); } if(who && what) { @@ -494,6 +453,7 @@ int main(main_argc, main_argv) } amfree(newdir); + check_unfree_serial(); printf("driver: FINISHED time %s\n", walltime_str(curclock())); fflush(stdout); log_add(L_FINISH,"date %s time %s", datestamp, walltime_str(curclock())); @@ -514,21 +474,26 @@ int main(main_argc, main_argv) return 0; } -void startaflush() { +static void +startaflush() +{ disk_t *dp = NULL; disk_t *fit = NULL; char *datestamp; - + unsigned int extra_tapes = 0; if(!degraded_mode && !taper_busy && !empty(tapeq)) { + datestamp = sched(tapeq.head)->datestamp; switch(conf_taperalgo) { case ALGO_FIRST: dp = dequeue_disk(&tapeq); break; - case ALGO_FIRSTFIT: + case ALGO_FIRSTFIT: fit = tapeq.head; while (fit != NULL) { - if(sched(fit)->act_size <= tape_left && + extra_tapes = (fit->tape_splitsize > 0) ? + conf_runtapes - current_tape : 0; + if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) && strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; fit = NULL; @@ -539,7 +504,7 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_LARGEST: + case ALGO_LARGEST: fit = dp = tapeq.head; while (fit != NULL) { if(sched(fit)->act_size > sched(dp)->act_size && @@ -550,10 +515,12 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_LARGESTFIT: + case ALGO_LARGESTFIT: fit = tapeq.head; while (fit != NULL) { - if(sched(fit)->act_size <= tape_left && + extra_tapes = (fit->tape_splitsize > 0) ? + conf_runtapes - current_tape : 0; + if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) && (!dp || sched(fit)->act_size > sched(dp)->act_size) && strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; @@ -562,7 +529,7 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_SMALLEST: + case ALGO_SMALLEST: break; case ALGO_LAST: dp = tapeq.tail; @@ -580,27 +547,40 @@ void startaflush() { strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; } - fit = fit->next; + fit = fit->next; } if(dp) remove_disk(&tapeq, dp); } - taper_disk = dp; - taper_busy = 1; - taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, + if(taper_ev_read == NULL) { + taper_ev_read = event_register(taper, EV_READFD, + handle_taper_result, NULL); + } + if (dp != NULL) { + taper_disk = dp; + taper_busy = 1; + taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, sched(dp)->datestamp); - fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n", - taperalgo2str(conf_taperalgo), dp->host->hostname, - dp->name, sched(taper_disk)->act_size, tape_left); - if(sched(dp)->act_size <= tape_left) - tape_left -= sched(dp)->act_size; - else - tape_left = 0; + fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n", + taperalgo2str(conf_taperalgo), dp->host->hostname, + dp->name, sched(taper_disk)->act_size, tape_left); + if(sched(dp)->act_size <= tape_left) + tape_left -= sched(dp)->act_size; + else + tape_left = 0; + } else { + error("FATAL: Taper marked busy and no work found."); + /*NOTREACHED*/ + } + } else if(!taper_busy && taper_ev_read != NULL) { + event_release(taper_ev_read); + taper_ev_read = NULL; } } -int client_constrained(dp) -disk_t *dp; +static int +client_constrained(dp) + disk_t *dp; { disk_t *dp2; @@ -624,86 +604,109 @@ disk_t *dp; return 0; } -int start_some_dumps(rq) -disklist_t *rq; +static void +start_some_dumps(rq) + disklist_t *rq; { - int total, cur_idle; - disk_t *diskp, *diskp_accept; - dumper_t *dumper; + int cur_idle; + disk_t *diskp, *delayed_diskp, *diskp_accept; assignedhd_t **holdp=NULL, **holdp_accept; - time_t now = time(NULL); + const time_t now = time(NULL); + cmd_t cmd; + int result_argc; + char *result_argv[MAX_ARGS+1]; + chunker_t *chunker; + dumper_t *dumper; + char dumptype; + char *dumporder; - total = 0; idle_reason = IDLE_NO_DUMPERS; - sleep_time.tv_sec = SLEEP_MAX; - sleep_time.tv_usec = 0; - any_delayed_disk = 0; + sleep_time = 0; - if(rq->head == NULL) { - idle_reason = 0; - return 0; + if(dumpers_ev_time != NULL) { + event_release(dumpers_ev_time); + dumpers_ev_time = NULL; } - /* - * A potential problem with starting from the bottom of the dump time - * distribution is that a slave host will have both one of the shortest - * and one of the longest disks, so starting its shortest disk first will - * tie up the host and eliminate its longest disk from consideration the - * first pass through. This could cause a big delay in starting that long - * disk, which could drag out the whole night's dumps. - * - * While starting from the top of the dump time distribution solves the - * above problem, this turns out to be a bad idea, because the big dumps - * will almost certainly pack the holding disk completely, leaving no - * room for even one small dump to start. This ends up shutting out the - * small-end dumpers completely (they stay idle). - * - * The introduction of multiple simultaneous dumps to one host alleviates - * the biggest&smallest dumps problem: both can be started at the - * beginning. - */ - for(dumper = dmptable; dumper < dmptable+inparallel; dumper++) { - if(dumper->busy || dumper->down) continue; - /* found an idle dumper, now find a disk for it */ - diskp = rq->head; + for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) { + + if( dumper->busy ) { + continue; + } + + if (dumper->ev_read != NULL) { +/* assert(dumper->ev_read == NULL);*/ + event_release(dumper->ev_read); + dumper->ev_read = NULL; + } + + /* + * A potential problem with starting from the bottom of the dump time + * distribution is that a slave host will have both one of the shortest + * and one of the longest disks, so starting its shortest disk first will + * tie up the host and eliminate its longest disk from consideration the + * first pass through. This could cause a big delay in starting that long + * disk, which could drag out the whole night's dumps. + * + * While starting from the top of the dump time distribution solves the + * above problem, this turns out to be a bad idea, because the big dumps + * will almost certainly pack the holding disk completely, leaving no + * room for even one small dump to start. This ends up shutting out the + * small-end dumpers completely (they stay idle). + * + * The introduction of multiple simultaneous dumps to one host alleviates + * the biggest&smallest dumps problem: both can be started at the + * beginning. + */ + diskp_accept = NULL; holdp_accept = NULL; - - if(idle_reason == IDLE_NO_DUMPERS) - idle_reason = NOT_IDLE; + delayed_diskp = NULL; cur_idle = NOT_IDLE; - while(diskp) { + dumporder = getconf_str(CNF_DUMPORDER); + if(strlen(dumporder) > (dumper-dmptable)) { + dumptype = dumporder[dumper-dmptable]; + } + else { + if(dumper-dmptable < 3) + dumptype = 't'; + else + dumptype = 'T'; + } + + for(diskp = rq->head; diskp != NULL; diskp = diskp->next) { assert(diskp->host != NULL && sched(diskp) != NULL); /* round estimate to next multiple of DISK_BLOCK_KB */ sched(diskp)->est_size = am_round(sched(diskp)->est_size, DISK_BLOCK_KB); - if(diskp->host->start_t > now) { + if (diskp->host->start_t > now) { cur_idle = max(cur_idle, IDLE_START_WAIT); - sleep_time.tv_sec = min(diskp->host->start_t - now, - sleep_time.tv_sec); - any_delayed_disk = 1; - } - else if(diskp->start_t > now) { + if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) { + delayed_diskp = diskp; + sleep_time = diskp->host->start_t; + } + } else if(diskp->start_t > now) { cur_idle = max(cur_idle, IDLE_START_WAIT); - sleep_time.tv_sec = min(diskp->start_t - now, - sleep_time.tv_sec); - any_delayed_disk = 1; - } - else if(diskp->host->netif->curusage > 0 && - sched(diskp)->est_kps > free_kps(diskp->host->netif)) + if (delayed_diskp == NULL || sleep_time > diskp->start_t) { + delayed_diskp = diskp; + sleep_time = diskp->start_t; + } + } else if (diskp->host->netif->curusage > 0 && + sched(diskp)->est_kps > free_kps(diskp->host->netif)) { cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH); - else if(sched(diskp)->no_space) + } else if(sched(diskp)->no_space) { cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - else if((holdp = find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL) + } else if ((holdp = + find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL) { cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - else if(diskp->no_hold) { + } else if (diskp->no_hold) { free_assignedhd(holdp); cur_idle = max(cur_idle, IDLE_NO_HOLD); - } else if(client_constrained(diskp)) { + } else if (client_constrained(diskp)) { free_assignedhd(holdp); cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED); } else { @@ -711,17 +714,6 @@ disklist_t *rq; /* disk fits, dump it */ int accept = !diskp_accept; if(!accept) { - char dumptype; - char *dumporder = getconf_str(CNF_DUMPORDER); - if(strlen(dumporder) <= (dumper-dmptable)) { - if(dumper-dmptable < 3) - dumptype = 't'; - else - dumptype = 'T'; - } - else { - dumptype = dumporder[dumper-dmptable]; - } switch(dumptype) { case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); break; @@ -735,7 +727,7 @@ disklist_t *rq; break; case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps); break; - default: log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n", + default: log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n", dumptype); accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); break; @@ -755,60 +747,95 @@ disklist_t *rq; free_assignedhd(holdp); } } - diskp = diskp->next; } diskp = diskp_accept; holdp = holdp_accept; - if(diskp) { - cur_idle = NOT_IDLE; + + idle_reason = max(idle_reason, cur_idle); + + /* + * If we have no disk at this point, and there are disks that + * are delayed, then schedule a time event to call this dumper + * with the disk with the shortest delay. + */ + if (diskp == NULL && delayed_diskp != NULL) { + assert(sleep_time > now); + sleep_time -= now; + dumpers_ev_time = event_register(sleep_time, EV_TIME, + handle_dumpers_time, &runq); + return; + } else if (diskp != NULL) { sched(diskp)->act_size = 0; allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); sched(diskp)->activehd = assign_holdingdisk(holdp, diskp); amfree(holdp); - diskp->host->inprogress += 1; /* host is now busy */ + sched(diskp)->destname = newstralloc(sched(diskp)->destname, + sched(diskp)->holdp[0]->destname); + diskp->host->inprogress++; /* host is now busy */ diskp->inprogress = 1; sched(diskp)->dumper = dumper; - sched(diskp)->timestamp = time((time_t *)0); + sched(diskp)->timestamp = now; + dumper->ev_read = event_register(dumper->fd, EV_READFD, + handle_dumper_result, dumper); dumper->busy = 1; /* dumper is now busy */ dumper->dp = diskp; /* link disk to dumper */ - total++; remove_disk(rq, diskp); /* take it off the run queue */ - dumper_cmd(dumper, FILE_DUMP, diskp); - diskp->host->start_t = time(NULL) + 15; + + sched(diskp)->origsize = -1; + sched(diskp)->dumpsize = -1; + sched(diskp)->dumptime = -1; + sched(diskp)->tapetime = -1; + chunker = dumper->chunker; + chunker->result = LAST_TOK; + dumper->result = LAST_TOK; + startup_chunk_process(chunker,chunker_program); + chunker_cmd(chunker, START, (void *)datestamp); + chunker->dumper = dumper; + chunker_cmd(chunker, PORT_WRITE, diskp); + cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1); + if(cmd != PORT) { + printf("driver: did not get PORT from %s for %s:%s\n", + chunker->name, diskp->host->hostname, diskp->name); + fflush(stdout); + return ; /* fatal problem */ + } + chunker->ev_read = event_register(chunker->fd, EV_READFD, + handle_chunker_result, chunker); + dumper->output_port = atoi(result_argv[2]); + + dumper_cmd(dumper, PORT_DUMP, diskp); + + diskp->host->start_t = now + 15; + } else if (/* cur_idle != NOT_IDLE && */ + (num_busy_dumpers() > 0 || taper_busy)) { + /* + * We are constrained. + */ } - idle_reason = max(idle_reason, cur_idle); } - return total; -} - -int sort_by_priority_reversed(a, b) -disk_t *a, *b; -{ - if(sched(b)->priority - sched(a)->priority != 0) - return sched(b)->priority - sched(a)->priority; - else - return sort_by_time(a, b); } -int sort_by_time(a, b) -disk_t *a, *b; +/* + * This gets called when a dumper is delayed for some reason. It may + * be because a disk has a delayed start, or amanda is constrained + * by network or disk limits. + */ +static void +handle_dumpers_time(cookie) + void *cookie; { - long diff; - - if ((diff = sched(a)->est_time - sched(b)->est_time) < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } + disklist_t *runq = cookie; + event_release(dumpers_ev_time); + dumpers_ev_time = NULL; + start_some_dumps(runq); } -void dump_schedule(qp, str) -disklist_t *qp; -char *str; +static void +dump_schedule(qp, str) + disklist_t *qp; + char *str; { disk_t *dp; @@ -822,14 +849,19 @@ char *str; printf("--------\n"); } - -void start_degraded_mode(queuep) -disklist_t *queuep; +static void +start_degraded_mode(queuep) + disklist_t *queuep; { disk_t *dp; disklist_t newq; unsigned long est_full_size; + if (taper_ev_read != NULL) { + event_release(taper_ev_read); + taper_ev_read = NULL; + } + newq.head = newq.tail = 0; dump_schedule(queuep, "before start degraded mode"); @@ -840,11 +872,11 @@ disklist_t *queuep; if(sched(dp)->level != 0) /* go ahead and do the disk as-is */ - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); else { if (reserved_space + est_full_size + sched(dp)->est_size <= total_disksize) { - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); est_full_size += sched(dp)->est_size; } else if(sched(dp)->degr_level != -1) { @@ -853,12 +885,12 @@ disklist_t *queuep; sched(dp)->est_size = sched(dp)->degr_size; sched(dp)->est_time = sched(dp)->degr_time; sched(dp)->est_kps = sched(dp)->degr_kps; - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); } else { - log_add(L_FAIL, "%s %s %s %d [can't switch to incremental dump]", - dp->host->hostname, dp->name, - sched(dp)->datestamp, sched(dp)->level); + log_add(L_FAIL,"%s %s %s %d [can't switch to incremental dump]", + dp->host->hostname, dp->name, sched(dp)->datestamp, + sched(dp)->level); } } } @@ -869,12 +901,13 @@ disklist_t *queuep; dump_schedule(queuep, "after start degraded mode"); } -void continue_dumps() + +static void continue_port_dumps() { -disk_t *dp, *ndp; -assignedhd_t **h; -int active_dumpers=0, busy_dumpers=0, i; -dumper_t *dumper; + disk_t *dp, *ndp; + assignedhd_t **h; + int active_dumpers=0, busy_dumpers=0, i; + dumper_t *dumper; /* First we try to grant diskspace to some dumps waiting for it. */ for( dp = roomq.head; dp; dp = ndp ) { @@ -882,13 +915,14 @@ dumper_t *dumper; /* find last holdingdisk used by this dump */ for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ ); /* find more space */ - h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, &active_dumpers, h[i] ); + h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, + &active_dumpers, h[i] ); if( h ) { for(dumper = dmptable; dumper < dmptable + inparallel && dumper->dp != dp; dumper++); assert( dumper < dmptable + inparallel ); sched(dp)->activehd = assign_holdingdisk( h, dp ); - dumper_cmd( dumper, CONTINUE, dp ); + chunker_cmd( dumper->chunker, CONTINUE, dp ); amfree(h); remove_disk( &roomq, dp ); } @@ -909,17 +943,18 @@ dumper_t *dumper; * it will be dumped directly to tape. Actually, case c is a special * manifestation of case b) where only one dumper is busy. */ - for( dp=NULL, dumper = dmptable; dumper < dmptable + inparallel; dumper++) { + for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) { if( dumper->busy ) { busy_dumpers++; if( !find_disk(&roomq, dumper->dp) ) { active_dumpers++; - } else if( !dp || sched(dp)->est_size > sched(dumper->dp)->est_size ) { + } else if( !dp || + sched(dp)->est_size > sched(dumper->dp)->est_size ) { dp = dumper->dp; } } } - if( !active_dumpers && busy_dumpers > 0 && + if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) && ((!taper_busy && empty(tapeq)) || degraded_mode) && pending_aborts == 0 ) { /* not case a */ if( busy_dumpers == 1 ) { /* case c */ @@ -930,411 +965,587 @@ dumper_t *dumper; * We abort that dump, hopefully not wasting too much time retrying it. */ remove_disk( &roomq, dp ); + chunker_cmd( sched(dp)->dumper->chunker, ABORT, NULL ); dumper_cmd( sched(dp)->dumper, ABORT, NULL ); pending_aborts++; } } -void handle_taper_result() + +static void +handle_taper_result(void *cookie) { disk_t *dp; int filenum; cmd_t cmd; int result_argc; char *result_argv[MAX_ARGS+1]; + int avail_tapes = 0; + + assert(cookie == NULL); + + do { + + short_dump_state(); + + cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1); + + switch(cmd) { + + case PARTIAL: + case DONE: /* DONE