X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=server-src%2Fdriver.c;h=9a0bb27bceba68fa9c80773cb6c6e0cbf4587458;hb=b116e9366c7b2ea2c2eb53b0a13df4090e176235;hp=248eb218c9aa9e253b2f35ec3f599b88728dcc0f;hpb=0de2ad0a86685398621fb8ffa6990c029681bb3a;p=debian%2Famanda diff --git a/server-src/driver.c b/server-src/driver.c index 248eb21..9a0bb27 100644 --- a/server-src/driver.c +++ b/server-src/driver.c @@ -1,6 +1,6 @@ /* * Amanda, The Advanced Maryland Automatic Network Disk Archiver - * Copyright (c) 1991-2000 University of Maryland at College Park + * Copyright (c) 1991-1998 University of Maryland at College Park * All Rights Reserved. * * Permission to use, copy, modify, distribute, and sell this software and its @@ -24,7 +24,7 @@ * file named AUTHORS, in the root directory of this distribution. */ /* - * $Id: driver.c,v 1.58.2.31.2.8.2.20.2.14 2005/02/09 18:12:31 martinea Exp $ + * $Id: driver.c 6512 2007-05-24 17:00:24Z ian $ * * controlling process for the Amanda backup system */ @@ -35,273 +35,412 @@ */ #include "amanda.h" +#include "find.h" #include "clock.h" #include "conffile.h" #include "diskfile.h" +#include "event.h" #include "holding.h" #include "infofile.h" #include "logfile.h" -#include "statfs.h" -#include "version.h" +#include "fsusage.h" #include "driverio.h" #include "server_util.h" - -disklist_t waitq, runq, tapeq, roomq; -int pending_aborts, inside_dump_to_tape; -disk_t *taper_disk; -int degraded_mode; -unsigned long reserved_space; -unsigned long total_disksize; -char *dumper_program; -int inparallel; -int nodump = 0; -long tape_length, tape_left = 0; -int conf_taperalgo; -am_host_t *flushhost = NULL; - -int client_constrained P((disk_t *dp)); -int sort_by_priority_reversed P((disk_t *a, disk_t *b)); -int sort_by_time P((disk_t *a, disk_t *b)); -int start_some_dumps P((disklist_t *rq)); -void dump_schedule P((disklist_t *qp, char *str)); -void start_degraded_mode P((disklist_t *queuep)); -void handle_taper_result P((void)); -dumper_t *idle_dumper P((void)); -int some_dumps_in_progress P((void)); -int num_busy_dumpers P((void)); -dumper_t *lookup_dumper P((int fd)); -void handle_dumper_result P((int fd)); -void read_flush P((disklist_t *tapeqp)); -void read_schedule P((disklist_t *waitqp, disklist_t *runqp)); -int free_kps P((interface_t *ip)); -void interface_state P((char *time_str)); -void allocate_bandwidth P((interface_t *ip, int kps)); -void deallocate_bandwidth P((interface_t *ip, int kps)); -unsigned long free_space P((void)); -assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle, assignedhd_t *preferred)); -char *diskname2filename P((char *dname)); -int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp)); -static void adjust_diskspace P((disk_t *diskp, cmd_t cmd)); -static void delete_diskspace P((disk_t *diskp)); -assignedhd_t **build_diskspace P((char *destname)); -void holdingdisk_state P((char *time_str)); -int dump_to_tape P((disk_t *dp)); -int queue_length P((disklist_t q)); -void short_dump_state P((void)); -void dump_state P((char *str)); -void startaflush P((void)); -int main P((int main_argc, char **main_argv)); - +#include "timestamp.h" + +#define driver_debug(i, ...) do { \ + if ((i) <= debug_driver) { \ + dbprintf(__VA_ARGS__); \ + } \ +} while (0) + +#define hold_debug(i, ...) do { \ + if ((i) <= debug_holding) { \ + dbprintf(__VA_ARGS__); \ + } \ +} while (0) + +static disklist_t waitq; // dle waiting estimate result +static disklist_t runq; // dle waiting to be dumped to holding disk +static disklist_t directq; // dle waiting to be dumped directly to tape +static disklist_t tapeq; // dle on holding disk waiting to be written + // to tape +static disklist_t roomq; // dle waiting for more space on holding disk +static int pending_aborts; +static int degraded_mode; +static off_t reserved_space; +static off_t total_disksize; +static char *dumper_program; +static char *chunker_program; +static int inparallel; +static int nodump = 0; +static off_t tape_length = (off_t)0; +static int current_tape = 0; +static int conf_taperalgo; +static int conf_taper_parallel_write; +static int conf_runtapes; +static time_t sleep_time; static int idle_reason; -char *datestamp; -char *timestamp; - -char *idle_strings[] = { +static char *driver_timestamp; +static char *hd_driver_timestamp; +static am_host_t *flushhost = NULL; +static int need_degraded=0; +static holdalloc_t *holdalloc; +static int num_holdalloc; +static event_handle_t *dumpers_ev_time = NULL; +static event_handle_t *flush_ev_read = NULL; +static event_handle_t *schedule_ev_read = NULL; +static int conf_flush_threshold_dumped; +static int conf_flush_threshold_scheduled; +static int conf_taperflush; +static off_t flush_threshold_dumped; +static off_t flush_threshold_scheduled; +static off_t taperflush; +static int schedule_done; // 1 if we don't wait for a + // schedule from the planner +static int force_flush; // All dump are terminated, we + // must now respect taper_flush +static int taper_nb_scan_volume = 0; +static int nb_sent_new_tape = 0; +static int taper_started = 0; +static taper_t *last_started_taper; + +static int wait_children(int count); +static void wait_for_children(void); +static void allocate_bandwidth(netif_t *ip, unsigned long kps); +static int assign_holdingdisk(assignedhd_t **holdp, disk_t *diskp); +static void adjust_diskspace(disk_t *diskp, cmd_t cmd); +static void delete_diskspace(disk_t *diskp); +static assignedhd_t **build_diskspace(char *destname); +static int client_constrained(disk_t *dp); +static void deallocate_bandwidth(netif_t *ip, unsigned long kps); +static void dump_schedule(disklist_t *qp, char *str); +static assignedhd_t **find_diskspace(off_t size, int *cur_idle, + assignedhd_t *preferred); +static unsigned long free_kps(netif_t *ip); +static off_t free_space(void); +static void dumper_chunker_result(disk_t *dp); +static void dumper_taper_result(disk_t *dp); +static void file_taper_result(disk_t *dp); +static void handle_dumper_result(void *); +static void handle_chunker_result(void *); +static void handle_dumpers_time(void *); +static void handle_taper_result(void *); + +static void holdingdisk_state(char *time_str); +static taper_t *idle_taper(void); +static taper_t *taper_from_name(char *name); +static void interface_state(char *time_str); +static int queue_length(disklist_t q); +static void read_flush(void *cookie); +static void read_schedule(void *cookie); +static void short_dump_state(void); +static void startaflush(void); +static void start_degraded_mode(disklist_t *queuep); +static void start_some_dumps(disklist_t *rq); +static void continue_port_dumps(void); +static void update_failed_dump(disk_t *); +static int all_taper_idle(void); + +typedef enum { + TAPE_ACTION_NO_ACTION = 0, + TAPE_ACTION_SCAN = (1 << 0), + TAPE_ACTION_NEW_TAPE = (1 << 1), + TAPE_ACTION_NO_NEW_TAPE = (1 << 2), + TAPE_ACTION_START_A_FLUSH = (1 << 3), + TAPE_ACTION_START_A_FLUSH_FIT = (1 << 4), + TAPE_ACTION_MOVE = (1 << 5) +} TapeAction; + +static TapeAction tape_action(taper_t *taper, char **why_no_new_tape); + +static const char *idle_strings[] = { #define NOT_IDLE 0 - "not-idle", -#define IDLE_START_WAIT 1 - "start-wait", -#define IDLE_NO_DUMPERS 2 - "no-dumpers", + T_("not-idle"), +#define IDLE_NO_DUMPERS 1 + T_("no-dumpers"), +#define IDLE_START_WAIT 2 + T_("start-wait"), #define IDLE_NO_HOLD 3 - "no-hold", + T_("no-hold"), #define IDLE_CLIENT_CONSTRAINED 4 - "client-constrained", -#define IDLE_NO_DISKSPACE 5 - "no-diskspace", -#define IDLE_TOO_LARGE 6 - "file-too-large", -#define IDLE_NO_BANDWIDTH 7 - "no-bandwidth", -#define IDLE_TAPER_WAIT 8 - "taper-wait", + T_("client-constrained"), +#define IDLE_NO_BANDWIDTH 5 + T_("no-bandwidth"), +#define IDLE_NO_DISKSPACE 6 + T_("no-diskspace") }; -#define SLEEP_MAX (24*3600) -struct timeval sleep_time = { SLEEP_MAX, 0 }; -/* enabled if any disks are in start-wait: */ -int any_delayed_disk = 0; - -int main(main_argc, main_argv) - int main_argc; - char **main_argv; +int +main( + int argc, + char ** argv) { - disklist_t *origqp; + disklist_t origq; disk_t *diskp; - fd_set selectset; - int fd, dsk; + int dsk; dumper_t *dumper; char *newdir = NULL; - generic_fs_stats_t fs; + struct fs_usage fsusage; holdingdisk_t *hdp; - unsigned long malloc_hist_1, malloc_size_1; - unsigned long malloc_hist_2, malloc_size_2; + identlist_t il; unsigned long reserve = 100; - char *conffile; char *conf_diskfile; - cmd_t cmd; - int result_argc; - char *result_argv[MAX_ARGS+1]; + char **result_argv = NULL; char *taper_program; - amwait_t retstat; char *conf_tapetype; tapetype_t *tape; + char *line; + char hostname[1025]; + intmax_t kb_avail; + config_overrides_t *cfg_ovr = NULL; + char *cfg_opt = NULL; + holdalloc_t *ha, *ha_last; + find_result_t *holding_files; + disklist_t holding_disklist = { NULL, NULL }; + int no_taper = FALSE; - for(fd = 3; fd < FD_SETSIZE; fd++) { - /* - * Make sure nobody spoofs us with a lot of extra open files - * that would cause an open we do to get a very high file - * descriptor, which in turn might be used as an index into - * an array (e.g. an fd_set). - */ - close(fd); - } + /* + * Configure program for internationalization: + * 1) Only set the message locale for now. + * 2) Set textdomain for all amanda related programs to "amanda" + * We don't want to be forced to support dozens of message catalogs. + */ + setlocale(LC_MESSAGES, "C"); + textdomain("amanda"); - setvbuf(stdout, (char *)NULL, _IOLBF, 0); - setvbuf(stderr, (char *)NULL, _IOLBF, 0); + safe_fd(-1, 0); + + setvbuf(stdout, (char *)NULL, (int)_IOLBF, 0); + setvbuf(stderr, (char *)NULL, (int)_IOLBF, 0); set_pname("driver"); - signal(SIGPIPE, SIG_IGN); + dbopen(DBG_SUBDIR_SERVER); - malloc_size_1 = malloc_inuse(&malloc_hist_1); + atexit(wait_for_children); + + /* Don't die when child closes pipe */ + signal(SIGPIPE, SIG_IGN); - erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE); - set_logerror(logerror); + add_amanda_log_handler(amanda_log_stderr); + add_amanda_log_handler(amanda_log_trace_log); startclock(); - FD_ZERO(&readset); - printf("%s: pid %ld executable %s version %s\n", - get_pname(), (long) getpid(), main_argv[0], version()); + cfg_ovr = extract_commandline_config_overrides(&argc, &argv); - if (main_argc > 1) { - config_name = stralloc(main_argv[1]); - config_dir = vstralloc(CONFIG_DIR, "/", config_name, "/", NULL); - if(main_argc > 2) { - if(strncmp(main_argv[2], "nodump", 6) == 0) { - nodump = 1; - } - } + if (argc > 1) + cfg_opt = argv[1]; + set_config_overrides(cfg_ovr); + config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD, cfg_opt); - } else { - char my_cwd[STR_SIZE]; + conf_diskfile = config_dir_relative(getconf_str(CNF_DISKFILE)); + read_diskfile(conf_diskfile, &origq); + disable_skip_disk(&origq); + amfree(conf_diskfile); - if (getcwd(my_cwd, sizeof(my_cwd)) == NULL) { - error("cannot determine current working directory"); - } - config_dir = stralloc2(my_cwd, "/"); - if ((config_name = strrchr(my_cwd, '/')) != NULL) { - config_name = stralloc(config_name + 1); + if (config_errors(NULL) >= CFGERR_WARNINGS) { + config_print_errors(); + if (config_errors(NULL) >= CFGERR_ERRORS) { + g_critical(_("errors processing config file")); } } - safe_cd(); + log_add(L_INFO, "%s pid %ld", get_pname(), (long)getpid()); + g_printf(_("%s: pid %ld executable %s version %s\n"), + get_pname(), (long) getpid(), argv[0], VERSION); - conffile = stralloc2(config_dir, CONFFILE_NAME); - if(read_conffile(conffile)) { - error("errors processing config file \"%s\"", conffile); + if(argc > 2) { + if(strcmp(argv[2], "nodump") == 0) { + nodump = 1; + argv++; + argc--; + } } - amfree(conffile); - - amfree(datestamp); - datestamp = construct_datestamp(NULL); - timestamp = construct_timestamp(NULL); - log_add(L_START,"date %s", datestamp); - taper_program = vstralloc(libexecdir, "/", "taper", versionsuffix(), NULL); - dumper_program = vstralloc(libexecdir, "/", "dumper", versionsuffix(), - NULL); + if (argc > 2) { + if (strcmp(argv[2], "--no-taper") == 0) { + no_taper = TRUE; + argv++; + argc--; + } + } - conf_taperalgo = getconf_int(CNF_TAPERALGO); - conf_tapetype = getconf_str(CNF_TAPETYPE); - tape = lookup_tapetype(conf_tapetype); - tape_length = tape->length; - printf("driver: tape size %ld\n", tape_length); + safe_cd(); /* do this *after* config_init */ - /* taper takes a while to get going, so start it up right away */ + check_running_as(RUNNING_AS_DUMPUSER); - init_driverio(); - startup_tape_process(taper_program); - taper_cmd(START_TAPER, datestamp, NULL, 0, NULL); + dbrename(get_config_name(), DBG_SUBDIR_SERVER); - /* start initializing: read in databases */ + /* load DLEs from the holding disk, in case there's anything to flush there */ + search_holding_disk(&holding_files, &holding_disklist); + /* note that the dumps are added to the global disklist, so we need not consult + * holding_files or holding_disklist after this */ - conf_diskfile = getconf_str(CNF_DISKFILE); - if (*conf_diskfile == '/') { - conf_diskfile = stralloc(conf_diskfile); - } else { - conf_diskfile = stralloc2(config_dir, conf_diskfile); + amfree(driver_timestamp); + /* read timestamp from stdin */ + while ((line = agets(stdin)) != NULL) { + if (line[0] != '\0') + break; + amfree(line); } - if((origqp = read_diskfile(conf_diskfile)) == NULL) { - error("could not load disklist \"%s\"", conf_diskfile); + if ( line == NULL ) { + error(_("Did not get DATE line from planner")); + /*NOTREACHED*/ + } + driver_timestamp = alloc(15); + strncpy(driver_timestamp, &line[5], 14); + driver_timestamp[14] = '\0'; + amfree(line); + log_add(L_START,_("date %s"), driver_timestamp); + + gethostname(hostname, SIZEOF(hostname)); + log_add(L_STATS,_("hostname %s"), hostname); + + /* check that we don't do many dump in a day and usetimestamps is off */ + if(strlen(driver_timestamp) == 8) { + if (!nodump) { + char *conf_logdir = getconf_str(CNF_LOGDIR); + char *logfile = vstralloc(conf_logdir, "/log.", + driver_timestamp, ".0", NULL); + char *oldlogfile = vstralloc(conf_logdir, "/oldlog/log.", + driver_timestamp, ".0", NULL); + if(access(logfile, F_OK) == 0 || access(oldlogfile, F_OK) == 0) { + log_add(L_WARNING, _("WARNING: This is not the first amdump run today. Enable the usetimestamps option in the configuration file if you want to run amdump more than once per calendar day.")); + } + amfree(oldlogfile); + amfree(logfile); + } + hd_driver_timestamp = get_timestamp_from_time(0); + } + else { + hd_driver_timestamp = stralloc(driver_timestamp); } - amfree(conf_diskfile); - /* set up any configuration-dependent variables */ + taper_program = vstralloc(amlibexecdir, "/", "taper", NULL); + dumper_program = vstralloc(amlibexecdir, "/", "dumper", NULL); + chunker_program = vstralloc(amlibexecdir, "/", "chunker", NULL); - inparallel = getconf_int(CNF_INPARALLEL); + conf_taperalgo = getconf_taperalgo(CNF_TAPERALGO); + conf_taper_parallel_write = getconf_int(CNF_TAPER_PARALLEL_WRITE); + conf_tapetype = getconf_str(CNF_TAPETYPE); + conf_runtapes = getconf_int(CNF_RUNTAPES); + if (conf_taper_parallel_write > conf_runtapes) { + conf_taper_parallel_write = conf_runtapes; + } + tape = lookup_tapetype(conf_tapetype); + tape_length = tapetype_get_length(tape); + g_printf("driver: tape size %lld\n", (long long)tape_length); + conf_flush_threshold_dumped = getconf_int(CNF_FLUSH_THRESHOLD_DUMPED); + conf_flush_threshold_scheduled = getconf_int(CNF_FLUSH_THRESHOLD_SCHEDULED); + conf_taperflush = getconf_int(CNF_TAPERFLUSH); + + flush_threshold_dumped = (conf_flush_threshold_dumped * tape_length) / 100; + flush_threshold_scheduled = (conf_flush_threshold_scheduled * tape_length) / 100; + taperflush = (conf_taperflush *tape_length) / 100; - reserve = getconf_int(CNF_RESERVE); + driver_debug(1, _("flush-threshold-dumped: %lld\n"), (long long)flush_threshold_dumped); + driver_debug(1, _("flush-threshold-scheduled: %lld\n"), (long long)flush_threshold_scheduled); + driver_debug(1, _("taperflush: %lld\n"), (long long)taperflush); + + /* set up any configuration-dependent variables */ - total_disksize = 0; - for(hdp = getconf_holdingdisks(), dsk = 0; hdp != NULL; hdp = hdp->next, dsk++) { - hdp->up = (void *)alloc(sizeof(holdalloc_t)); - holdalloc(hdp)->allocated_dumpers = 0; - holdalloc(hdp)->allocated_space = 0L; + inparallel = getconf_int(CNF_INPARALLEL); - if(get_fs_stats(hdp->diskdir, &fs) == -1 - || access(hdp->diskdir, W_OK) == -1) { - log_add(L_WARNING, "WARNING: ignoring holding disk %s: %s\n", - hdp->diskdir, strerror(errno)); - hdp->disksize = 0L; + reserve = (unsigned long)getconf_int(CNF_RESERVE); + + total_disksize = (off_t)0; + ha_last = NULL; + num_holdalloc = 0; + for (il = getconf_identlist(CNF_HOLDINGDISK), dsk = 0; + il != NULL; + il = il->next, dsk++) { + hdp = lookup_holdingdisk(il->data); + ha = alloc(SIZEOF(holdalloc_t)); + num_holdalloc++; + + /* link the list in the same order as getconf_holdingdisks's results */ + ha->next = NULL; + if (ha_last == NULL) + holdalloc = ha; + else + ha_last->next = ha; + ha_last = ha; + + ha->hdisk = hdp; + ha->allocated_dumpers = 0; + ha->allocated_space = (off_t)0; + ha->disksize = holdingdisk_get_disksize(hdp); + + /* get disk size */ + if(get_fs_usage(holdingdisk_get_diskdir(hdp), NULL, &fsusage) == -1 + || access(holdingdisk_get_diskdir(hdp), W_OK) == -1) { + log_add(L_WARNING, _("WARNING: ignoring holding disk %s: %s\n"), + holdingdisk_get_diskdir(hdp), strerror(errno)); + ha->disksize = 0L; continue; } - if(fs.avail != -1) { - if(hdp->disksize > 0) { - if(hdp->disksize > fs.avail) { - log_add(L_WARNING, - "WARNING: %s: %ld KB requested, but only %ld KB available.", - hdp->diskdir, hdp->disksize, fs.avail); - hdp->disksize = fs.avail; - } - } - else if(fs.avail + hdp->disksize < 0) { + /* do the division first to avoid potential integer overflow */ + if (fsusage.fsu_bavail_top_bit_set) + kb_avail = 0; + else + kb_avail = fsusage.fsu_bavail / 1024 * fsusage.fsu_blocksize; + + if(ha->disksize > (off_t)0) { + if(ha->disksize > kb_avail) { log_add(L_WARNING, - "WARNING: %s: not %ld KB free.", - hdp->diskdir, -hdp->disksize); - hdp->disksize = 0L; - continue; + _("WARNING: %s: %lld KB requested, " + "but only %lld KB available."), + holdingdisk_get_diskdir(hdp), + (long long)ha->disksize, + (long long)kb_avail); + ha->disksize = kb_avail; } - else - hdp->disksize += fs.avail; } + /* ha->disksize is negative; use all but that amount */ + else if(kb_avail < -ha->disksize) { + log_add(L_WARNING, + _("WARNING: %s: not %lld KB free."), + holdingdisk_get_diskdir(hdp), + (long long)-ha->disksize); + ha->disksize = (off_t)0; + continue; + } + else + ha->disksize += kb_avail; - printf("driver: adding holding disk %d dir %s size %ld chunksize %ld\n", - dsk, hdp->diskdir, hdp->disksize, hdp->chunksize); + g_printf(_("driver: adding holding disk %d dir %s size %lld chunksize %lld\n"), + dsk, holdingdisk_get_diskdir(hdp), + (long long)ha->disksize, + (long long)(holdingdisk_get_chunksize(hdp))); newdir = newvstralloc(newdir, - hdp->diskdir, "/", timestamp, + holdingdisk_get_diskdir(hdp), "/", hd_driver_timestamp, NULL); - if(!mkholdingdir(newdir)) { - hdp->disksize = 0L; + if(!mkholdingdir(newdir)) { + ha->disksize = (off_t)0; } - total_disksize += hdp->disksize; + total_disksize += ha->disksize; } - reserved_space = total_disksize * (reserve / 100.0); + reserved_space = total_disksize * (off_t)(reserve / 100); - printf("reserving %ld out of %ld for degraded-mode dumps\n", - reserved_space, free_space()); + g_printf(_("reserving %lld out of %lld for degraded-mode dumps\n"), + (long long)reserved_space, (long long)free_space()); amfree(newdir); if(inparallel > MAX_DUMPERS) inparallel = MAX_DUMPERS; - /* fire up the dumpers now while we are waiting */ + /* taper takes a while to get going, so start it up right away */ + + init_driverio(); + startup_tape_process(taper_program, conf_taper_parallel_write, no_taper); - if(!nodump) startup_dump_processes(dumper_program, inparallel); + /* fire up the dumpers now while we are waiting */ + if(!nodump) startup_dump_processes(dumper_program, inparallel, driver_timestamp); /* * Read schedule from stdin. Usually, this is a pipe from planner, @@ -311,233 +450,421 @@ int main(main_argc, main_argv) * in parallel with the planner. */ - waitq = *origqp; - tapeq.head = tapeq.tail = NULL; - roomq.head = roomq.tail = NULL; - runq.head = runq.tail = NULL; + runq.head = NULL; + runq.tail = NULL; + directq.head = NULL; + directq.tail = NULL; + waitq = origq; + tapeq.head = NULL; + tapeq.tail = NULL; + roomq.head = NULL; + roomq.tail = NULL; + taper_nb_wait_reply = 0; + + need_degraded = 0; + if (no_taper || conf_runtapes <= 0) { + taper_started = 1; /* we'll pretend the taper started and failed immediately */ + need_degraded = 1; + } else { + tapetable[0].state = TAPER_STATE_INIT; + taper_nb_wait_reply++; + taper_nb_scan_volume++; + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); + taper_cmd(START_TAPER, NULL, tapetable[0].name, 0, driver_timestamp); + } - read_flush(&tapeq); + flush_ev_read = event_register((event_id_t)0, EV_READFD, read_flush, NULL); - log_add(L_STATS, "startup time %s", walltime_str(curclock())); + log_add(L_STATS, _("startup time %s"), walltime_str(curclock())); - printf("driver: start time %s inparallel %d bandwidth %d diskspace %lu", - walltime_str(curclock()), inparallel, free_kps((interface_t *)0), - free_space()); - printf(" dir %s datestamp %s driver: drain-ends tapeq %s big-dumpers %s\n", - "OBSOLETE", datestamp, taperalgo2str(conf_taperalgo), + g_printf(_("driver: start time %s inparallel %d bandwidth %lu diskspace %lld "), walltime_str(curclock()), inparallel, + free_kps(NULL), (long long)free_space()); + g_printf(_(" dir %s datestamp %s driver: drain-ends tapeq %s big-dumpers %s\n"), + "OBSOLETE", driver_timestamp, taperalgo2str(conf_taperalgo), getconf_str(CNF_DUMPORDER)); fflush(stdout); - /* Let's see if the tape is ready */ - - cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1); - - if(cmd != TAPER_OK) { - /* no tape, go into degraded mode: dump to holding disk */ - start_degraded_mode(&runq); - FD_CLR(taper,&readset); - } + schedule_done = nodump; + force_flush = 0; - short_dump_state(); /* for amstatus */ - - tape_left = tape_length; - taper_busy = 0; - taper_disk = NULL; + short_dump_state(); + event_loop(0); - /* Start autoflush while waiting for dump schedule */ - if(!nodump) { - /* Start any autoflush tape writes */ - if (!empty(tapeq)) { - startaflush(); - short_dump_state(); /* for amstatus */ + force_flush = 1; - /* Process taper results until the schedule arrives */ - while (1) { - FD_ZERO(&selectset); - FD_SET(0, &selectset); - FD_SET(taper, &selectset); + /* mv runq to directq */ + while (!empty(runq)) { + diskp = dequeue_disk(&runq); + headqueue_disk(&directq, diskp); + } - if(select(taper+1, (SELECT_ARG_TYPE *)(&selectset), NULL, NULL, - &sleep_time) == -1) - error("select: %s", strerror(errno)); - if (FD_ISSET(0, &selectset)) break; /* schedule arrived */ - if (FD_ISSET(taper, &selectset)) handle_taper_result(); - short_dump_state(); /* for amstatus */ + /* handle any remaining dumps by dumping directly to tape, if possible */ + while(!empty(directq) && taper_fd > 0) { + time_t sleep_time = 100000000; + disk_t *sleep_diskp = NULL; + time_t now = time(0); + + /* Find one we can do immediately or the sonner */ + for (diskp = directq.head; diskp != NULL; diskp = diskp->next) { + if (diskp->to_holdingdisk == HOLD_REQUIRED || + degraded_mode) { + sleep_time = 0; + sleep_diskp = diskp; + } else if (diskp->host->start_t - now < sleep_time && + diskp->start_t -now < sleep_time) { + if (diskp->host->start_t > diskp->start_t) + sleep_time = diskp->host->start_t - now; + else + sleep_time = diskp->start_t - now; + sleep_diskp = diskp; } - } + diskp = sleep_diskp; + if (sleep_time > 0) + sleep(sleep_time); + remove_disk(&directq, diskp); - /* Read the dump schedule */ - read_schedule(&waitq, &runq); + if (diskp->to_holdingdisk == HOLD_REQUIRED) { + char *qname = quote_string(diskp->name); + log_add(L_FAIL, "%s %s %s %d [%s]", + diskp->host->hostname, qname, sched(diskp)->datestamp, + sched(diskp)->level, + _("can't dump required holdingdisk")); + amfree(qname); + } + else if (!degraded_mode) { + char *qname = quote_string(diskp->name); + log_add(L_FAIL, "%s %s %s %d [%s]", + diskp->host->hostname, qname, sched(diskp)->datestamp, + sched(diskp)->level, + _("can't dump in degraded mode")); + amfree(qname); + } + else { + char *qname = quote_string(diskp->name); + log_add(L_FAIL, "%s %s %s %d [%s]", + diskp->host->hostname, qname, sched(diskp)->datestamp, + sched(diskp)->level, + num_holdalloc == 0 ? + _("can't do degraded dump without holding disk") : + diskp->to_holdingdisk != HOLD_NEVER ? + _("out of holding space in degraded mode") : + _("can't dump 'holdingdisk never' dle in degraded mode")); + amfree(qname); + } } - /* Start any needed flushes */ + /* fill up the tape or start new one for taperflush */ startaflush(); + event_loop(0); - while(start_some_dumps(&runq) || some_dumps_in_progress() || - any_delayed_disk) { - short_dump_state(); - - /* wait for results */ - - memcpy(&selectset, &readset, sizeof(fd_set)); - if(select(maxfd+1, (SELECT_ARG_TYPE *)(&selectset), - NULL, NULL, &sleep_time) == -1) - error("select: %s", strerror(errno)); + short_dump_state(); /* for amstatus */ - /* handle any results that have come in */ + g_printf(_("driver: QUITTING time %s telling children to quit\n"), + walltime_str(curclock())); + fflush(stdout); - for(fd = 0; fd <= maxfd; fd++) { - /* - * The first pass through the following loop, we have - * data ready for areads (called by getresult, called by - * handle_.*_result). But that may read more than one record, - * so we need to keep processing as long as areads has data. - * We will get control back after each record and the buffer - * will go empty (indicated by areads_dataready(fd) == 0) - * after the last one available has been processed. - */ - while(FD_ISSET(fd, &selectset) || areads_dataready(fd) > 0) { - if(fd == taper) handle_taper_result(); - else handle_dumper_result(fd); - FD_CLR(fd, &selectset); - } + if(!nodump) { + for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { + if(dumper->fd >= 0) + dumper_cmd(dumper, QUIT, NULL, NULL); } + } + if(taper_fd >= 0) { + taper_cmd(QUIT, NULL, NULL, 0, NULL); } - /* handle any remaining dumps by dumping directly to tape, if possible */ + /* wait for all to die */ + wait_children(600); - while(!empty(runq)) { - diskp = dequeue_disk(&runq); - if(!degraded_mode) { - int rc = dump_to_tape(diskp); - if(rc == 1) - log_add(L_INFO, - "%s %s %d [dump to tape failed, will try again]", - diskp->host->hostname, - diskp->name, - sched(diskp)->level); - else if(rc == 2) - log_add(L_FAIL, "%s %s %s %d [dump to tape failed]", - diskp->host->hostname, - diskp->name, - sched(diskp)->datestamp, - sched(diskp)->level); - } - else - log_add(L_FAIL, "%s %s %s %d [%s]", - diskp->host->hostname, diskp->name, - sched(diskp)->datestamp, sched(diskp)->level, - diskp->no_hold ? - "can't dump no-hold disk in degraded mode" : - "no more holding disk space"); - } + /* cleanup */ + holding_cleanup(NULL, NULL); - short_dump_state(); /* for amstatus */ + amfree(newdir); - printf("driver: QUITTING time %s telling children to quit\n", - walltime_str(curclock())); + check_unfree_serial(); + g_printf(_("driver: FINISHED time %s\n"), walltime_str(curclock())); fflush(stdout); + log_add(L_FINISH,_("date %s time %s"), driver_timestamp, walltime_str(curclock())); + log_add(L_INFO, "pid-done %ld", (long)getpid()); + amfree(driver_timestamp); + + amfree(dumper_program); + amfree(taper_program); + if (result_argv) + g_strfreev(result_argv); + + dbclose(); + + return 0; +} + +/* sleep up to count seconds, and wait for terminating child process */ +/* if sleep is negative, this function will not timeout */ +/* exit once all child process are finished or the timout expired */ +/* return 0 if no more children to wait */ +/* return 1 if some children are still alive */ +static int +wait_children(int count) +{ + pid_t pid; + amwait_t retstat; + char *who; + char *what; + int code=0; + dumper_t *dumper; + int wait_errno; + + do { + do { + pid = waitpid((pid_t)-1, &retstat, WNOHANG); + wait_errno = errno; + if (pid > 0) { + what = NULL; + if (! WIFEXITED(retstat)) { + what = _("signal"); + code = WTERMSIG(retstat); + } else if (WEXITSTATUS(retstat) != 0) { + what = _("code"); + code = WEXITSTATUS(retstat); + } + who = NULL; + for (dumper = dmptable; dumper < dmptable + inparallel; + dumper++) { + if (pid == dumper->pid) { + who = stralloc(dumper->name); + dumper->pid = -1; + break; + } + if (dumper->chunker && pid == dumper->chunker->pid) { + who = stralloc(dumper->chunker->name); + dumper->chunker->pid = -1; + break; + } + } + if (who == NULL && pid == taper_pid) { + who = stralloc("taper"); + taper_pid = -1; + } + if(what != NULL && who == NULL) { + who = stralloc("unknown"); + } + if(who && what) { + log_add(L_WARNING, _("%s pid %u exited with %s %d\n"), who, + (unsigned)pid, what, code); + g_printf(_("driver: %s pid %u exited with %s %d\n"), who, + (unsigned)pid, what, code); + } + amfree(who); + } + } while (pid > 0 || wait_errno == EINTR); + if (errno != ECHILD) + sleep(1); + if (count > 0) + count--; + } while ((errno != ECHILD) && (count != 0)); + return (errno != ECHILD); +} + +static void +kill_children(int signal) +{ + dumper_t *dumper; if(!nodump) { - for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { - dumper_cmd(dumper, QUIT, NULL); - } + for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { + if (!dumper->down && dumper->pid > 1) { + g_printf(_("driver: sending signal %d to %s pid %u\n"), signal, + dumper->name, (unsigned)dumper->pid); + if (kill(dumper->pid, signal) == -1 && errno == ESRCH) { + if (dumper->chunker) + dumper->chunker->pid = 0; + } + if (dumper->chunker && dumper->chunker->pid > 1) { + g_printf(_("driver: sending signal %d to %s pid %u\n"), signal, + dumper->chunker->name, + (unsigned)dumper->chunker->pid); + if (kill(dumper->chunker->pid, signal) == -1 && + errno == ESRCH) + dumper->chunker->pid = 0; + } + } + } } - if(taper >= 0) { - taper_cmd(QUIT, NULL, NULL, 0, NULL); + if(taper_pid > 1) { + g_printf(_("driver: sending signal %d to %s pid %u\n"), signal, + "taper", (unsigned)taper_pid); + if (kill(taper_pid, signal) == -1 && errno == ESRCH) + taper_pid = 0; } +} - /* wait for all to die */ +static void +wait_for_children(void) +{ + dumper_t *dumper; - while(1) { - char number[NUM_STR_SIZE]; - pid_t pid; - char *who; - char *what; - int code=0; - - if((pid = wait(&retstat)) == -1) { - if(errno == EINTR) continue; - else break; - } - what = NULL; - if(! WIFEXITED(retstat)) { - what = "signal"; - code = WTERMSIG(retstat); - } else if(WEXITSTATUS(retstat) != 0) { - what = "code"; - code = WEXITSTATUS(retstat); - } - who = NULL; + if(!nodump) { for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { - if(pid == dumper->pid) { - who = stralloc(dumper->name); - break; + if (dumper->pid > 1 && dumper->fd >= 0) { + dumper_cmd(dumper, QUIT, NULL, NULL); + if (dumper->chunker && dumper->chunker->pid > 1 && + dumper->chunker->fd >= 0) + chunker_cmd(dumper->chunker, QUIT, NULL, NULL); } } - if(who == NULL && pid == taper_pid) { - who = stralloc("taper"); - } - if(what != NULL && who == NULL) { - ap_snprintf(number, sizeof(number), "%ld", (long)pid); - who = stralloc2("unknown pid ", number); - } - if(who && what) { - log_add(L_WARNING, "%s exited with %s %d\n", who, what, code); - printf("driver: %s exited with %s %d\n", who, what, code); - } - amfree(who); } - for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) { - amfree(dumper->name); + if(taper_pid > 1 && taper_fd > 0) { + taper_cmd(QUIT, NULL, NULL, 0, NULL); } - for(hdp = getconf_holdingdisks(); hdp != NULL; hdp = hdp->next) { - cleanup_holdingdisk(hdp->diskdir, 0); - amfree(hdp->up); - } - amfree(newdir); + if(wait_children(60) == 0) + return; - printf("driver: FINISHED time %s\n", walltime_str(curclock())); - fflush(stdout); - log_add(L_FINISH,"date %s time %s", datestamp, walltime_str(curclock())); - amfree(datestamp); - amfree(timestamp); + kill_children(SIGHUP); + if(wait_children(60) == 0) + return; - amfree(dumper_program); - amfree(taper_program); - amfree(config_dir); - amfree(config_name); + kill_children(SIGKILL); + if(wait_children(-1) == 0) + return; - malloc_size_2 = malloc_inuse(&malloc_hist_2); +} - if(malloc_size_1 != malloc_size_2) { - malloc_list(fileno(stderr), malloc_hist_1, malloc_hist_2); - } +static void startaflush_tape(taper_t *taper); - return 0; +static void +startaflush(void) +{ + taper_t *taper; + + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_WAIT_FOR_TAPE) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_TAPE_REQUESTED) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_INIT) { + startaflush_tape(taper); + } + } + for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write; + taper++) { + if (!(taper->state & TAPER_STATE_DONE) && + taper->state & TAPER_STATE_IDLE) { + startaflush_tape(taper); + } + } } -void startaflush() { +static void +startaflush_tape( + taper_t *taper) +{ disk_t *dp = NULL; disk_t *fit = NULL; char *datestamp; + off_t extra_tapes_size = 0; + off_t taper_left; + char *qname; + TapeAction result_tape_action; + char *why_no_new_tape = NULL; + taper_t *taper1; + + result_tape_action = tape_action(taper, &why_no_new_tape); + + if (result_tape_action & TAPE_ACTION_SCAN) { + taper->state &= ~TAPER_STATE_TAPE_REQUESTED; + taper->state |= TAPER_STATE_WAIT_FOR_TAPE; + taper_nb_scan_volume++; + taper_cmd(START_SCAN, taper->disk, NULL, 0, NULL); + } else if (result_tape_action & TAPE_ACTION_NEW_TAPE) { + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper->state |= TAPER_STATE_WAIT_NEW_TAPE; + nb_sent_new_tape++; + taper_cmd(NEW_TAPE, taper->disk, NULL, 0, NULL); + } else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) { + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper_cmd(NO_NEW_TAPE, taper->disk, why_no_new_tape, 0, NULL); + taper->state |= TAPER_STATE_DONE; + start_degraded_mode(&runq); + } else if (result_tape_action & TAPE_ACTION_MOVE) { + taper_t *taper1 = idle_taper(); + if (taper1) { + taper->state &= ~TAPER_STATE_TAPE_REQUESTED; + taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE; + taper_cmd(TAKE_SCRIBE_FROM, taper->disk, taper1->name, 0 , NULL); + taper1->state = TAPER_STATE_DEFAULT; + taper->state |= TAPER_STATE_TAPE_STARTED; + taper->left = taper1->left; + if (last_started_taper == taper1) { + last_started_taper = taper; + } + } + } + + if (!degraded_mode && + taper->state & TAPER_STATE_IDLE && + !empty(tapeq) && + (result_tape_action & TAPE_ACTION_START_A_FLUSH || + result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT)) { + + int taperalgo = conf_taperalgo; + if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + if (taperalgo == ALGO_FIRST) + taperalgo = ALGO_FIRSTFIT; + else if (taperalgo == ALGO_LARGEST) + taperalgo = ALGO_LARGESTFIT; + else if (taperalgo == ALGO_SMALLEST) + taperalgo = ALGO_SMALLESTFIT; + else if (taperalgo == ALGO_LAST) + taperalgo = ALGO_LASTFIT; + } + + extra_tapes_size = tape_length * (off_t)(conf_runtapes - current_tape); + for (taper1 = tapetable; taper1 < tapetable + conf_taper_parallel_write; + taper1++) { + if (taper1->state & TAPER_STATE_TAPE_STARTED) { + extra_tapes_size += taper1->left; + } + dp = taper1->disk; + if (dp) { + extra_tapes_size -= (sched(dp)->act_size - taper1->written); + } + } - if(!degraded_mode && !taper_busy && !empty(tapeq)) { + if (taper->state & TAPER_STATE_TAPE_STARTED) { + taper_left = taper->left; + } else { + taper_left = tape_length; + } + dp = NULL; datestamp = sched(tapeq.head)->datestamp; - switch(conf_taperalgo) { + switch(taperalgo) { case ALGO_FIRST: dp = dequeue_disk(&tapeq); break; - case ALGO_FIRSTFIT: + case ALGO_FIRSTFIT: fit = tapeq.head; while (fit != NULL) { - if(sched(fit)->act_size <= tape_left && - strcmp(sched(fit)->datestamp, datestamp) <= 0) { + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; fit = NULL; } @@ -547,7 +874,7 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_LARGEST: + case ALGO_LARGEST: fit = dp = tapeq.head; while (fit != NULL) { if(sched(fit)->act_size > sched(dp)->act_size && @@ -558,10 +885,11 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_LARGESTFIT: + case ALGO_LARGESTFIT: fit = tapeq.head; while (fit != NULL) { - if(sched(fit)->act_size <= tape_left && + if(sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && (!dp || sched(fit)->act_size > sched(dp)->act_size) && strcmp(sched(fit)->datestamp, datestamp) <= 0) { dp = fit; @@ -570,45 +898,82 @@ void startaflush() { } if(dp) remove_disk(&tapeq, dp); break; - case ALGO_SMALLEST: + case ALGO_SMALLEST: + fit = dp = tapeq.head; + while (fit != NULL) { + if (sched(fit)->act_size < sched(dp)->act_size && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->next; + } + if(dp) remove_disk(&tapeq, dp); + break; + case ALGO_SMALLESTFIT: + fit = dp = tapeq.head; + while (fit != NULL) { + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && + (!dp || sched(fit)->act_size < sched(dp)->act_size) && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->next; + } + if(dp) remove_disk(&tapeq, dp); break; case ALGO_LAST: dp = tapeq.tail; remove_disk(&tapeq, dp); break; + case ALGO_LASTFIT: + fit = tapeq.tail; + while (fit != NULL) { + if (sched(fit)->act_size <= + (fit->splitsize ? extra_tapes_size : taper_left) && + (!dp || sched(fit)->act_size < sched(dp)->act_size) && + strcmp(sched(fit)->datestamp, datestamp) <= 0) { + dp = fit; + } + fit = fit->prev; + } + if(dp) remove_disk(&tapeq, dp); + break; } - if(!dp) { /* ALGO_SMALLEST, or default if nothing fit. */ - if(conf_taperalgo != ALGO_SMALLEST) { - fprintf(stderr, - "driver: startaflush: Using SMALLEST because nothing fit\n"); + if (dp) { + taper->disk = dp; + taper->dumper = NULL; + amfree(taper->input_error); + amfree(taper->tape_error); + taper->result = LAST_TOK; + taper->sendresult = 0; + amfree(taper->first_label); + taper->written = 0; + taper->state &= ~TAPER_STATE_IDLE; + taper->state |= TAPER_STATE_FILE_TO_TAPE; + taper->dumper = NULL; + qname = quote_string(dp->name); + if (taper_nb_wait_reply == 0) { + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); } - fit = dp = tapeq.head; - while (fit != NULL) { - if(sched(fit)->act_size < sched(dp)->act_size && - strcmp(sched(fit)->datestamp, datestamp) <= 0) { - dp = fit; - } - fit = fit->next; - } - if(dp) remove_disk(&tapeq, dp); - } - taper_disk = dp; - taper_busy = 1; - taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, - sched(dp)->datestamp); - fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n", - taperalgo2str(conf_taperalgo), dp->host->hostname, - dp->name, sched(taper_disk)->act_size, tape_left); - if(sched(dp)->act_size <= tape_left) - tape_left -= sched(dp)->act_size; - else - tape_left = 0; + taper_nb_wait_reply++; + sched(dp)->taper = taper; + taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level, + sched(dp)->datestamp); + g_fprintf(stderr,_("driver: startaflush: %s %s %s %lld %lld\n"), + taperalgo2str(taperalgo), dp->host->hostname, qname, + (long long)sched(taper->disk)->act_size, + (long long)taper->left); + amfree(qname); + } + short_dump_state(); } } - -int client_constrained(dp) -disk_t *dp; +static int +client_constrained( + disk_t * dp) { disk_t *dp2; @@ -632,271 +997,498 @@ disk_t *dp; return 0; } -int start_some_dumps(rq) -disklist_t *rq; +static void +allow_dump_dle( + disk_t *diskp, + taper_t *taper, + char dumptype, + disklist_t *rq, + const time_t now, + int dumper_to_holding, + int *cur_idle, + disk_t **delayed_diskp, + disk_t **diskp_accept, + assignedhd_t ***holdp_accept, + off_t extra_tapes_size) { - int total, cur_idle; - disk_t *diskp, *diskp_accept; - dumper_t *dumper; + assignedhd_t **holdp=NULL; + + if (diskp->host->start_t > now) { + *cur_idle = max(*cur_idle, IDLE_START_WAIT); + if (*delayed_diskp == NULL || sleep_time > diskp->host->start_t) { + *delayed_diskp = diskp; + sleep_time = diskp->host->start_t; + } + } else if(diskp->start_t > now) { + *cur_idle = max(*cur_idle, IDLE_START_WAIT); + if (*delayed_diskp == NULL || sleep_time > diskp->start_t) { + *delayed_diskp = diskp; + sleep_time = diskp->start_t; + } + } else if (diskp->host->netif->curusage > 0 && + sched(diskp)->est_kps > free_kps(diskp->host->netif)) { + *cur_idle = max(*cur_idle, IDLE_NO_BANDWIDTH); + } else if (!taper && sched(diskp)->no_space) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + } else if (!taper && diskp->to_holdingdisk == HOLD_NEVER) { + *cur_idle = max(*cur_idle, IDLE_NO_HOLD); + } else if (extra_tapes_size && sched(diskp)->est_size > extra_tapes_size) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + /* no tape space */ + } else if (!taper && (holdp = + find_diskspace(sched(diskp)->est_size, cur_idle, NULL)) == NULL) { + *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE); + if (empty(tapeq) && dumper_to_holding == 0 && rq != &directq) { + remove_disk(rq, diskp); + if (diskp->to_holdingdisk != HOLD_REQUIRED) { + enqueue_disk(&directq, diskp); + diskp->to_holdingdisk = HOLD_NEVER; + } + } + } else if (client_constrained(diskp)) { + free_assignedhd(holdp); + *cur_idle = max(*cur_idle, IDLE_CLIENT_CONSTRAINED); + } else { + + /* disk fits, dump it */ + int accept = !*diskp_accept; + if(!accept) { + switch(dumptype) { + case 's': accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size); + break; + case 'S': accept = (sched(diskp)->est_size > sched(*diskp_accept)->est_size); + break; + case 't': accept = (sched(diskp)->est_time < sched(*diskp_accept)->est_time); + break; + case 'T': accept = (sched(diskp)->est_time > sched(*diskp_accept)->est_time); + break; + case 'b': accept = (sched(diskp)->est_kps < sched(*diskp_accept)->est_kps); + break; + case 'B': accept = (sched(diskp)->est_kps > sched(*diskp_accept)->est_kps); + break; + default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"), + dumptype); + accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size); + break; + } + } + if(accept) { + if( !*diskp_accept || !degraded_mode || diskp->priority >= (*diskp_accept)->priority) { + if(*holdp_accept) free_assignedhd(*holdp_accept); + *diskp_accept = diskp; + *holdp_accept = holdp; + } + else { + free_assignedhd(holdp); + } + } + else { + free_assignedhd(holdp); + } + } +} + +static void +start_some_dumps( + disklist_t *rq) +{ + const time_t now = time(NULL); + int cur_idle; + disk_t *diskp, *delayed_diskp, *diskp_accept; + disk_t *dp; assignedhd_t **holdp=NULL, **holdp_accept; - time_t now = time(NULL); + cmd_t cmd; + int result_argc; + char **result_argv; + chunker_t *chunker; + dumper_t *dumper; + taper_t *taper; + char dumptype; + char *dumporder; + int dumper_to_holding = 0; + + /* don't start any actual dumps until the taper is started */ + if (!taper_started) return; - total = 0; idle_reason = IDLE_NO_DUMPERS; - sleep_time.tv_sec = SLEEP_MAX; - sleep_time.tv_usec = 0; - any_delayed_disk = 0; + sleep_time = 0; - if(rq->head == NULL) { - idle_reason = 0; - return 0; + if(dumpers_ev_time != NULL) { + event_release(dumpers_ev_time); + dumpers_ev_time = NULL; } - /* - * A potential problem with starting from the bottom of the dump time - * distribution is that a slave host will have both one of the shortest - * and one of the longest disks, so starting its shortest disk first will - * tie up the host and eliminate its longest disk from consideration the - * first pass through. This could cause a big delay in starting that long - * disk, which could drag out the whole night's dumps. - * - * While starting from the top of the dump time distribution solves the - * above problem, this turns out to be a bad idea, because the big dumps - * will almost certainly pack the holding disk completely, leaving no - * room for even one small dump to start. This ends up shutting out the - * small-end dumpers completely (they stay idle). - * - * The introduction of multiple simultaneous dumps to one host alleviates - * the biggest&smallest dumps problem: both can be started at the - * beginning. - */ - for(dumper = dmptable; dumper < dmptable+inparallel; dumper++) { - if(dumper->busy || dumper->down) continue; - /* found an idle dumper, now find a disk for it */ - diskp = rq->head; - diskp_accept = NULL; - holdp_accept = NULL; + for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) { + if (dumper->busy && dumper->dp->to_holdingdisk != HOLD_NEVER) { + dumper_to_holding++; + } + } + for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) { - if(idle_reason == IDLE_NO_DUMPERS) - idle_reason = NOT_IDLE; + if( dumper->busy || dumper->down) { + continue; + } - cur_idle = NOT_IDLE; + if (dumper->ev_read != NULL) { + event_release(dumper->ev_read); + dumper->ev_read = NULL; + } - while(diskp) { - assert(diskp->host != NULL && sched(diskp) != NULL); + /* + * A potential problem with starting from the bottom of the dump time + * distribution is that a slave host will have both one of the shortest + * and one of the longest disks, so starting its shortest disk first will + * tie up the host and eliminate its longest disk from consideration the + * first pass through. This could cause a big delay in starting that long + * disk, which could drag out the whole night's dumps. + * + * While starting from the top of the dump time distribution solves the + * above problem, this turns out to be a bad idea, because the big dumps + * will almost certainly pack the holding disk completely, leaving no + * room for even one small dump to start. This ends up shutting out the + * small-end dumpers completely (they stay idle). + * + * The introduction of multiple simultaneous dumps to one host alleviates + * the biggest&smallest dumps problem: both can be started at the + * beginning. + */ - /* round estimate to next multiple of DISK_BLOCK_KB */ - sched(diskp)->est_size = am_round(sched(diskp)->est_size, - DISK_BLOCK_KB); - - if(diskp->host->start_t > now) { - cur_idle = max(cur_idle, IDLE_START_WAIT); - sleep_time.tv_sec = min(diskp->host->start_t - now, - sleep_time.tv_sec); - any_delayed_disk = 1; - } - else if(diskp->start_t > now) { - cur_idle = max(cur_idle, IDLE_START_WAIT); - sleep_time.tv_sec = min(diskp->start_t - now, - sleep_time.tv_sec); - any_delayed_disk = 1; - } - else if(diskp->host->netif->curusage > 0 && - sched(diskp)->est_kps > free_kps(diskp->host->netif)) - cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH); - else if(sched(diskp)->no_space) - cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - else if((holdp = find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL) - cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); - else if(diskp->no_hold) { - free_assignedhd(holdp); - cur_idle = max(cur_idle, IDLE_NO_HOLD); - } else if(client_constrained(diskp)) { - free_assignedhd(holdp); - cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED); - } else { + diskp_accept = NULL; + holdp_accept = NULL; + delayed_diskp = NULL; - /* disk fits, dump it */ - int accept = !diskp_accept; - if(!accept) { - char dumptype; - char *dumporder = getconf_str(CNF_DUMPORDER); - if(strlen(dumporder) <= (dumper-dmptable)) { - if(dumper-dmptable < 3) - dumptype = 't'; - else - dumptype = 'T'; - } - else { - dumptype = dumporder[dumper-dmptable]; - } - switch(dumptype) { - case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); - break; - case 'S': accept = (sched(diskp)->est_size > sched(diskp_accept)->est_size); - break; - case 't': accept = (sched(diskp)->est_time < sched(diskp_accept)->est_time); - break; - case 'T': accept = (sched(diskp)->est_time > sched(diskp_accept)->est_time); - break; - case 'b': accept = (sched(diskp)->est_kps < sched(diskp_accept)->est_kps); - break; - case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps); - break; - default: log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n", - dumptype); - accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size); - break; + cur_idle = NOT_IDLE; + + dumporder = getconf_str(CNF_DUMPORDER); + if(strlen(dumporder) > (size_t)(dumper-dmptable)) { + dumptype = dumporder[dumper-dmptable]; + } + else { + if(dumper-dmptable < 3) + dumptype = 't'; + else + dumptype = 'T'; + } + + diskp = NULL; + taper = NULL; + if (!empty(directq)) { + taper = idle_taper(); + if (taper) { + TapeAction result_tape_action; + char *why_no_new_tape = NULL; + result_tape_action = tape_action(taper, &why_no_new_tape); + if (result_tape_action & TAPE_ACTION_START_A_FLUSH || + result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + off_t extra_tapes_size = 0; + taper_t *taper1; + + if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) { + extra_tapes_size = tape_length * + (off_t)(conf_runtapes - current_tape); + for (taper1 = tapetable; + taper1 < tapetable + conf_taper_parallel_write; + taper1++) { + if (taper1->state & TAPER_STATE_TAPE_STARTED) { + extra_tapes_size += taper1->left; + } + dp = taper1->disk; + if (dp) { + extra_tapes_size -= (sched(dp)->est_size - + taper1->written); + } + } } - } - if(accept) { - if( !diskp_accept || !degraded_mode || diskp->priority >= diskp_accept->priority) { - if(holdp_accept) free_assignedhd(holdp_accept); - diskp_accept = diskp; - holdp_accept = holdp; + + for (diskp = directq.head; diskp != NULL; + diskp = diskp->next) { + allow_dump_dle(diskp, taper, dumptype, &directq, now, + dumper_to_holding, &cur_idle, + &delayed_diskp, &diskp_accept, + &holdp_accept, extra_tapes_size); } - else { - free_assignedhd(holdp); + if (diskp_accept) { + diskp = diskp_accept; + holdp = holdp_accept; + } else { + taper = NULL; } - } - else { - free_assignedhd(holdp); + } else { + taper = NULL; } } - diskp = diskp->next; } - diskp = diskp_accept; - holdp = holdp_accept; - if(diskp) { - cur_idle = NOT_IDLE; - sched(diskp)->act_size = 0; - allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); - sched(diskp)->activehd = assign_holdingdisk(holdp, diskp); - amfree(holdp); - diskp->host->inprogress += 1; /* host is now busy */ - diskp->inprogress = 1; - sched(diskp)->dumper = dumper; - sched(diskp)->timestamp = time((time_t *)0); + if (diskp == NULL) { + for(diskp = rq->head; diskp != NULL; diskp = diskp->next) { + assert(diskp->host != NULL && sched(diskp) != NULL); + + allow_dump_dle(diskp, NULL, dumptype, rq, now, + dumper_to_holding, &cur_idle, &delayed_diskp, + &diskp_accept, &holdp_accept, 0); + } + diskp = diskp_accept; + holdp = holdp_accept; + } + + idle_reason = max(idle_reason, cur_idle); + if (diskp == NULL && idle_reason == IDLE_NO_DISKSPACE) { + /* continue flush waiting for new tape */ + startaflush(); + } + + /* + * If we have no disk at this point, and there are disks that + * are delayed, then schedule a time event to call this dumper + * with the disk with the shortest delay. + */ + if (diskp == NULL && delayed_diskp != NULL) { + assert(sleep_time > now); + sleep_time -= now; + dumpers_ev_time = event_register((event_id_t)sleep_time, EV_TIME, + handle_dumpers_time, &runq); + return; + } else if (diskp != NULL && taper == NULL) { + sched(diskp)->act_size = (off_t)0; + allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); + sched(diskp)->activehd = assign_holdingdisk(holdp, diskp); + amfree(holdp); + sched(diskp)->destname = newstralloc(sched(diskp)->destname, + sched(diskp)->holdp[0]->destname); + diskp->host->inprogress++; /* host is now busy */ + diskp->inprogress = 1; + sched(diskp)->dumper = dumper; + sched(diskp)->timestamp = now; + amfree(diskp->dataport_list); dumper->busy = 1; /* dumper is now busy */ dumper->dp = diskp; /* link disk to dumper */ - total++; remove_disk(rq, diskp); /* take it off the run queue */ - dumper_cmd(dumper, FILE_DUMP, diskp); - diskp->host->start_t = time(NULL) + 15; + + sched(diskp)->origsize = (off_t)-1; + sched(diskp)->dumpsize = (off_t)-1; + sched(diskp)->dumptime = (time_t)0; + sched(diskp)->tapetime = (time_t)0; + chunker = dumper->chunker = &chktable[dumper - dmptable]; + chunker->result = LAST_TOK; + dumper->result = LAST_TOK; + startup_chunk_process(chunker,chunker_program); + chunker_cmd(chunker, START, NULL, driver_timestamp); + chunker->dumper = dumper; + chunker_cmd(chunker, PORT_WRITE, diskp, NULL); + cmd = getresult(chunker->fd, 1, &result_argc, &result_argv); + if(cmd != PORT) { + assignedhd_t **h=NULL; + int activehd; + char *qname = quote_string(diskp->name); + + g_printf(_("driver: did not get PORT from %s for %s:%s\n"), + chunker->name, diskp->host->hostname, qname); + amfree(qname); + fflush(stdout); + + deallocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); + h = sched(diskp)->holdp; + activehd = sched(diskp)->activehd; + h[activehd]->used = 0; + h[activehd]->disk->allocated_dumpers--; + adjust_diskspace(diskp, DONE); + delete_diskspace(diskp); + diskp->host->inprogress--; + diskp->inprogress = 0; + sched(diskp)->dumper = NULL; + dumper->busy = 0; + dumper->dp = NULL; + sched(diskp)->dump_attempted++; + free_serial_dp(diskp); + if(sched(diskp)->dump_attempted < 2) + enqueue_disk(rq, diskp); + } + else { + dumper->ev_read = event_register((event_id_t)dumper->fd, EV_READFD, + handle_dumper_result, dumper); + chunker->ev_read = event_register((event_id_t)chunker->fd, EV_READFD, + handle_chunker_result, chunker); + dumper->output_port = atoi(result_argv[1]); + amfree(diskp->dataport_list); + diskp->dataport_list = stralloc(result_argv[2]); + + if (diskp->host->pre_script == 0) { + for (dp=diskp->host->disks; dp != NULL; dp = dp->hostnext) { + run_server_scripts(EXECUTE_ON_PRE_HOST_BACKUP, + get_config_name(), dp, -1); + } + diskp->host->pre_script = 1; + } + run_server_scripts(EXECUTE_ON_PRE_DLE_BACKUP, + get_config_name(), diskp, + sched(diskp)->level); + dumper_cmd(dumper, PORT_DUMP, diskp, NULL); + } + diskp->host->start_t = now + 15; + + if (result_argv) + g_strfreev(result_argv); + short_dump_state(); + } else if (diskp != NULL && taper != NULL) { /* dump to tape */ + sched(diskp)->act_size = (off_t)0; + allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps); + diskp->host->inprogress++; /* host is now busy */ + diskp->inprogress = 1; + sched(diskp)->dumper = dumper; + sched(diskp)->taper = taper; + sched(diskp)->timestamp = now; + dumper->chunker = NULL; + amfree(diskp->dataport_list); + + dumper->busy = 1; /* dumper is now busy */ + dumper->dp = diskp; /* link disk to dumper */ + remove_disk(&directq, diskp); /* take it off the direct queue */ + + sched(diskp)->origsize = (off_t)-1; + sched(diskp)->dumpsize = (off_t)-1; + sched(diskp)->dumptime = (time_t)0; + sched(diskp)->tapetime = (time_t)0; + dumper->result = LAST_TOK; + taper->result = LAST_TOK; + taper->input_error = NULL; + taper->tape_error = NULL; + taper->disk = diskp; + taper->first_label = NULL; + taper->written = 0; + taper->dumper = dumper; + taper->state |= TAPER_STATE_DUMP_TO_TAPE; + taper->state &= ~TAPER_STATE_IDLE; + if (taper_nb_wait_reply == 0) { + taper_ev_read = event_register(taper_fd, EV_READFD, + handle_taper_result, NULL); + } + + taper_nb_wait_reply++; + taper_cmd(PORT_WRITE, diskp, NULL, sched(diskp)->level, + sched(diskp)->datestamp); + diskp->host->start_t = now + 15; + + short_dump_state(); } - idle_reason = max(idle_reason, cur_idle); } - return total; } -int sort_by_priority_reversed(a, b) -disk_t *a, *b; -{ - if(sched(b)->priority - sched(a)->priority != 0) - return sched(b)->priority - sched(a)->priority; - else - return sort_by_time(a, b); -} +/* + * This gets called when a dumper is delayed for some reason. It may + * be because a disk has a delayed start, or amanda is constrained + * by network or disk limits. + */ -int sort_by_time(a, b) -disk_t *a, *b; +static void +handle_dumpers_time( + void * cookie) { - long diff; - - if ((diff = sched(a)->est_time - sched(b)->est_time) < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } + disklist_t *runq = cookie; + event_release(dumpers_ev_time); + dumpers_ev_time = NULL; + start_some_dumps(runq); } -void dump_schedule(qp, str) -disklist_t *qp; -char *str; +static void +dump_schedule( + disklist_t *qp, + char * str) { disk_t *dp; + char *qname; - printf("dump of driver schedule %s:\n--------\n", str); + g_printf(_("dump of driver schedule %s:\n--------\n"), str); for(dp = qp->head; dp != NULL; dp = dp->next) { - printf(" %-20s %-25s lv %d t %5ld s %8lu p %d\n", - dp->host->hostname, dp->name, sched(dp)->level, - sched(dp)->est_time, sched(dp)->est_size, sched(dp)->priority); + qname = quote_string(dp->name); + g_printf(" %-20s %-25s lv %d t %5lu s %lld p %d\n", + dp->host->hostname, qname, sched(dp)->level, + sched(dp)->est_time, + (long long)sched(dp)->est_size, sched(dp)->priority); + amfree(qname); } - printf("--------\n"); + g_printf("--------\n"); } - -void start_degraded_mode(queuep) -disklist_t *queuep; +static void +start_degraded_mode( + /*@keep@*/ disklist_t *queuep) { disk_t *dp; disklist_t newq; - unsigned long est_full_size; + off_t est_full_size; + char *qname; newq.head = newq.tail = 0; - dump_schedule(queuep, "before start degraded mode"); + dump_schedule(queuep, _("before start degraded mode")); - est_full_size = 0; + est_full_size = (off_t)0; while(!empty(*queuep)) { dp = dequeue_disk(queuep); + qname = quote_string(dp->name); if(sched(dp)->level != 0) /* go ahead and do the disk as-is */ - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); else { if (reserved_space + est_full_size + sched(dp)->est_size <= total_disksize) { - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); est_full_size += sched(dp)->est_size; } else if(sched(dp)->degr_level != -1) { sched(dp)->level = sched(dp)->degr_level; sched(dp)->dumpdate = sched(dp)->degr_dumpdate; - sched(dp)->est_size = sched(dp)->degr_size; + sched(dp)->est_nsize = sched(dp)->degr_nsize; + sched(dp)->est_csize = sched(dp)->degr_csize; sched(dp)->est_time = sched(dp)->degr_time; sched(dp)->est_kps = sched(dp)->degr_kps; - insert_disk(&newq, dp, sort_by_priority_reversed); + enqueue_disk(&newq, dp); } else { - log_add(L_FAIL, "%s %s %s %d [can't switch to incremental dump]", - dp->host->hostname, dp->name, - sched(dp)->datestamp, sched(dp)->level); + log_add(L_FAIL, "%s %s %s %d [%s]", + dp->host->hostname, qname, sched(dp)->datestamp, + sched(dp)->level, sched(dp)->degr_mesg); } } + amfree(qname); } - *queuep = newq; + /*@i@*/ *queuep = newq; degraded_mode = 1; - dump_schedule(queuep, "after start degraded mode"); + dump_schedule(queuep, _("after start degraded mode")); } -void continue_dumps() + +static void +continue_port_dumps(void) { -disk_t *dp, *ndp; -assignedhd_t **h; -int active_dumpers=0, busy_dumpers=0, i; -dumper_t *dumper; + disk_t *dp, *ndp; + assignedhd_t **h; + int active_dumpers=0, busy_dumpers=0, i; + dumper_t *dumper; /* First we try to grant diskspace to some dumps waiting for it. */ for( dp = roomq.head; dp; dp = ndp ) { ndp = dp->next; /* find last holdingdisk used by this dump */ - for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ ); + for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ ) { + (void)h; /* Quiet lint */ + } /* find more space */ - h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, &active_dumpers, h[i] ); + h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, + &active_dumpers, h[i] ); if( h ) { for(dumper = dmptable; dumper < dmptable + inparallel && - dumper->dp != dp; dumper++); + dumper->dp != dp; dumper++) { + (void)dp; /* Quiet lint */ + } assert( dumper < dmptable + inparallel ); sched(dp)->activehd = assign_holdingdisk( h, dp ); - dumper_cmd( dumper, CONTINUE, dp ); + chunker_cmd( dumper->chunker, CONTINUE, dp, NULL ); amfree(h); remove_disk( &roomq, dp ); } @@ -907,447 +1499,1136 @@ dumper_t *dumper; * a) diskspace has been allocated for other dumps which are * still running or already being written to tape * b) all other dumps have been suspended due to lack of diskspace - * c) this dump doesn't fit on all the holding disks * Case a) is not a problem. We just wait for the diskspace to * be freed by moving the current disk to a queue. * If case b) occurs, we have a deadlock situation. We select * a dump from the queue to be aborted and abort it. It will - * be retried later dumping to disk. - * If case c) is detected, the dump is aborted. Next time - * it will be dumped directly to tape. Actually, case c is a special - * manifestation of case b) where only one dumper is busy. + * be retried directly to tape. */ - for( dp=NULL, dumper = dmptable; dumper < dmptable + inparallel; dumper++) { + for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) { if( dumper->busy ) { busy_dumpers++; if( !find_disk(&roomq, dumper->dp) ) { active_dumpers++; - } else if( !dp || sched(dp)->est_size > sched(dumper->dp)->est_size ) { + } else if( !dp || + sched(dp)->est_size > sched(dumper->dp)->est_size ) { dp = dumper->dp; } } } - if( !active_dumpers && busy_dumpers > 0 && - ((!taper_busy && empty(tapeq)) || degraded_mode) && - pending_aborts == 0 ) { /* not case a */ - if( busy_dumpers == 1 ) { /* case c */ - sched(dp)->no_space = 1; - } - /* case b */ + if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) && + ((all_taper_idle() && empty(tapeq)) || degraded_mode) && + pending_aborts == 0 ) { /* case b */ + sched(dp)->no_space = 1; /* At this time, dp points to the dump with the smallest est_size. * We abort that dump, hopefully not wasting too much time retrying it. */ remove_disk( &roomq, dp ); - dumper_cmd( sched(dp)->dumper, ABORT, NULL ); + chunker_cmd(sched(dp)->dumper->chunker, ABORT, NULL, _("Not enough holding disk space")); + dumper_cmd( sched(dp)->dumper, ABORT, NULL, _("Not enough holding disk space")); pending_aborts++; } } -void handle_taper_result() + +static void +handle_taper_result( + void *cookie G_GNUC_UNUSED) { - disk_t *dp; - int filenum; + disk_t *dp = NULL, *dp1; + dumper_t *dumper; cmd_t cmd; int result_argc; - char *result_argv[MAX_ARGS+1]; + char **result_argv; + char *qname, *q; + char *s; + taper_t *taper = NULL; + taper_t *taper1; + int i; + off_t partsize; - cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1); + assert(cookie == NULL); - switch(cmd) { + do { - case DONE: /* DONE