X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=server-src%2Fdriver.c;h=9a0bb27bceba68fa9c80773cb6c6e0cbf4587458;hb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;hp=d7a6fea7a83d9e10bfca84e25697e55ad18daa22;hpb=2627875b7d18858bc1f9f7652811e4d8c15a23eb;p=debian%2Famanda diff --git a/server-src/driver.c b/server-src/driver.c index d7a6fea..9a0bb27 100644 --- a/server-src/driver.c +++ b/server-src/driver.c @@ -35,6 +35,7 @@ */ #include "amanda.h" +#include "find.h" #include "clock.h" #include "conffile.h" #include "diskfile.h" @@ -43,7 +44,6 @@ #include "infofile.h" #include "logfile.h" #include "fsusage.h" -#include "version.h" #include "driverio.h" #include "server_util.h" #include "timestamp.h" @@ -67,7 +67,6 @@ static disklist_t tapeq; // dle on holding disk waiting to be written // to tape static disklist_t roomq; // dle waiting for more space on holding disk static int pending_aborts; -static disk_t *taper_disk; static int degraded_mode; static off_t reserved_space; static off_t total_disksize; @@ -76,9 +75,9 @@ static char *chunker_program; static int inparallel; static int nodump = 0; static off_t tape_length = (off_t)0; -static off_t tape_left = (off_t)0; static int current_tape = 0; static int conf_taperalgo; +static int conf_taper_parallel_write; static int conf_runtapes; static time_t sleep_time; static int idle_reason; @@ -89,6 +88,7 @@ static int need_degraded=0; static holdalloc_t *holdalloc; static int num_holdalloc; static event_handle_t *dumpers_ev_time = NULL; +static event_handle_t *flush_ev_read = NULL; static event_handle_t *schedule_ev_read = NULL; static int conf_flush_threshold_dumped; static int conf_flush_threshold_scheduled; @@ -100,6 +100,10 @@ static int schedule_done; // 1 if we don't wait for a // schedule from the planner static int force_flush; // All dump are terminated, we // must now respect taper_flush +static int taper_nb_scan_volume = 0; +static int nb_sent_new_tape = 0; +static int taper_started = 0; +static taper_t *last_started_taper; static int wait_children(int count); static void wait_for_children(void); @@ -111,7 +115,6 @@ static assignedhd_t **build_diskspace(char *destname); static int client_constrained(disk_t *dp); static void deallocate_bandwidth(netif_t *ip, unsigned long kps); static void dump_schedule(disklist_t *qp, char *str); -static void dump_to_tape(disk_t *dp); static assignedhd_t **find_diskspace(off_t size, int *cur_idle, assignedhd_t *preferred); static unsigned long free_kps(netif_t *ip); @@ -125,26 +128,31 @@ static void handle_dumpers_time(void *); static void handle_taper_result(void *); static void holdingdisk_state(char *time_str); -static dumper_t *idle_dumper(void); +static taper_t *idle_taper(void); +static taper_t *taper_from_name(char *name); static void interface_state(char *time_str); static int queue_length(disklist_t q); -static disklist_t read_flush(void); +static void read_flush(void *cookie); static void read_schedule(void *cookie); static void short_dump_state(void); static void startaflush(void); static void start_degraded_mode(disklist_t *queuep); static void start_some_dumps(disklist_t *rq); static void continue_port_dumps(void); -static void update_failed_dump_to_tape(disk_t *); +static void update_failed_dump(disk_t *); +static int all_taper_idle(void); typedef enum { - TAPE_ACTION_NO_ACTION = 0, - TAPE_ACTION_NEW_TAPE = (1 << 0), - TAPE_ACTION_NO_NEW_TAPE = (1 << 1), - TAPE_ACTION_START_A_FLUSH = (1 << 2) + TAPE_ACTION_NO_ACTION = 0, + TAPE_ACTION_SCAN = (1 << 0), + TAPE_ACTION_NEW_TAPE = (1 << 1), + TAPE_ACTION_NO_NEW_TAPE = (1 << 2), + TAPE_ACTION_START_A_FLUSH = (1 << 3), + TAPE_ACTION_START_A_FLUSH_FIT = (1 << 4), + TAPE_ACTION_MOVE = (1 << 5) } TapeAction; -static TapeAction tape_action(char **why_no_new_tape); +static TapeAction tape_action(taper_t *taper, char **why_no_new_tape); static const char *idle_strings[] = { #define NOT_IDLE 0 @@ -175,10 +183,9 @@ main( char *newdir = NULL; struct fs_usage fsusage; holdingdisk_t *hdp; + identlist_t il; unsigned long reserve = 100; char *conf_diskfile; - cmd_t cmd; - int result_argc; char **result_argv = NULL; char *taper_program; char *conf_tapetype; @@ -186,9 +193,12 @@ main( char *line; char hostname[1025]; intmax_t kb_avail; - config_overwrites_t *cfg_ovr = NULL; + config_overrides_t *cfg_ovr = NULL; char *cfg_opt = NULL; holdalloc_t *ha, *ha_last; + find_result_t *holding_files; + disklist_t holding_disklist = { NULL, NULL }; + int no_taper = FALSE; /* * Configure program for internationalization: @@ -213,20 +223,21 @@ main( /* Don't die when child closes pipe */ signal(SIGPIPE, SIG_IGN); - erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE); - set_logerror(logerror); + add_amanda_log_handler(amanda_log_stderr); + add_amanda_log_handler(amanda_log_trace_log); startclock(); - cfg_ovr = extract_commandline_config_overwrites(&argc, &argv); + cfg_ovr = extract_commandline_config_overrides(&argc, &argv); if (argc > 1) cfg_opt = argv[1]; + set_config_overrides(cfg_ovr); config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD, cfg_opt); - apply_config_overwrites(cfg_ovr); conf_diskfile = config_dir_relative(getconf_str(CNF_DISKFILE)); read_diskfile(conf_diskfile, &origq); + disable_skip_disk(&origq); amfree(conf_diskfile); if (config_errors(NULL) >= CFGERR_WARNINGS) { @@ -238,20 +249,35 @@ main( log_add(L_INFO, "%s pid %ld", get_pname(), (long)getpid()); g_printf(_("%s: pid %ld executable %s version %s\n"), - get_pname(), (long) getpid(), argv[0], version()); + get_pname(), (long) getpid(), argv[0], VERSION); if(argc > 2) { - if(strncmp(argv[2], "nodump", 6) == 0) { + if(strcmp(argv[2], "nodump") == 0) { nodump = 1; + argv++; + argc--; } } + if (argc > 2) { + if (strcmp(argv[2], "--no-taper") == 0) { + no_taper = TRUE; + argv++; + argc--; + } + } + safe_cd(); /* do this *after* config_init */ check_running_as(RUNNING_AS_DUMPUSER); dbrename(get_config_name(), DBG_SUBDIR_SERVER); + /* load DLEs from the holding disk, in case there's anything to flush there */ + search_holding_disk(&holding_files, &holding_disklist); + /* note that the dumps are added to the global disklist, so we need not consult + * holding_files or holding_disklist after this */ + amfree(driver_timestamp); /* read timestamp from stdin */ while ((line = agets(stdin)) != NULL) { @@ -292,15 +318,17 @@ main( hd_driver_timestamp = stralloc(driver_timestamp); } - taper_program = vstralloc(amlibexecdir, "/", "taper", versionsuffix(), NULL); - dumper_program = vstralloc(amlibexecdir, "/", "dumper", versionsuffix(), - NULL); - chunker_program = vstralloc(amlibexecdir, "/", "chunker", versionsuffix(), - NULL); + taper_program = vstralloc(amlibexecdir, "/", "taper", NULL); + dumper_program = vstralloc(amlibexecdir, "/", "dumper", NULL); + chunker_program = vstralloc(amlibexecdir, "/", "chunker", NULL); conf_taperalgo = getconf_taperalgo(CNF_TAPERALGO); + conf_taper_parallel_write = getconf_int(CNF_TAPER_PARALLEL_WRITE); conf_tapetype = getconf_str(CNF_TAPETYPE); conf_runtapes = getconf_int(CNF_RUNTAPES); + if (conf_taper_parallel_write > conf_runtapes) { + conf_taper_parallel_write = conf_runtapes; + } tape = lookup_tapetype(conf_tapetype); tape_length = tapetype_get_length(tape); g_printf("driver: tape size %lld\n", (long long)tape_length); @@ -312,8 +340,8 @@ main( flush_threshold_scheduled = (conf_flush_threshold_scheduled * tape_length) / 100; taperflush = (conf_taperflush *tape_length) / 100; - driver_debug(1, _("flush_threshold_dumped: %lld\n"), (long long)flush_threshold_dumped); - driver_debug(1, _("flush_threshold_scheduled: %lld\n"), (long long)flush_threshold_scheduled); + driver_debug(1, _("flush-threshold-dumped: %lld\n"), (long long)flush_threshold_dumped); + driver_debug(1, _("flush-threshold-scheduled: %lld\n"), (long long)flush_threshold_scheduled); driver_debug(1, _("taperflush: %lld\n"), (long long)taperflush); /* set up any configuration-dependent variables */ @@ -325,7 +353,10 @@ main( total_disksize = (off_t)0; ha_last = NULL; num_holdalloc = 0; - for(hdp = getconf_holdingdisks(), dsk = 0; hdp != NULL; hdp = holdingdisk_next(hdp), dsk++) { + for (il = getconf_identlist(CNF_HOLDINGDISK), dsk = 0; + il != NULL; + il = il->next, dsk++) { + hdp = lookup_holdingdisk(il->data); ha = alloc(SIZEOF(holdalloc_t)); num_holdalloc++; @@ -406,10 +437,7 @@ main( /* taper takes a while to get going, so start it up right away */ init_driverio(); - if(conf_runtapes > 0) { - startup_tape_process(taper_program); - taper_cmd(START_TAPER, driver_timestamp, NULL, 0, NULL); - } + startup_tape_process(taper_program, conf_taper_parallel_write, no_taper); /* fire up the dumpers now while we are waiting */ if(!nodump) startup_dump_processes(dumper_program, inparallel, driver_timestamp); @@ -427,10 +455,26 @@ main( directq.head = NULL; directq.tail = NULL; waitq = origq; - taper_state = TAPER_STATE_DEFAULT; - tapeq = read_flush(); + tapeq.head = NULL; + tapeq.tail = NULL; + roomq.head = NULL; + roomq.tail = NULL; + taper_nb_wait_reply = 0; + + need_degraded = 0; + if (no_taper || conf_runtapes <= 0) { + taper_started = 1; /* we'll pretend the taper started and failed immediately */ + need_degraded = 1; + } else { + tapetable[0].state = TAPER_STATE_INIT; + taper_nb_wait_reply++; + taper_nb_scan_volume++; + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); + taper_cmd(START_TAPER, NULL, tapetable[0].name, 0, driver_timestamp); + } - roomq.head = roomq.tail = NULL; + flush_ev_read = event_register((event_id_t)0, EV_READFD, read_flush, NULL); log_add(L_STATS, _("startup time %s"), walltime_str(curclock())); @@ -441,33 +485,9 @@ main( getconf_str(CNF_DUMPORDER)); fflush(stdout); - /* ok, planner is done, now lets see if the tape is ready */ - - if (conf_runtapes > 0) { - cmd = getresult(taper, 1, &result_argc, &result_argv); - if (cmd != TAPER_OK) { - /* no tape, go into degraded mode: dump to holding disk */ - need_degraded = 1; - } - } else { - need_degraded = 1; - } - - tape_left = tape_length; - taper_busy = 0; - amfree(taper_input_error); - amfree(taper_tape_error); - taper_disk = NULL; - taper_ev_read = NULL; - schedule_done = nodump; force_flush = 0; - if(!need_degraded) startaflush(); - - if(!nodump) - schedule_ev_read = event_register((event_id_t)0, EV_READFD, read_schedule, NULL); - short_dump_state(); event_loop(0); @@ -480,7 +500,7 @@ main( } /* handle any remaining dumps by dumping directly to tape, if possible */ - while(!empty(directq) && taper > 0) { + while(!empty(directq) && taper_fd > 0) { time_t sleep_time = 100000000; disk_t *sleep_diskp = NULL; time_t now = time(0); @@ -514,10 +534,12 @@ main( amfree(qname); } else if (!degraded_mode) { - taper_state |= TAPER_STATE_DUMP_TO_TAPE; - dump_to_tape(diskp); - event_loop(0); - taper_state &= ~TAPER_STATE_DUMP_TO_TAPE; + char *qname = quote_string(diskp->name); + log_add(L_FAIL, "%s %s %s %d [%s]", + diskp->host->hostname, qname, sched(diskp)->datestamp, + sched(diskp)->level, + _("can't dump in degraded mode")); + amfree(qname); } else { char *qname = quote_string(diskp->name); @@ -550,7 +572,7 @@ main( } } - if(taper >= 0) { + if(taper_fd >= 0) { taper_cmd(QUIT, NULL, NULL, 0, NULL); } @@ -696,7 +718,7 @@ wait_for_children(void) } } - if(taper_pid > 1 && taper > 0) { + if(taper_pid > 1 && taper_fd > 0) { taper_cmd(QUIT, NULL, NULL, 0, NULL); } @@ -713,43 +735,135 @@ wait_for_children(void) } +static void startaflush_tape(taper_t *taper); + static void startaflush(void) +{ + taper_t *taper; + + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_WAIT_FOR_TAPE) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_TAPE_REQUESTED) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_INIT) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_IDLE) { + startaflush_tape(taper); + } + } +} + +static void +startaflush_tape( + taper_t *taper) { disk_t *dp = NULL; disk_t *fit = NULL; char *datestamp; - int extra_tapes = 0; + off_t extra_tapes_size = 0; + off_t taper_left; char *qname; TapeAction result_tape_action; - char *why_no_new_tape; - - result_tape_action = tape_action(&why_no_new_tape); - - if (result_tape_action & TAPE_ACTION_NEW_TAPE) { - taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; - taper_cmd(NEW_TAPE, NULL, NULL, 0, NULL); + char *why_no_new_tape = NULL; + taper_t *taper1; + + result_tape_action = tape_action(taper, &why_no_new_tape); + + if (result_tape_action & TAPE_ACTION_SCAN) { + taper->state &= ~TAPER_STATE_TAPE_REQUESTED; + taper->state |= TAPER_STATE_WAIT_FOR_TAPE; + taper_nb_scan_volume++; + taper_cmd(START_SCAN, taper->disk, NULL, 0, NULL); + } else if (result_tape_action & TAPE_ACTION_NEW_TAPE) { + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper->state |= TAPER_STATE_WAIT_NEW_TAPE; + nb_sent_new_tape++; + taper_cmd(NEW_TAPE, taper->disk, NULL, 0, NULL); } else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) { - taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; - taper_cmd(NO_NEW_TAPE, why_no_new_tape, NULL, 0, NULL); + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper_cmd(NO_NEW_TAPE, taper->disk, why_no_new_tape, 0, NULL); + taper->state |= TAPER_STATE_DONE; start_degraded_mode(&runq); + } else if (result_tape_action & TAPE_ACTION_MOVE) { + taper_t *taper1 = idle_taper(); + if (taper1) { + taper->state &= ~TAPER_STATE_TAPE_REQUESTED; + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper_cmd(TAKE_SCRIBE_FROM, taper->disk, taper1->name, 0 , NULL); + taper1->state = TAPER_STATE_DEFAULT; + taper->state |= TAPER_STATE_TAPE_STARTED; + taper->left = taper1->left; + if (last_started_taper == taper1) { + last_started_taper = taper; + } + } } - if (!degraded_mode && !taper_busy && !empty(tapeq) && - (result_tape_action & TAPE_ACTION_START_A_FLUSH)) { - + if (!degraded_mode && + taper->state & TAPER_STATE_IDLE && + !empty(tapeq) && + (result_tape_action & TAPE_ACTION_START_A_FLUSH || + result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT)) { + + int taperalgo = conf_taperalgo; + if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + if (taperalgo == ALGO_FIRST) + taperalgo = ALGO_FIRSTFIT; + else if (taperalgo == ALGO_LARGEST) + taperalgo = ALGO_LARGESTFIT; + else if (taperalgo == ALGO_SMALLEST) + taperalgo = ALGO_SMALLESTFIT; + else if (taperalgo == ALGO_LAST) + taperalgo = ALGO_LASTFIT; + } + + extra_tapes_size = tape_length * (off_t)(conf_runtapes - current_tape); + for (taper1 = tapetable; taper1 < tapetable + conf_taper_parallel_write; + taper1++) { + if (taper1->state & TAPER_STATE_TAPE_STARTED) { + extra_tapes_size += taper1->left; + } + dp = taper1->disk; + if (dp) { + extra_tapes_size -= (sched(dp)->act_size - taper1->written); + } + } + + if (taper->state & TAPER_STATE_TAPE_STARTED) { + taper_left = taper->left; + } else { + taper_left = tape_length; + } + dp = NULL; datestamp = sched(tapeq.head)->datestamp; - switch(conf_taperalgo) { + switch(taperalgo) { case ALGO_FIRST: dp = dequeue_disk(&tapeq); break; case ALGO_FIRSTFIT: fit = tapeq.head; while (fit != NULL) { - extra_tapes = (fit->tape_splitsize > (off_t)0) ? - conf_runtapes - current_tape : 0; - if(sched(fit)->act_size <= (tape_left + - tape_length * (off_t)extra_tapes) && + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; fit = NULL; @@ -774,10 +888,8 @@ startaflush(void) case ALGO_LARGESTFIT: fit = tapeq.head; while (fit != NULL) { - extra_tapes = (fit->tape_splitsize > (off_t)0) ? - conf_runtapes - current_tape : 0; if(sched(fit)->act_size <= - (tape_left + tape_length * (off_t)extra_tapes) && + (fit->splitsize ? extra_tapes_size : taper_left) && (!dp || sched(fit)->act_size > sched(dp)->act_size) && strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; @@ -787,62 +899,75 @@ startaflush(void) if(dp) remove_disk(&tapeq, dp); break; case ALGO_SMALLEST: + fit = dp = tapeq.head; + while (fit != NULL) { + if (sched(fit)->act_size < sched(dp)->act_size && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->next; + } + if(dp) remove_disk(&tapeq, dp); + break; + case ALGO_SMALLESTFIT: + fit = dp = tapeq.head; + while (fit != NULL) { + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && + (!dp || sched(fit)->act_size < sched(dp)->act_size) && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->next; + } + if(dp) remove_disk(&tapeq, dp); break; case ALGO_LAST: dp = tapeq.tail; remove_disk(&tapeq, dp); break; - } - if(!dp) { /* ALGO_SMALLEST, or default if nothing fit. */ - if(conf_taperalgo != ALGO_SMALLEST) { - g_fprintf(stderr, - _("driver: startaflush: Using SMALLEST because nothing fit\n")); - } - fit = dp = tapeq.head; - while (fit != NULL) { - if(sched(fit)->act_size < sched(dp)->act_size && - strcmp(sched(fit)->datestamp, datestamp) <= 0) { - dp = fit; + case ALGO_LASTFIT: + fit = tapeq.tail; + while (fit != NULL) { + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && + (!dp || sched(fit)->act_size < sched(dp)->act_size) && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->prev; } - fit = fit->next; - } - if(dp) remove_disk(&tapeq, dp); - } - if(taper_ev_read == NULL) { - taper_ev_read = event_register((event_id_t)taper, EV_READFD, - handle_taper_result, NULL); + if(dp) remove_disk(&tapeq, dp); + break; } if (dp) { - taper_disk = dp; - taper_busy = 1; - amfree(taper_input_error); - amfree(taper_tape_error); - taper_result = LAST_TOK; - taper_sendresult = 0; - taper_first_label = NULL; - taper_written = 0; - taper_state &= ~TAPER_STATE_DUMP_TO_TAPE; - taper_dumper = NULL; + taper->disk = dp; + taper->dumper = NULL; + amfree(taper->input_error); + amfree(taper->tape_error); + taper->result = LAST_TOK; + taper->sendresult = 0; + amfree(taper->first_label); + taper->written = 0; + taper->state &= ~TAPER_STATE_IDLE; + taper->state |= TAPER_STATE_FILE_TO_TAPE; + taper->dumper = NULL; qname = quote_string(dp->name); + if (taper_nb_wait_reply == 0) { + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); + } + taper_nb_wait_reply++; + sched(dp)->taper = taper; taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, sched(dp)->datestamp); g_fprintf(stderr,_("driver: startaflush: %s %s %s %lld %lld\n"), - taperalgo2str(conf_taperalgo), dp->host->hostname, qname, - (long long)sched(taper_disk)->act_size, - (long long)tape_left); - if(sched(dp)->act_size <= tape_left) - tape_left -= sched(dp)->act_size; - else - tape_left = (off_t)0; + taperalgo2str(taperalgo), dp->host->hostname, qname, + (long long)sched(taper->disk)->act_size, + (long long)taper->left); amfree(qname); - } else { - error(_("FATAL: Taper marked busy and no work found.")); - /*NOTREACHED*/ } short_dump_state(); - } else if(!taper_busy && taper_ev_read != NULL) { - event_release(taper_ev_read); - taper_ev_read = NULL; } } @@ -872,23 +997,118 @@ client_constrained( return 0; } +static void +allow_dump_dle( + disk_t *diskp, + taper_t *taper, + char dumptype, + disklist_t *rq, + const time_t now, + int dumper_to_holding, + int *cur_idle, + disk_t **delayed_diskp, + disk_t **diskp_accept, + assignedhd_t ***holdp_accept, + off_t extra_tapes_size) +{ + assignedhd_t **holdp=NULL; + + if (diskp->host->start_t > now) { + *cur_idle = max(*cur_idle, IDLE_START_WAIT); + if (*delayed_diskp == NULL || sleep_time > diskp->host->start_t) { + *delayed_diskp = diskp; + sleep_time = diskp->host->start_t; + } + } else if(diskp->start_t > now) { + *cur_idle = max(*cur_idle, IDLE_START_WAIT); + if (*delayed_diskp == NULL || sleep_time > diskp->start_t) { + *delayed_diskp = diskp; + sleep_time = diskp->start_t; + } + } else if (diskp->host->netif->curusage > 0 && + sched(diskp)->est_kps > free_kps(diskp->host->netif)) { + *cur_idle = max(*cur_idle, IDLE_NO_BANDWIDTH); + } else if (!taper && sched(diskp)->no_space) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + } else if (!taper && diskp->to_holdingdisk == HOLD_NEVER) { + *cur_idle = max(*cur_idle, IDLE_NO_HOLD); + } else if (extra_tapes_size && sched(diskp)->est_size > extra_tapes_size) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + /* no tape space */ + } else if (!taper && (holdp = + find_diskspace(sched(diskp)->est_size, cur_idle, NULL)) == NULL) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + if (empty(tapeq) && dumper_to_holding == 0 && rq != &directq) { + remove_disk(rq, diskp); + if (diskp->to_holdingdisk != HOLD_REQUIRED) { + enqueue_disk(&directq, diskp); + diskp->to_holdingdisk = HOLD_NEVER; + } + } + } else if (client_constrained(diskp)) { + free_assignedhd(holdp); + *cur_idle = max(*cur_idle, IDLE_CLIENT_CONSTRAINED); + } else { + + /* disk fits, dump it */ + int accept = !*diskp_accept; + if(!accept) { + switch(dumptype) { + case 's': accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size); + break; + case 'S': accept = (sched(diskp)->est_size > sched(*diskp_accept)->est_size); + break; + case 't': accept = (sched(diskp)->est_time < sched(*diskp_accept)->est_time); + break; + case 'T': accept = (sched(diskp)->est_time > sched(*diskp_accept)->est_time); + break; + case 'b': accept = (sched(diskp)->est_kps < sched(*diskp_accept)->est_kps); + break; + case 'B': accept = (sched(diskp)->est_kps > sched(*diskp_accept)->est_kps); + break; + default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"), + dumptype); + accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size); + break; + } + } + if(accept) { + if( !*diskp_accept || !degraded_mode || diskp->priority >= (*diskp_accept)->priority) { + if(*holdp_accept) free_assignedhd(*holdp_accept); + *diskp_accept = diskp; + *holdp_accept = holdp; + } + else { + free_assignedhd(holdp); + } + } + else { + free_assignedhd(holdp); + } + } +} + static void start_some_dumps( - disklist_t * rq) + disklist_t *rq) { + const time_t now = time(NULL); int cur_idle; disk_t *diskp, *delayed_diskp, *diskp_accept; disk_t *dp; assignedhd_t **holdp=NULL, **holdp_accept; - const time_t now = time(NULL); cmd_t cmd; int result_argc; char **result_argv; chunker_t *chunker; dumper_t *dumper; + taper_t *taper; char dumptype; char *dumporder; - int busy_dumpers = 0; + int dumper_to_holding = 0; + + /* don't start any actual dumps until the taper is started */ + if (!taper_started) return; idle_reason = IDLE_NO_DUMPERS; sleep_time = 0; @@ -899,11 +1119,10 @@ start_some_dumps( } for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) { - if( dumper->busy ) { - busy_dumpers++; + if (dumper->busy && dumper->dp->to_holdingdisk != HOLD_NEVER) { + dumper_to_holding++; } } - for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) { if( dumper->busy || dumper->down) { @@ -951,82 +1170,72 @@ start_some_dumps( dumptype = 'T'; } - for(diskp = rq->head; diskp != NULL; diskp = diskp->next) { - assert(diskp->host != NULL && sched(diskp) != NULL); - - if (diskp->host->start_t > now) { - cur_idle = max(cur_idle, IDLE_START_WAIT); - if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) { - delayed_diskp = diskp; - sleep_time = diskp->host->start_t; - } - } else if(diskp->start_t > now) { - cur_idle = max(cur_idle, IDLE_START_WAIT); - if (delayed_diskp == NULL || sleep_time > diskp->start_t) { - delayed_diskp = diskp; - sleep_time = diskp->start_t; - } - } else if (diskp->host->netif->curusage > 0 && - sched(diskp)->est_kps > free_kps(diskp->host->netif)) { - cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH); - } else if(sched(diskp)->no_space) { - cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - } else if (diskp->to_holdingdisk == HOLD_NEVER) { - cur_idle = max(cur_idle, IDLE_NO_HOLD); - } else if ((holdp = - find_diskspace(sched(diskp)->est_size, &cur_idle, NULL)) == NULL) { - cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - if (empty(tapeq) && busy_dumpers == 0) { - remove_disk(rq, diskp); - enqueue_disk(&directq, diskp); - } - } else if (client_constrained(diskp)) { - free_assignedhd(holdp); - cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED); - } else { - - /* disk fits, dump it */ - int accept = !diskp_accept; - if(!accept) { - switch(dumptype) { - case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); - break; - case 'S': accept = (sched(diskp)->est_size > sched(diskp_accept)->est_size); - break; - case 't': accept = (sched(diskp)->est_time < sched(diskp_accept)->est_time); - break; - case 'T': accept = (sched(diskp)->est_time > sched(diskp_accept)->est_time); - break; - case 'b': accept = (sched(diskp)->est_kps < sched(diskp_accept)->est_kps); - break; - case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps); - break; - default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"), - dumptype); - accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); - break; + diskp = NULL; + taper = NULL; + if (!empty(directq)) { + taper = idle_taper(); + if (taper) { + TapeAction result_tape_action; + char *why_no_new_tape = NULL; + result_tape_action = tape_action(taper, &why_no_new_tape); + if (result_tape_action & TAPE_ACTION_START_A_FLUSH || + result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + off_t extra_tapes_size = 0; + taper_t *taper1; + + if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + extra_tapes_size = tape_length * + (off_t)(conf_runtapes - current_tape); + for (taper1 = tapetable; + taper1 < tapetable + conf_taper_parallel_write; + taper1++) { + if (taper1->state & TAPER_STATE_TAPE_STARTED) { + extra_tapes_size += taper1->left; + } + dp = taper1->disk; + if (dp) { + extra_tapes_size -= (sched(dp)->est_size - + taper1->written); + } + } } - } - if(accept) { - if( !diskp_accept || !degraded_mode || diskp->priority >= diskp_accept->priority) { - if(holdp_accept) free_assignedhd(holdp_accept); - diskp_accept = diskp; - holdp_accept = holdp; + + for (diskp = directq.head; diskp != NULL; + diskp = diskp->next) { + allow_dump_dle(diskp, taper, dumptype, &directq, now, + dumper_to_holding, &cur_idle, + &delayed_diskp, &diskp_accept, + &holdp_accept, extra_tapes_size); } - else { - free_assignedhd(holdp); + if (diskp_accept) { + diskp = diskp_accept; + holdp = holdp_accept; + } else { + taper = NULL; } - } - else { - free_assignedhd(holdp); + } else { + taper = NULL; } } } - diskp = diskp_accept; - holdp = holdp_accept; + if (diskp == NULL) { + for(diskp = rq->head; diskp != NULL; diskp = diskp->next) { + assert(diskp->host != NULL && sched(diskp) != NULL); + + allow_dump_dle(diskp, NULL, dumptype, rq, now, + dumper_to_holding, &cur_idle, &delayed_diskp, + &diskp_accept, &holdp_accept, 0); + } + diskp = diskp_accept; + holdp = holdp_accept; + } idle_reason = max(idle_reason, cur_idle); + if (diskp == NULL && idle_reason == IDLE_NO_DISKSPACE) { + /* continue flush waiting for new tape */ + startaflush(); + } /* * If we have no disk at this point, and there are disks that @@ -1039,7 +1248,7 @@ start_some_dumps( dumpers_ev_time = event_register((event_id_t)sleep_time, EV_TIME, handle_dumpers_time, &runq); return; - } else if (diskp != NULL) { + } else if (diskp != NULL && taper == NULL) { sched(diskp)->act_size = (off_t)0; allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); sched(diskp)->activehd = assign_holdingdisk(holdp, diskp); @@ -1050,6 +1259,7 @@ start_some_dumps( diskp->inprogress = 1; sched(diskp)->dumper = dumper; sched(diskp)->timestamp = now; + amfree(diskp->dataport_list); dumper->busy = 1; /* dumper is now busy */ dumper->dp = diskp; /* link disk to dumper */ @@ -1059,7 +1269,7 @@ start_some_dumps( sched(diskp)->dumpsize = (off_t)-1; sched(diskp)->dumptime = (time_t)0; sched(diskp)->tapetime = (time_t)0; - chunker = dumper->chunker; + chunker = dumper->chunker = &chktable[dumper - dmptable]; chunker->result = LAST_TOK; dumper->result = LAST_TOK; startup_chunk_process(chunker,chunker_program); @@ -1100,6 +1310,8 @@ start_some_dumps( chunker->ev_read = event_register((event_id_t)chunker->fd, EV_READFD, handle_chunker_result, chunker); dumper->output_port = atoi(result_argv[1]); + amfree(diskp->dataport_list); + diskp->dataport_list = stralloc(result_argv[2]); if (diskp->host->pre_script == 0) { for (dp=diskp->host->disks; dp != NULL; dp = dp->hostnext) { @@ -1118,6 +1330,46 @@ start_some_dumps( if (result_argv) g_strfreev(result_argv); short_dump_state(); + } else if (diskp != NULL && taper != NULL) { /* dump to tape */ + sched(diskp)->act_size = (off_t)0; + allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); + diskp->host->inprogress++; /* host is now busy */ + diskp->inprogress = 1; + sched(diskp)->dumper = dumper; + sched(diskp)->taper = taper; + sched(diskp)->timestamp = now; + dumper->chunker = NULL; + amfree(diskp->dataport_list); + + dumper->busy = 1; /* dumper is now busy */ + dumper->dp = diskp; /* link disk to dumper */ + remove_disk(&directq, diskp); /* take it off the direct queue */ + + sched(diskp)->origsize = (off_t)-1; + sched(diskp)->dumpsize = (off_t)-1; + sched(diskp)->dumptime = (time_t)0; + sched(diskp)->tapetime = (time_t)0; + dumper->result = LAST_TOK; + taper->result = LAST_TOK; + taper->input_error = NULL; + taper->tape_error = NULL; + taper->disk = diskp; + taper->first_label = NULL; + taper->written = 0; + taper->dumper = dumper; + taper->state |= TAPER_STATE_DUMP_TO_TAPE; + taper->state &= ~TAPER_STATE_IDLE; + if (taper_nb_wait_reply == 0) { + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); + } + + taper_nb_wait_reply++; + taper_cmd(PORT_WRITE, diskp, NULL, sched(diskp)->level, + sched(diskp)->datestamp); + diskp->host->start_t = now + 15; + + short_dump_state(); } } } @@ -1247,15 +1499,11 @@ continue_port_dumps(void) * a) diskspace has been allocated for other dumps which are * still running or already being written to tape * b) all other dumps have been suspended due to lack of diskspace - * c) this dump doesn't fit on all the holding disks * Case a) is not a problem. We just wait for the diskspace to * be freed by moving the current disk to a queue. * If case b) occurs, we have a deadlock situation. We select * a dump from the queue to be aborted and abort it. It will - * be retried later dumping to disk. - * If case c) is detected, the dump is aborted. Next time - * it will be dumped directly to tape. Actually, case c is a special - * manifestation of case b) where only one dumper is busy. + * be retried directly to tape. */ for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) { if( dumper->busy ) { @@ -1269,12 +1517,9 @@ continue_port_dumps(void) } } if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) && - ((!taper_busy && empty(tapeq)) || degraded_mode) && - pending_aborts == 0 ) { /* not case a */ - if( busy_dumpers == 1 ) { /* case c */ - sched(dp)->no_space = 1; - } - /* case b */ + ((all_taper_idle() && empty(tapeq)) || degraded_mode) && + pending_aborts == 0 ) { /* case b */ + sched(dp)->no_space = 1; /* At this time, dp points to the dump with the smallest est_size. * We abort that dump, hopefully not wasting too much time retrying it. */ @@ -1290,81 +1535,119 @@ static void handle_taper_result( void *cookie G_GNUC_UNUSED) { - disk_t *dp; + disk_t *dp = NULL, *dp1; + dumper_t *dumper; cmd_t cmd; int result_argc; char **result_argv; char *qname, *q; char *s; + taper_t *taper = NULL; + taper_t *taper1; + int i; + off_t partsize; assert(cookie == NULL); - amfree(taper_input_error); - amfree(taper_tape_error); - + do { - + short_dump_state(); - - cmd = getresult(taper, 1, &result_argc, &result_argv); - + taper = NULL; + + cmd = getresult(taper_fd, 1, &result_argc, &result_argv); + switch(cmd) { - + + case TAPER_OK: + if(result_argc != 2) { + error(_("error: [taper FAILED result_argc != 2: %d"), result_argc); + /*NOTREACHED*/ + } + + taper = NULL; + taper_started = 1; + for (i=0; i < conf_taper_parallel_write; i++) { + if (strcmp(tapetable[i].name, result_argv[1]) == 0) { + taper= &tapetable[i]; + } + } + assert(taper != NULL); + taper->left = 0; + taper->state &= ~TAPER_STATE_INIT; + taper->state |= TAPER_STATE_RESERVATION; + taper->state |= TAPER_STATE_IDLE; + amfree(taper->first_label); + taper_nb_wait_reply--; + taper_nb_scan_volume--; + last_started_taper = taper; + if (taper_nb_wait_reply == 0) { + event_release(taper_ev_read); + taper_ev_read = NULL; + } + start_some_dumps(&runq); + startaflush(); + break; + case FAILED: /* FAILED INPUT-* TAPE-* */ if(result_argc != 6) { error(_("error: [taper FAILED result_argc != 6: %d"), result_argc); /*NOTREACHED*/ } - + dp = serial2disk(result_argv[1]); - assert(dp == taper_disk); - if (!taper_dumper) + taper = sched(dp)->taper; + assert(dp == taper->disk); + if (!taper->dumper) free_serial(result_argv[1]); - + qname = quote_string(dp->name); g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"), walltime_str(curclock()), dp->host->hostname, qname); fflush(stdout); if (strcmp(result_argv[2], "INPUT-ERROR") == 0) { - taper_input_error = newstralloc(taper_input_error, result_argv[4]); + taper->input_error = newstralloc(taper->input_error, result_argv[4]); } else if (strcmp(result_argv[2], "INPUT-GOOD") != 0) { - taper_tape_error = newstralloc(taper_tape_error, + taper->tape_error = newstralloc(taper->tape_error, _("Taper protocol error")); - taper_result = FAILED; + taper->result = FAILED; log_add(L_FAIL, _("%s %s %s %d [%s]"), dp->host->hostname, qname, sched(dp)->datestamp, - sched(dp)->level, taper_tape_error); + sched(dp)->level, taper->tape_error); amfree(qname); break; } if (strcmp(result_argv[3], "TAPE-ERROR") == 0) { - taper_tape_error = newstralloc(taper_tape_error, result_argv[5]); + taper->state &= ~TAPER_STATE_TAPE_STARTED; + taper->tape_error = newstralloc(taper->tape_error, result_argv[5]); } else if (strcmp(result_argv[3], "TAPE-GOOD") != 0) { - taper_tape_error = newstralloc(taper_tape_error, + taper->state &= ~TAPER_STATE_TAPE_STARTED; + taper->tape_error = newstralloc(taper->tape_error, _("Taper protocol error")); - taper_result = FAILED; + taper->result = FAILED; log_add(L_FAIL, _("%s %s %s %d [%s]"), dp->host->hostname, qname, sched(dp)->datestamp, - sched(dp)->level, taper_tape_error); + sched(dp)->level, taper->tape_error); amfree(qname); break; } amfree(qname); - taper_result = cmd; + taper->result = cmd; break; - + case PARTIAL: /* PARTIAL INPUT-* TAPE-* */ case DONE: /* DONE INPUT-GOOD TAPE-GOOD */ if(result_argc != 7) { error(_("error: [taper PARTIAL result_argc != 7: %d"), result_argc); /*NOTREACHED*/ } - + dp = serial2disk(result_argv[1]); - assert(dp == taper_disk); - if (!taper_dumper) + taper = sched(dp)->taper; + assert(dp == taper->disk); + if (!taper->dumper) free_serial(result_argv[1]); qname = quote_string(dp->name); @@ -1373,26 +1656,28 @@ handle_taper_result( fflush(stdout); if (strcmp(result_argv[2], "INPUT-ERROR") == 0) { - taper_input_error = newstralloc(taper_input_error, result_argv[5]); + taper->input_error = newstralloc(taper->input_error, result_argv[5]); } else if (strcmp(result_argv[2], "INPUT-GOOD") != 0) { - taper_tape_error = newstralloc(taper_tape_error, + taper->tape_error = newstralloc(taper->tape_error, _("Taper protocol error")); - taper_result = FAILED; + taper->result = FAILED; log_add(L_FAIL, _("%s %s %s %d [%s]"), dp->host->hostname, qname, sched(dp)->datestamp, - sched(dp)->level, taper_tape_error); + sched(dp)->level, taper->tape_error); amfree(qname); break; } if (strcmp(result_argv[3], "TAPE-ERROR") == 0) { - taper_tape_error = newstralloc(taper_tape_error, result_argv[6]); + taper->state &= ~TAPER_STATE_TAPE_STARTED; + taper->tape_error = newstralloc(taper->tape_error, result_argv[6]); } else if (strcmp(result_argv[3], "TAPE-GOOD") != 0) { - taper_tape_error = newstralloc(taper_tape_error, + taper->state &= ~TAPER_STATE_TAPE_STARTED; + taper->tape_error = newstralloc(taper->tape_error, _("Taper protocol error")); - taper_result = FAILED; + taper->result = FAILED; log_add(L_FAIL, _("%s %s %s %d [%s]"), dp->host->hostname, qname, sched(dp)->datestamp, - sched(dp)->level, taper_tape_error); + sched(dp)->level, taper->tape_error); amfree(qname); break; } @@ -1403,27 +1688,37 @@ handle_taper_result( sched(dp)->dumpsize = atol(s); } - taper_result = cmd; + taper->result = cmd; amfree(qname); break; - + case PARTDONE: /* PARTDONE