*/
#include "amanda.h"
+#include "find.h"
#include "clock.h"
#include "conffile.h"
#include "diskfile.h"
#include "infofile.h"
#include "logfile.h"
#include "fsusage.h"
-#include "version.h"
#include "driverio.h"
#include "server_util.h"
#include "timestamp.h"
// to tape
static disklist_t roomq; // dle waiting for more space on holding disk
static int pending_aborts;
-static disk_t *taper_disk;
static int degraded_mode;
static off_t reserved_space;
static off_t total_disksize;
static int inparallel;
static int nodump = 0;
static off_t tape_length = (off_t)0;
-static off_t tape_left = (off_t)0;
static int current_tape = 0;
static int conf_taperalgo;
+static int conf_taper_parallel_write;
static int conf_runtapes;
static time_t sleep_time;
static int idle_reason;
static holdalloc_t *holdalloc;
static int num_holdalloc;
static event_handle_t *dumpers_ev_time = NULL;
+static event_handle_t *flush_ev_read = NULL;
static event_handle_t *schedule_ev_read = NULL;
static int conf_flush_threshold_dumped;
static int conf_flush_threshold_scheduled;
// schedule from the planner
static int force_flush; // All dump are terminated, we
// must now respect taper_flush
+static int taper_nb_scan_volume = 0;
+static int nb_sent_new_tape = 0;
+static int taper_started = 0;
+static taper_t *last_started_taper;
static int wait_children(int count);
static void wait_for_children(void);
static int client_constrained(disk_t *dp);
static void deallocate_bandwidth(netif_t *ip, unsigned long kps);
static void dump_schedule(disklist_t *qp, char *str);
-static void dump_to_tape(disk_t *dp);
static assignedhd_t **find_diskspace(off_t size, int *cur_idle,
assignedhd_t *preferred);
static unsigned long free_kps(netif_t *ip);
static void handle_taper_result(void *);
static void holdingdisk_state(char *time_str);
-static dumper_t *idle_dumper(void);
+static taper_t *idle_taper(void);
+static taper_t *taper_from_name(char *name);
static void interface_state(char *time_str);
static int queue_length(disklist_t q);
-static disklist_t read_flush(void);
+static void read_flush(void *cookie);
static void read_schedule(void *cookie);
static void short_dump_state(void);
static void startaflush(void);
static void start_degraded_mode(disklist_t *queuep);
static void start_some_dumps(disklist_t *rq);
static void continue_port_dumps(void);
-static void update_failed_dump_to_tape(disk_t *);
+static void update_failed_dump(disk_t *);
+static int all_taper_idle(void);
typedef enum {
- TAPE_ACTION_NO_ACTION = 0,
- TAPE_ACTION_NEW_TAPE = (1 << 0),
- TAPE_ACTION_NO_NEW_TAPE = (1 << 1),
- TAPE_ACTION_START_A_FLUSH = (1 << 2)
+ TAPE_ACTION_NO_ACTION = 0,
+ TAPE_ACTION_SCAN = (1 << 0),
+ TAPE_ACTION_NEW_TAPE = (1 << 1),
+ TAPE_ACTION_NO_NEW_TAPE = (1 << 2),
+ TAPE_ACTION_START_A_FLUSH = (1 << 3),
+ TAPE_ACTION_START_A_FLUSH_FIT = (1 << 4),
+ TAPE_ACTION_MOVE = (1 << 5)
} TapeAction;
-static TapeAction tape_action(char **why_no_new_tape);
+static TapeAction tape_action(taper_t *taper, char **why_no_new_tape);
static const char *idle_strings[] = {
#define NOT_IDLE 0
char *newdir = NULL;
struct fs_usage fsusage;
holdingdisk_t *hdp;
+ identlist_t il;
unsigned long reserve = 100;
char *conf_diskfile;
- cmd_t cmd;
- int result_argc;
char **result_argv = NULL;
char *taper_program;
char *conf_tapetype;
char *line;
char hostname[1025];
intmax_t kb_avail;
- config_overwrites_t *cfg_ovr = NULL;
+ config_overrides_t *cfg_ovr = NULL;
char *cfg_opt = NULL;
holdalloc_t *ha, *ha_last;
+ find_result_t *holding_files;
+ disklist_t holding_disklist = { NULL, NULL };
+ int no_taper = FALSE;
/*
* Configure program for internationalization:
/* Don't die when child closes pipe */
signal(SIGPIPE, SIG_IGN);
- erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
- set_logerror(logerror);
+ add_amanda_log_handler(amanda_log_stderr);
+ add_amanda_log_handler(amanda_log_trace_log);
startclock();
- cfg_ovr = extract_commandline_config_overwrites(&argc, &argv);
+ cfg_ovr = extract_commandline_config_overrides(&argc, &argv);
if (argc > 1)
cfg_opt = argv[1];
+ set_config_overrides(cfg_ovr);
config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD, cfg_opt);
- apply_config_overwrites(cfg_ovr);
conf_diskfile = config_dir_relative(getconf_str(CNF_DISKFILE));
read_diskfile(conf_diskfile, &origq);
+ disable_skip_disk(&origq);
amfree(conf_diskfile);
if (config_errors(NULL) >= CFGERR_WARNINGS) {
log_add(L_INFO, "%s pid %ld", get_pname(), (long)getpid());
g_printf(_("%s: pid %ld executable %s version %s\n"),
- get_pname(), (long) getpid(), argv[0], version());
+ get_pname(), (long) getpid(), argv[0], VERSION);
if(argc > 2) {
- if(strncmp(argv[2], "nodump", 6) == 0) {
+ if(strcmp(argv[2], "nodump") == 0) {
nodump = 1;
+ argv++;
+ argc--;
}
}
+ if (argc > 2) {
+ if (strcmp(argv[2], "--no-taper") == 0) {
+ no_taper = TRUE;
+ argv++;
+ argc--;
+ }
+ }
+
safe_cd(); /* do this *after* config_init */
check_running_as(RUNNING_AS_DUMPUSER);
dbrename(get_config_name(), DBG_SUBDIR_SERVER);
+ /* load DLEs from the holding disk, in case there's anything to flush there */
+ search_holding_disk(&holding_files, &holding_disklist);
+ /* note that the dumps are added to the global disklist, so we need not consult
+ * holding_files or holding_disklist after this */
+
amfree(driver_timestamp);
/* read timestamp from stdin */
while ((line = agets(stdin)) != NULL) {
hd_driver_timestamp = stralloc(driver_timestamp);
}
- taper_program = vstralloc(amlibexecdir, "/", "taper", versionsuffix(), NULL);
- dumper_program = vstralloc(amlibexecdir, "/", "dumper", versionsuffix(),
- NULL);
- chunker_program = vstralloc(amlibexecdir, "/", "chunker", versionsuffix(),
- NULL);
+ taper_program = vstralloc(amlibexecdir, "/", "taper", NULL);
+ dumper_program = vstralloc(amlibexecdir, "/", "dumper", NULL);
+ chunker_program = vstralloc(amlibexecdir, "/", "chunker", NULL);
conf_taperalgo = getconf_taperalgo(CNF_TAPERALGO);
+ conf_taper_parallel_write = getconf_int(CNF_TAPER_PARALLEL_WRITE);
conf_tapetype = getconf_str(CNF_TAPETYPE);
conf_runtapes = getconf_int(CNF_RUNTAPES);
+ if (conf_taper_parallel_write > conf_runtapes) {
+ conf_taper_parallel_write = conf_runtapes;
+ }
tape = lookup_tapetype(conf_tapetype);
tape_length = tapetype_get_length(tape);
g_printf("driver: tape size %lld\n", (long long)tape_length);
flush_threshold_scheduled = (conf_flush_threshold_scheduled * tape_length) / 100;
taperflush = (conf_taperflush *tape_length) / 100;
- driver_debug(1, _("flush_threshold_dumped: %lld\n"), (long long)flush_threshold_dumped);
- driver_debug(1, _("flush_threshold_scheduled: %lld\n"), (long long)flush_threshold_scheduled);
+ driver_debug(1, _("flush-threshold-dumped: %lld\n"), (long long)flush_threshold_dumped);
+ driver_debug(1, _("flush-threshold-scheduled: %lld\n"), (long long)flush_threshold_scheduled);
driver_debug(1, _("taperflush: %lld\n"), (long long)taperflush);
/* set up any configuration-dependent variables */
total_disksize = (off_t)0;
ha_last = NULL;
num_holdalloc = 0;
- for(hdp = getconf_holdingdisks(), dsk = 0; hdp != NULL; hdp = holdingdisk_next(hdp), dsk++) {
+ for (il = getconf_identlist(CNF_HOLDINGDISK), dsk = 0;
+ il != NULL;
+ il = il->next, dsk++) {
+ hdp = lookup_holdingdisk(il->data);
ha = alloc(SIZEOF(holdalloc_t));
num_holdalloc++;
/* taper takes a while to get going, so start it up right away */
init_driverio();
- if(conf_runtapes > 0) {
- startup_tape_process(taper_program);
- taper_cmd(START_TAPER, driver_timestamp, NULL, 0, NULL);
- }
+ startup_tape_process(taper_program, conf_taper_parallel_write, no_taper);
/* fire up the dumpers now while we are waiting */
if(!nodump) startup_dump_processes(dumper_program, inparallel, driver_timestamp);
directq.head = NULL;
directq.tail = NULL;
waitq = origq;
- taper_state = TAPER_STATE_DEFAULT;
- tapeq = read_flush();
+ tapeq.head = NULL;
+ tapeq.tail = NULL;
+ roomq.head = NULL;
+ roomq.tail = NULL;
+ taper_nb_wait_reply = 0;
+
+ need_degraded = 0;
+ if (no_taper || conf_runtapes <= 0) {
+ taper_started = 1; /* we'll pretend the taper started and failed immediately */
+ need_degraded = 1;
+ } else {
+ tapetable[0].state = TAPER_STATE_INIT;
+ taper_nb_wait_reply++;
+ taper_nb_scan_volume++;
+ taper_ev_read = event_register(taper_fd, EV_READFD,
+ handle_taper_result, NULL);
+ taper_cmd(START_TAPER, NULL, tapetable[0].name, 0, driver_timestamp);
+ }
- roomq.head = roomq.tail = NULL;
+ flush_ev_read = event_register((event_id_t)0, EV_READFD, read_flush, NULL);
log_add(L_STATS, _("startup time %s"), walltime_str(curclock()));
getconf_str(CNF_DUMPORDER));
fflush(stdout);
- /* ok, planner is done, now lets see if the tape is ready */
-
- if (conf_runtapes > 0) {
- cmd = getresult(taper, 1, &result_argc, &result_argv);
- if (cmd != TAPER_OK) {
- /* no tape, go into degraded mode: dump to holding disk */
- need_degraded = 1;
- }
- } else {
- need_degraded = 1;
- }
-
- tape_left = tape_length;
- taper_busy = 0;
- amfree(taper_input_error);
- amfree(taper_tape_error);
- taper_disk = NULL;
- taper_ev_read = NULL;
-
schedule_done = nodump;
force_flush = 0;
- if(!need_degraded) startaflush();
-
- if(!nodump)
- schedule_ev_read = event_register((event_id_t)0, EV_READFD, read_schedule, NULL);
-
short_dump_state();
event_loop(0);
}
/* handle any remaining dumps by dumping directly to tape, if possible */
- while(!empty(directq) && taper > 0) {
+ while(!empty(directq) && taper_fd > 0) {
time_t sleep_time = 100000000;
disk_t *sleep_diskp = NULL;
time_t now = time(0);
amfree(qname);
}
else if (!degraded_mode) {
- taper_state |= TAPER_STATE_DUMP_TO_TAPE;
- dump_to_tape(diskp);
- event_loop(0);
- taper_state &= ~TAPER_STATE_DUMP_TO_TAPE;
+ char *qname = quote_string(diskp->name);
+ log_add(L_FAIL, "%s %s %s %d [%s]",
+ diskp->host->hostname, qname, sched(diskp)->datestamp,
+ sched(diskp)->level,
+ _("can't dump in degraded mode"));
+ amfree(qname);
}
else {
char *qname = quote_string(diskp->name);
}
}
- if(taper >= 0) {
+ if(taper_fd >= 0) {
taper_cmd(QUIT, NULL, NULL, 0, NULL);
}
}
}
- if(taper_pid > 1 && taper > 0) {
+ if(taper_pid > 1 && taper_fd > 0) {
taper_cmd(QUIT, NULL, NULL, 0, NULL);
}
}
+static void startaflush_tape(taper_t *taper);
+
static void
startaflush(void)
+{
+ taper_t *taper;
+
+ for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write;
+ taper++) {
+ if (!(taper->state & TAPER_STATE_DONE) &&
+ taper->state & TAPER_STATE_WAIT_FOR_TAPE) {
+ startaflush_tape(taper);
+ }
+ }
+ for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write;
+ taper++) {
+ if (!(taper->state & TAPER_STATE_DONE) &&
+ taper->state & TAPER_STATE_TAPE_REQUESTED) {
+ startaflush_tape(taper);
+ }
+ }
+ for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write;
+ taper++) {
+ if (!(taper->state & TAPER_STATE_DONE) &&
+ taper->state & TAPER_STATE_INIT) {
+ startaflush_tape(taper);
+ }
+ }
+ for(taper = tapetable; taper <= tapetable+conf_taper_parallel_write;
+ taper++) {
+ if (!(taper->state & TAPER_STATE_DONE) &&
+ taper->state & TAPER_STATE_IDLE) {
+ startaflush_tape(taper);
+ }
+ }
+}
+
+static void
+startaflush_tape(
+ taper_t *taper)
{
disk_t *dp = NULL;
disk_t *fit = NULL;
char *datestamp;
- int extra_tapes = 0;
+ off_t extra_tapes_size = 0;
+ off_t taper_left;
char *qname;
TapeAction result_tape_action;
- char *why_no_new_tape;
-
- result_tape_action = tape_action(&why_no_new_tape);
-
- if (result_tape_action & TAPE_ACTION_NEW_TAPE) {
- taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE;
- taper_cmd(NEW_TAPE, NULL, NULL, 0, NULL);
+ char *why_no_new_tape = NULL;
+ taper_t *taper1;
+
+ result_tape_action = tape_action(taper, &why_no_new_tape);
+
+ if (result_tape_action & TAPE_ACTION_SCAN) {
+ taper->state &= ~TAPER_STATE_TAPE_REQUESTED;
+ taper->state |= TAPER_STATE_WAIT_FOR_TAPE;
+ taper_nb_scan_volume++;
+ taper_cmd(START_SCAN, taper->disk, NULL, 0, NULL);
+ } else if (result_tape_action & TAPE_ACTION_NEW_TAPE) {
+ taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE;
+ taper->state |= TAPER_STATE_WAIT_NEW_TAPE;
+ nb_sent_new_tape++;
+ taper_cmd(NEW_TAPE, taper->disk, NULL, 0, NULL);
} else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) {
- taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE;
- taper_cmd(NO_NEW_TAPE, why_no_new_tape, NULL, 0, NULL);
+ taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE;
+ taper_cmd(NO_NEW_TAPE, taper->disk, why_no_new_tape, 0, NULL);
+ taper->state |= TAPER_STATE_DONE;
start_degraded_mode(&runq);
+ } else if (result_tape_action & TAPE_ACTION_MOVE) {
+ taper_t *taper1 = idle_taper();
+ if (taper1) {
+ taper->state &= ~TAPER_STATE_TAPE_REQUESTED;
+ taper->state &= ~TAPER_STATE_WAIT_FOR_TAPE;
+ taper_cmd(TAKE_SCRIBE_FROM, taper->disk, taper1->name, 0 , NULL);
+ taper1->state = TAPER_STATE_DEFAULT;
+ taper->state |= TAPER_STATE_TAPE_STARTED;
+ taper->left = taper1->left;
+ if (last_started_taper == taper1) {
+ last_started_taper = taper;
+ }
+ }
}
- if (!degraded_mode && !taper_busy && !empty(tapeq) &&
- (result_tape_action & TAPE_ACTION_START_A_FLUSH)) {
-
+ if (!degraded_mode &&
+ taper->state & TAPER_STATE_IDLE &&
+ !empty(tapeq) &&
+ (result_tape_action & TAPE_ACTION_START_A_FLUSH ||
+ result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT)) {
+
+ int taperalgo = conf_taperalgo;
+ if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) {
+ if (taperalgo == ALGO_FIRST)
+ taperalgo = ALGO_FIRSTFIT;
+ else if (taperalgo == ALGO_LARGEST)
+ taperalgo = ALGO_LARGESTFIT;
+ else if (taperalgo == ALGO_SMALLEST)
+ taperalgo = ALGO_SMALLESTFIT;
+ else if (taperalgo == ALGO_LAST)
+ taperalgo = ALGO_LASTFIT;
+ }
+
+ extra_tapes_size = tape_length * (off_t)(conf_runtapes - current_tape);
+ for (taper1 = tapetable; taper1 < tapetable + conf_taper_parallel_write;
+ taper1++) {
+ if (taper1->state & TAPER_STATE_TAPE_STARTED) {
+ extra_tapes_size += taper1->left;
+ }
+ dp = taper1->disk;
+ if (dp) {
+ extra_tapes_size -= (sched(dp)->act_size - taper1->written);
+ }
+ }
+
+ if (taper->state & TAPER_STATE_TAPE_STARTED) {
+ taper_left = taper->left;
+ } else {
+ taper_left = tape_length;
+ }
+ dp = NULL;
datestamp = sched(tapeq.head)->datestamp;
- switch(conf_taperalgo) {
+ switch(taperalgo) {
case ALGO_FIRST:
dp = dequeue_disk(&tapeq);
break;
case ALGO_FIRSTFIT:
fit = tapeq.head;
while (fit != NULL) {
- extra_tapes = (fit->tape_splitsize > (off_t)0) ?
- conf_runtapes - current_tape : 0;
- if(sched(fit)->act_size <= (tape_left +
- tape_length * (off_t)extra_tapes) &&
+ if (sched(fit)->act_size <=
+ (fit->splitsize ? extra_tapes_size : taper_left) &&
strcmp(sched(fit)->datestamp, datestamp) <= 0) {
dp = fit;
fit = NULL;
case ALGO_LARGESTFIT:
fit = tapeq.head;
while (fit != NULL) {
- extra_tapes = (fit->tape_splitsize > (off_t)0) ?
- conf_runtapes - current_tape : 0;
if(sched(fit)->act_size <=
- (tape_left + tape_length * (off_t)extra_tapes) &&
+ (fit->splitsize ? extra_tapes_size : taper_left) &&
(!dp || sched(fit)->act_size > sched(dp)->act_size) &&
strcmp(sched(fit)->datestamp, datestamp) <= 0) {
dp = fit;
if(dp) remove_disk(&tapeq, dp);
break;
case ALGO_SMALLEST:
+ fit = dp = tapeq.head;
+ while (fit != NULL) {
+ if (sched(fit)->act_size < sched(dp)->act_size &&
+ strcmp(sched(fit)->datestamp, datestamp) <= 0) {
+ dp = fit;
+ }
+ fit = fit->next;
+ }
+ if(dp) remove_disk(&tapeq, dp);
+ break;
+ case ALGO_SMALLESTFIT:
+ fit = dp = tapeq.head;
+ while (fit != NULL) {
+ if (sched(fit)->act_size <=
+ (fit->splitsize ? extra_tapes_size : taper_left) &&
+ (!dp || sched(fit)->act_size < sched(dp)->act_size) &&
+ strcmp(sched(fit)->datestamp, datestamp) <= 0) {
+ dp = fit;
+ }
+ fit = fit->next;
+ }
+ if(dp) remove_disk(&tapeq, dp);
break;
case ALGO_LAST:
dp = tapeq.tail;
remove_disk(&tapeq, dp);
break;
- }
- if(!dp) { /* ALGO_SMALLEST, or default if nothing fit. */
- if(conf_taperalgo != ALGO_SMALLEST) {
- g_fprintf(stderr,
- _("driver: startaflush: Using SMALLEST because nothing fit\n"));
- }
- fit = dp = tapeq.head;
- while (fit != NULL) {
- if(sched(fit)->act_size < sched(dp)->act_size &&
- strcmp(sched(fit)->datestamp, datestamp) <= 0) {
- dp = fit;
+ case ALGO_LASTFIT:
+ fit = tapeq.tail;
+ while (fit != NULL) {
+ if (sched(fit)->act_size <=
+ (fit->splitsize ? extra_tapes_size : taper_left) &&
+ (!dp || sched(fit)->act_size < sched(dp)->act_size) &&
+ strcmp(sched(fit)->datestamp, datestamp) <= 0) {
+ dp = fit;
+ }
+ fit = fit->prev;
}
- fit = fit->next;
- }
- if(dp) remove_disk(&tapeq, dp);
- }
- if(taper_ev_read == NULL) {
- taper_ev_read = event_register((event_id_t)taper, EV_READFD,
- handle_taper_result, NULL);
+ if(dp) remove_disk(&tapeq, dp);
+ break;
}
if (dp) {
- taper_disk = dp;
- taper_busy = 1;
- amfree(taper_input_error);
- amfree(taper_tape_error);
- taper_result = LAST_TOK;
- taper_sendresult = 0;
- taper_first_label = NULL;
- taper_written = 0;
- taper_state &= ~TAPER_STATE_DUMP_TO_TAPE;
- taper_dumper = NULL;
+ taper->disk = dp;
+ taper->dumper = NULL;
+ amfree(taper->input_error);
+ amfree(taper->tape_error);
+ taper->result = LAST_TOK;
+ taper->sendresult = 0;
+ amfree(taper->first_label);
+ taper->written = 0;
+ taper->state &= ~TAPER_STATE_IDLE;
+ taper->state |= TAPER_STATE_FILE_TO_TAPE;
+ taper->dumper = NULL;
qname = quote_string(dp->name);
+ if (taper_nb_wait_reply == 0) {
+ taper_ev_read = event_register(taper_fd, EV_READFD,
+ handle_taper_result, NULL);
+ }
+ taper_nb_wait_reply++;
+ sched(dp)->taper = taper;
taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level,
sched(dp)->datestamp);
g_fprintf(stderr,_("driver: startaflush: %s %s %s %lld %lld\n"),
- taperalgo2str(conf_taperalgo), dp->host->hostname, qname,
- (long long)sched(taper_disk)->act_size,
- (long long)tape_left);
- if(sched(dp)->act_size <= tape_left)
- tape_left -= sched(dp)->act_size;
- else
- tape_left = (off_t)0;
+ taperalgo2str(taperalgo), dp->host->hostname, qname,
+ (long long)sched(taper->disk)->act_size,
+ (long long)taper->left);
amfree(qname);
- } else {
- error(_("FATAL: Taper marked busy and no work found."));
- /*NOTREACHED*/
}
short_dump_state();
- } else if(!taper_busy && taper_ev_read != NULL) {
- event_release(taper_ev_read);
- taper_ev_read = NULL;
}
}
return 0;
}
+static void
+allow_dump_dle(
+ disk_t *diskp,
+ taper_t *taper,
+ char dumptype,
+ disklist_t *rq,
+ const time_t now,
+ int dumper_to_holding,
+ int *cur_idle,
+ disk_t **delayed_diskp,
+ disk_t **diskp_accept,
+ assignedhd_t ***holdp_accept,
+ off_t extra_tapes_size)
+{
+ assignedhd_t **holdp=NULL;
+
+ if (diskp->host->start_t > now) {
+ *cur_idle = max(*cur_idle, IDLE_START_WAIT);
+ if (*delayed_diskp == NULL || sleep_time > diskp->host->start_t) {
+ *delayed_diskp = diskp;
+ sleep_time = diskp->host->start_t;
+ }
+ } else if(diskp->start_t > now) {
+ *cur_idle = max(*cur_idle, IDLE_START_WAIT);
+ if (*delayed_diskp == NULL || sleep_time > diskp->start_t) {
+ *delayed_diskp = diskp;
+ sleep_time = diskp->start_t;
+ }
+ } else if (diskp->host->netif->curusage > 0 &&
+ sched(diskp)->est_kps > free_kps(diskp->host->netif)) {
+ *cur_idle = max(*cur_idle, IDLE_NO_BANDWIDTH);
+ } else if (!taper && sched(diskp)->no_space) {
+ *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE);
+ } else if (!taper && diskp->to_holdingdisk == HOLD_NEVER) {
+ *cur_idle = max(*cur_idle, IDLE_NO_HOLD);
+ } else if (extra_tapes_size && sched(diskp)->est_size > extra_tapes_size) {
+ *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE);
+ /* no tape space */
+ } else if (!taper && (holdp =
+ find_diskspace(sched(diskp)->est_size, cur_idle, NULL)) == NULL) {
+ *cur_idle = max(*cur_idle, IDLE_NO_DISKSPACE);
+ if (empty(tapeq) && dumper_to_holding == 0 && rq != &directq) {
+ remove_disk(rq, diskp);
+ if (diskp->to_holdingdisk != HOLD_REQUIRED) {
+ enqueue_disk(&directq, diskp);
+ diskp->to_holdingdisk = HOLD_NEVER;
+ }
+ }
+ } else if (client_constrained(diskp)) {
+ free_assignedhd(holdp);
+ *cur_idle = max(*cur_idle, IDLE_CLIENT_CONSTRAINED);
+ } else {
+
+ /* disk fits, dump it */
+ int accept = !*diskp_accept;
+ if(!accept) {
+ switch(dumptype) {
+ case 's': accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size);
+ break;
+ case 'S': accept = (sched(diskp)->est_size > sched(*diskp_accept)->est_size);
+ break;
+ case 't': accept = (sched(diskp)->est_time < sched(*diskp_accept)->est_time);
+ break;
+ case 'T': accept = (sched(diskp)->est_time > sched(*diskp_accept)->est_time);
+ break;
+ case 'b': accept = (sched(diskp)->est_kps < sched(*diskp_accept)->est_kps);
+ break;
+ case 'B': accept = (sched(diskp)->est_kps > sched(*diskp_accept)->est_kps);
+ break;
+ default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"),
+ dumptype);
+ accept = (sched(diskp)->est_size < sched(*diskp_accept)->est_size);
+ break;
+ }
+ }
+ if(accept) {
+ if( !*diskp_accept || !degraded_mode || diskp->priority >= (*diskp_accept)->priority) {
+ if(*holdp_accept) free_assignedhd(*holdp_accept);
+ *diskp_accept = diskp;
+ *holdp_accept = holdp;
+ }
+ else {
+ free_assignedhd(holdp);
+ }
+ }
+ else {
+ free_assignedhd(holdp);
+ }
+ }
+}
+
static void
start_some_dumps(
- disklist_t * rq)
+ disklist_t *rq)
{
+ const time_t now = time(NULL);
int cur_idle;
disk_t *diskp, *delayed_diskp, *diskp_accept;
disk_t *dp;
assignedhd_t **holdp=NULL, **holdp_accept;
- const time_t now = time(NULL);
cmd_t cmd;
int result_argc;
char **result_argv;
chunker_t *chunker;
dumper_t *dumper;
+ taper_t *taper;
char dumptype;
char *dumporder;
- int busy_dumpers = 0;
+ int dumper_to_holding = 0;
+
+ /* don't start any actual dumps until the taper is started */
+ if (!taper_started) return;
idle_reason = IDLE_NO_DUMPERS;
sleep_time = 0;
}
for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
- if( dumper->busy ) {
- busy_dumpers++;
+ if (dumper->busy && dumper->dp->to_holdingdisk != HOLD_NEVER) {
+ dumper_to_holding++;
}
}
-
for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
if( dumper->busy || dumper->down) {
dumptype = 'T';
}
- for(diskp = rq->head; diskp != NULL; diskp = diskp->next) {
- assert(diskp->host != NULL && sched(diskp) != NULL);
-
- if (diskp->host->start_t > now) {
- cur_idle = max(cur_idle, IDLE_START_WAIT);
- if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) {
- delayed_diskp = diskp;
- sleep_time = diskp->host->start_t;
- }
- } else if(diskp->start_t > now) {
- cur_idle = max(cur_idle, IDLE_START_WAIT);
- if (delayed_diskp == NULL || sleep_time > diskp->start_t) {
- delayed_diskp = diskp;
- sleep_time = diskp->start_t;
- }
- } else if (diskp->host->netif->curusage > 0 &&
- sched(diskp)->est_kps > free_kps(diskp->host->netif)) {
- cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH);
- } else if(sched(diskp)->no_space) {
- cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
- } else if (diskp->to_holdingdisk == HOLD_NEVER) {
- cur_idle = max(cur_idle, IDLE_NO_HOLD);
- } else if ((holdp =
- find_diskspace(sched(diskp)->est_size, &cur_idle, NULL)) == NULL) {
- cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
- if (empty(tapeq) && busy_dumpers == 0) {
- remove_disk(rq, diskp);
- enqueue_disk(&directq, diskp);
- }
- } else if (client_constrained(diskp)) {
- free_assignedhd(holdp);
- cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED);
- } else {
-
- /* disk fits, dump it */
- int accept = !diskp_accept;
- if(!accept) {
- switch(dumptype) {
- case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
- break;
- case 'S': accept = (sched(diskp)->est_size > sched(diskp_accept)->est_size);
- break;
- case 't': accept = (sched(diskp)->est_time < sched(diskp_accept)->est_time);
- break;
- case 'T': accept = (sched(diskp)->est_time > sched(diskp_accept)->est_time);
- break;
- case 'b': accept = (sched(diskp)->est_kps < sched(diskp_accept)->est_kps);
- break;
- case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps);
- break;
- default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"),
- dumptype);
- accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
- break;
+ diskp = NULL;
+ taper = NULL;
+ if (!empty(directq)) {
+ taper = idle_taper();
+ if (taper) {
+ TapeAction result_tape_action;
+ char *why_no_new_tape = NULL;
+ result_tape_action = tape_action(taper, &why_no_new_tape);
+ if (result_tape_action & TAPE_ACTION_START_A_FLUSH ||
+ result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) {
+ off_t extra_tapes_size = 0;
+ taper_t *taper1;
+
+ if (result_tape_action & TAPE_ACTION_START_A_FLUSH_FIT) {
+ extra_tapes_size = tape_length *
+ (off_t)(conf_runtapes - current_tape);
+ for (taper1 = tapetable;
+ taper1 < tapetable + conf_taper_parallel_write;
+ taper1++) {
+ if (taper1->state & TAPER_STATE_TAPE_STARTED) {
+ extra_tapes_size += taper1->left;
+ }
+ dp = taper1->disk;
+ if (dp) {
+ extra_tapes_size -= (sched(dp)->est_size -
+ taper1->written);
+ }
+ }
}
- }
- if(accept) {
- if( !diskp_accept || !degraded_mode || diskp->priority >= diskp_accept->priority) {
- if(holdp_accept) free_assignedhd(holdp_accept);
- diskp_accept = diskp;
- holdp_accept = holdp;
+
+ for (diskp = directq.head; diskp != NULL;
+ diskp = diskp->next) {
+ allow_dump_dle(diskp, taper, dumptype, &directq, now,
+ dumper_to_holding, &cur_idle,
+ &delayed_diskp, &diskp_accept,
+ &holdp_accept, extra_tapes_size);
}
- else {
- free_assignedhd(holdp);
+ if (diskp_accept) {
+ diskp = diskp_accept;
+ holdp = holdp_accept;
+ } else {
+ taper = NULL;
}
- }
- else {
- free_assignedhd(holdp);
+ } else {
+ taper = NULL;
}
}
}
- diskp = diskp_accept;
- holdp = holdp_accept;
+ if (diskp == NULL) {
+ for(diskp = rq->head; diskp != NULL; diskp = diskp->next) {
+ assert(diskp->host != NULL && sched(diskp) != NULL);
+
+ allow_dump_dle(diskp, NULL, dumptype, rq, now,
+ dumper_to_holding, &cur_idle, &delayed_diskp,
+ &diskp_accept, &holdp_accept, 0);
+ }
+ diskp = diskp_accept;
+ holdp = holdp_accept;
+ }
idle_reason = max(idle_reason, cur_idle);
+ if (diskp == NULL && idle_reason == IDLE_NO_DISKSPACE) {
+ /* continue flush waiting for new tape */
+ startaflush();
+ }
/*
* If we have no disk at this point, and there are disks that
dumpers_ev_time = event_register((event_id_t)sleep_time, EV_TIME,
handle_dumpers_time, &runq);
return;
- } else if (diskp != NULL) {
+ } else if (diskp != NULL && taper == NULL) {
sched(diskp)->act_size = (off_t)0;
allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
sched(diskp)->activehd = assign_holdingdisk(holdp, diskp);
diskp->inprogress = 1;
sched(diskp)->dumper = dumper;
sched(diskp)->timestamp = now;
+ amfree(diskp->dataport_list);
dumper->busy = 1; /* dumper is now busy */
dumper->dp = diskp; /* link disk to dumper */
sched(diskp)->dumpsize = (off_t)-1;
sched(diskp)->dumptime = (time_t)0;
sched(diskp)->tapetime = (time_t)0;
- chunker = dumper->chunker;
+ chunker = dumper->chunker = &chktable[dumper - dmptable];
chunker->result = LAST_TOK;
dumper->result = LAST_TOK;
startup_chunk_process(chunker,chunker_program);
chunker->ev_read = event_register((event_id_t)chunker->fd, EV_READFD,
handle_chunker_result, chunker);
dumper->output_port = atoi(result_argv[1]);
+ amfree(diskp->dataport_list);
+ diskp->dataport_list = stralloc(result_argv[2]);
if (diskp->host->pre_script == 0) {
for (dp=diskp->host->disks; dp != NULL; dp = dp->hostnext) {
if (result_argv)
g_strfreev(result_argv);
short_dump_state();
+ } else if (diskp != NULL && taper != NULL) { /* dump to tape */
+ sched(diskp)->act_size = (off_t)0;
+ allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
+ diskp->host->inprogress++; /* host is now busy */
+ diskp->inprogress = 1;
+ sched(diskp)->dumper = dumper;
+ sched(diskp)->taper = taper;
+ sched(diskp)->timestamp = now;
+ dumper->chunker = NULL;
+ amfree(diskp->dataport_list);
+
+ dumper->busy = 1; /* dumper is now busy */
+ dumper->dp = diskp; /* link disk to dumper */
+ remove_disk(&directq, diskp); /* take it off the direct queue */
+
+ sched(diskp)->origsize = (off_t)-1;
+ sched(diskp)->dumpsize = (off_t)-1;
+ sched(diskp)->dumptime = (time_t)0;
+ sched(diskp)->tapetime = (time_t)0;
+ dumper->result = LAST_TOK;
+ taper->result = LAST_TOK;
+ taper->input_error = NULL;
+ taper->tape_error = NULL;
+ taper->disk = diskp;
+ taper->first_label = NULL;
+ taper->written = 0;
+ taper->dumper = dumper;
+ taper->state |= TAPER_STATE_DUMP_TO_TAPE;
+ taper->state &= ~TAPER_STATE_IDLE;
+ if (taper_nb_wait_reply == 0) {
+ taper_ev_read = event_register(taper_fd, EV_READFD,
+ handle_taper_result, NULL);
+ }
+
+ taper_nb_wait_reply++;
+ taper_cmd(PORT_WRITE, diskp, NULL, sched(diskp)->level,
+ sched(diskp)->datestamp);
+ diskp->host->start_t = now + 15;
+
+ short_dump_state();
}
}
}
* a) diskspace has been allocated for other dumps which are
* still running or already being written to tape
* b) all other dumps have been suspended due to lack of diskspace
- * c) this dump doesn't fit on all the holding disks
* Case a) is not a problem. We just wait for the diskspace to
* be freed by moving the current disk to a queue.
* If case b) occurs, we have a deadlock situation. We select
* a dump from the queue to be aborted and abort it. It will
- * be retried later dumping to disk.
- * If case c) is detected, the dump is aborted. Next time
- * it will be dumped directly to tape. Actually, case c is a special
- * manifestation of case b) where only one dumper is busy.
+ * be retried directly to tape.
*/
for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
if( dumper->busy ) {
}
}
if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) &&
- ((!taper_busy && empty(tapeq)) || degraded_mode) &&
- pending_aborts == 0 ) { /* not case a */
- if( busy_dumpers == 1 ) { /* case c */
- sched(dp)->no_space = 1;
- }
- /* case b */
+ ((all_taper_idle() && empty(tapeq)) || degraded_mode) &&
+ pending_aborts == 0 ) { /* case b */
+ sched(dp)->no_space = 1;
/* At this time, dp points to the dump with the smallest est_size.
* We abort that dump, hopefully not wasting too much time retrying it.
*/
handle_taper_result(
void *cookie G_GNUC_UNUSED)
{
- disk_t *dp;
+ disk_t *dp = NULL, *dp1;
+ dumper_t *dumper;
cmd_t cmd;
int result_argc;
char **result_argv;
char *qname, *q;
char *s;
+ taper_t *taper = NULL;
+ taper_t *taper1;
+ int i;
+ off_t partsize;
assert(cookie == NULL);
- amfree(taper_input_error);
- amfree(taper_tape_error);
-
+
do {
-
+
short_dump_state();
-
- cmd = getresult(taper, 1, &result_argc, &result_argv);
-
+ taper = NULL;
+
+ cmd = getresult(taper_fd, 1, &result_argc, &result_argv);
+
switch(cmd) {
-
+
+ case TAPER_OK:
+ if(result_argc != 2) {
+ error(_("error: [taper FAILED result_argc != 2: %d"), result_argc);
+ /*NOTREACHED*/
+ }
+
+ taper = NULL;
+ taper_started = 1;
+ for (i=0; i < conf_taper_parallel_write; i++) {
+ if (strcmp(tapetable[i].name, result_argv[1]) == 0) {
+ taper= &tapetable[i];
+ }
+ }
+ assert(taper != NULL);
+ taper->left = 0;
+ taper->state &= ~TAPER_STATE_INIT;
+ taper->state |= TAPER_STATE_RESERVATION;
+ taper->state |= TAPER_STATE_IDLE;
+ amfree(taper->first_label);
+ taper_nb_wait_reply--;
+ taper_nb_scan_volume--;
+ last_started_taper = taper;
+ if (taper_nb_wait_reply == 0) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
+ }
+ start_some_dumps(&runq);
+ startaflush();
+ break;
+
case FAILED: /* FAILED <handle> INPUT-* TAPE-* <input err mesg> <tape err mesg> */
if(result_argc != 6) {
error(_("error: [taper FAILED result_argc != 6: %d"), result_argc);
/*NOTREACHED*/
}
-
+
dp = serial2disk(result_argv[1]);
- assert(dp == taper_disk);
- if (!taper_dumper)
+ taper = sched(dp)->taper;
+ assert(dp == taper->disk);
+ if (!taper->dumper)
free_serial(result_argv[1]);
-
+
qname = quote_string(dp->name);
g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"),
walltime_str(curclock()), dp->host->hostname, qname);
fflush(stdout);
if (strcmp(result_argv[2], "INPUT-ERROR") == 0) {
- taper_input_error = newstralloc(taper_input_error, result_argv[4]);
+ taper->input_error = newstralloc(taper->input_error, result_argv[4]);
} else if (strcmp(result_argv[2], "INPUT-GOOD") != 0) {
- taper_tape_error = newstralloc(taper_tape_error,
+ taper->tape_error = newstralloc(taper->tape_error,
_("Taper protocol error"));
- taper_result = FAILED;
+ taper->result = FAILED;
log_add(L_FAIL, _("%s %s %s %d [%s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_tape_error);
+ sched(dp)->level, taper->tape_error);
amfree(qname);
break;
}
if (strcmp(result_argv[3], "TAPE-ERROR") == 0) {
- taper_tape_error = newstralloc(taper_tape_error, result_argv[5]);
+ taper->state &= ~TAPER_STATE_TAPE_STARTED;
+ taper->tape_error = newstralloc(taper->tape_error, result_argv[5]);
} else if (strcmp(result_argv[3], "TAPE-GOOD") != 0) {
- taper_tape_error = newstralloc(taper_tape_error,
+ taper->state &= ~TAPER_STATE_TAPE_STARTED;
+ taper->tape_error = newstralloc(taper->tape_error,
_("Taper protocol error"));
- taper_result = FAILED;
+ taper->result = FAILED;
log_add(L_FAIL, _("%s %s %s %d [%s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_tape_error);
+ sched(dp)->level, taper->tape_error);
amfree(qname);
break;
}
amfree(qname);
- taper_result = cmd;
+ taper->result = cmd;
break;
-
+
case PARTIAL: /* PARTIAL <handle> INPUT-* TAPE-* <stat mess> <input err mesg> <tape err mesg>*/
case DONE: /* DONE <handle> INPUT-GOOD TAPE-GOOD <stat mess> <input err mesg> <tape err mesg> */
if(result_argc != 7) {
error(_("error: [taper PARTIAL result_argc != 7: %d"), result_argc);
/*NOTREACHED*/
}
-
+
dp = serial2disk(result_argv[1]);
- assert(dp == taper_disk);
- if (!taper_dumper)
+ taper = sched(dp)->taper;
+ assert(dp == taper->disk);
+ if (!taper->dumper)
free_serial(result_argv[1]);
qname = quote_string(dp->name);
fflush(stdout);
if (strcmp(result_argv[2], "INPUT-ERROR") == 0) {
- taper_input_error = newstralloc(taper_input_error, result_argv[5]);
+ taper->input_error = newstralloc(taper->input_error, result_argv[5]);
} else if (strcmp(result_argv[2], "INPUT-GOOD") != 0) {
- taper_tape_error = newstralloc(taper_tape_error,
+ taper->tape_error = newstralloc(taper->tape_error,
_("Taper protocol error"));
- taper_result = FAILED;
+ taper->result = FAILED;
log_add(L_FAIL, _("%s %s %s %d [%s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_tape_error);
+ sched(dp)->level, taper->tape_error);
amfree(qname);
break;
}
if (strcmp(result_argv[3], "TAPE-ERROR") == 0) {
- taper_tape_error = newstralloc(taper_tape_error, result_argv[6]);
+ taper->state &= ~TAPER_STATE_TAPE_STARTED;
+ taper->tape_error = newstralloc(taper->tape_error, result_argv[6]);
} else if (strcmp(result_argv[3], "TAPE-GOOD") != 0) {
- taper_tape_error = newstralloc(taper_tape_error,
+ taper->state &= ~TAPER_STATE_TAPE_STARTED;
+ taper->tape_error = newstralloc(taper->tape_error,
_("Taper protocol error"));
- taper_result = FAILED;
+ taper->result = FAILED;
log_add(L_FAIL, _("%s %s %s %d [%s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_tape_error);
+ sched(dp)->level, taper->tape_error);
amfree(qname);
break;
}
sched(dp)->dumpsize = atol(s);
}
- taper_result = cmd;
+ taper->result = cmd;
amfree(qname);
break;
-
+
case PARTDONE: /* PARTDONE <handle> <label> <fileno> <kbytes> <stat> */
dp = serial2disk(result_argv[1]);
- assert(dp == taper_disk);
+ taper = sched(dp)->taper;
+ assert(dp == taper->disk);
if (result_argc != 6) {
error(_("error [taper PARTDONE result_argc != 6: %d]"),
result_argc);
/*NOTREACHED*/
}
- if (!taper_first_label) {
- taper_first_label = stralloc(result_argv[2]);
- taper_first_fileno = OFF_T_ATOI(result_argv[3]);
- }
- taper_written = OFF_T_ATOI(result_argv[4]);
- if (taper_written > sched(taper_disk)->act_size)
- sched(taper_disk)->act_size = taper_written;
-
+ if (!taper->first_label) {
+ amfree(taper->first_label);
+ taper->first_label = stralloc(result_argv[2]);
+ taper->first_fileno = OFF_T_ATOI(result_argv[3]);
+ }
+ taper->written += OFF_T_ATOI(result_argv[4]);
+ if (taper->written > sched(taper->disk)->act_size)
+ sched(taper->disk)->act_size = taper->written;
+
+ partsize = 0;
+ s = strstr(result_argv[5], " kb ");
+ if (s) {
+ s += 4;
+ partsize = atol(s);
+ }
+ taper->left -= partsize;
+
break;
case REQUEST_NEW_TAPE: /* REQUEST-NEW-TAPE <handle> */
result_argc);
/*NOTREACHED*/
}
- taper_state &= ~TAPER_STATE_TAPE_STARTED;
- if (current_tape >= conf_runtapes) {
- taper_cmd(NO_NEW_TAPE, "runtapes volumes already written", NULL, 0, NULL);
- log_add(L_WARNING,
- _("Out of tapes; going into degraded mode."));
- start_degraded_mode(&runq);
- } else {
- TapeAction result_tape_action;
- char *why_no_new_tape;
-
- taper_state |= TAPER_STATE_WAIT_FOR_TAPE;
- result_tape_action = tape_action(&why_no_new_tape);
- if (result_tape_action & TAPE_ACTION_NEW_TAPE) {
- taper_cmd(NEW_TAPE, NULL, NULL, 0, NULL);
- taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE;
- } else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) {
- taper_cmd(NO_NEW_TAPE, why_no_new_tape, NULL, 0, NULL);
- taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE;
- start_degraded_mode(&runq);
- }
- }
+ dp = serial2disk(result_argv[1]);
+ taper = sched(dp)->taper;
+ taper->state &= ~TAPER_STATE_TAPE_STARTED;
+ taper->state |= TAPER_STATE_TAPE_REQUESTED;
+
+ start_some_dumps(&runq);
+ startaflush();
break;
case NEW_TAPE: /* NEW-TAPE <handle> <label> */
/*NOTREACHED*/
}
- /* Update our tape counter and reset tape_left */
+ nb_sent_new_tape--;
+ taper_nb_scan_volume--;
+ dp = serial2disk(result_argv[1]);
+ taper = sched(dp)->taper;
+ /* Update our tape counter and reset taper->left */
current_tape++;
- tape_left = tape_length;
- taper_state |= TAPER_STATE_TAPE_STARTED;
+ taper->left = tape_length;
+ taper->state &= ~TAPER_STATE_WAIT_NEW_TAPE;
+ taper->state |= TAPER_STATE_TAPE_STARTED;
+ last_started_taper = NULL;
+
+ /* start a new worker */
+ for (i = 0; i < conf_taper_parallel_write ; i++) {
+ taper1 = &tapetable[i];
+ if (need_degraded == 0 &&
+ taper1->state == TAPER_STATE_DEFAULT) {
+ taper1->state = TAPER_STATE_INIT;
+ if (taper_nb_wait_reply == 0) {
+ taper_ev_read = event_register(taper_fd, EV_READFD,
+ handle_taper_result, NULL);
+ }
+ taper_nb_wait_reply++;
+ taper_nb_scan_volume++;
+ taper_cmd(START_TAPER, NULL, taper1->name, 0,
+ driver_timestamp);
+ break;
+ }
+ }
break;
case NO_NEW_TAPE: /* NO-NEW-TAPE <handle> */
result_argc);
/*NOTREACHED*/
}
+ nb_sent_new_tape--;
+ taper_nb_scan_volume--;
+ dp = serial2disk(result_argv[1]);
+ taper = sched(dp)->taper;
+ taper->state |= TAPER_STATE_DONE;
+ last_started_taper = NULL;
start_degraded_mode(&runq);
break;
case DUMPER_STATUS: /* DUMPER-STATUS <handle> */
if (result_argc != 2) {
- error(_("error [taper NO_NEW_TAPE result_argc != 2: %d]"),
+ error(_("error [taper DUMPER_STATUS result_argc != 2: %d]"),
result_argc);
/*NOTREACHED*/
}
- if (taper_dumper->result == LAST_TOK) {
- taper_sendresult = 1;
+ dp = serial2disk(result_argv[1]);
+ taper = sched(dp)->taper;
+ if (taper->dumper->result == LAST_TOK) {
+ taper->sendresult = 1;
} else {
- if( taper_dumper->result == DONE) {
- taper_cmd(DONE, NULL, NULL, 0, NULL);
+ if( taper->dumper->result == DONE) {
+ taper_cmd(DONE, dp, NULL, 0, NULL);
} else {
- taper_cmd(FAILED, NULL, NULL, 0, NULL);
+ taper_cmd(FAILED, dp, NULL, 0, NULL);
}
}
break;
- case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
- dp = serial2disk(result_argv[1]);
- if (!taper_dumper)
- free_serial(result_argv[1]);
- qname = quote_string(dp->name);
- g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"),
- walltime_str(curclock()), dp->host->hostname, qname);
- amfree(qname);
- fflush(stdout);
- q = quote_string(result_argv[2]);
- log_add(L_WARNING, _("Taper error: %s"), q);
- amfree(q);
- taper_tape_error = newstralloc(taper_tape_error, result_argv[2]);
- /*FALLTHROUGH*/
+ case TAPE_ERROR: /* TAPE-ERROR <name> <err mess> */
+ taper_started = 1;
+ if (strcmp(result_argv[1], "SETUP") == 0) {
+ taper_nb_wait_reply = 0;
+ taper_nb_scan_volume = 0;
+ } else {
+ taper = taper_from_name(result_argv[1]);
+ taper->state = TAPER_STATE_DONE;
+ fflush(stdout);
+ q = quote_string(result_argv[2]);
+ log_add(L_WARNING, _("Taper error: %s"), q);
+ amfree(q);
+ if (taper) {
+ taper->tape_error = newstralloc(taper->tape_error,
+ result_argv[2]);
+ }
+
+ taper_nb_wait_reply--;
+ taper_nb_scan_volume--;
+ }
+ if (taper_nb_wait_reply == 0) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
+ }
+ need_degraded = 1;
+ if (schedule_done && !degraded_mode) {
+ start_degraded_mode(&runq);
+ }
+ start_some_dumps(&runq);
+ break;
+
+ case PORT: /* PORT <name> <handle> <port> <dataport_list> */
+ dp = serial2disk(result_argv[2]);
+ taper = sched(dp)->taper;
+ dumper = sched(dp)->dumper;
+ dumper->output_port = atoi(result_argv[3]);
+ amfree(dp->dataport_list);
+ dp->dataport_list = stralloc(result_argv[4]);
+
+ amfree(taper->input_error);
+ amfree(taper->tape_error);
+ amfree(taper->first_label);
+ taper->written = 0;
+ taper->state |= TAPER_STATE_DUMP_TO_TAPE;
+
+ if (dp->host->pre_script == 0) {
+ for (dp1=dp->host->disks; dp1 != NULL; dp1 = dp1->hostnext) {
+ run_server_scripts(EXECUTE_ON_PRE_HOST_BACKUP,
+ get_config_name(), dp1, -1);
+ }
+ dp->host->pre_script = 1;
+ }
+ run_server_scripts(EXECUTE_ON_PRE_DLE_BACKUP,
+ get_config_name(), dp,
+ sched(dp)->level);
+ /* tell the dumper to dump to a port */
+ dumper_cmd(dumper, PORT_DUMP, dp, NULL);
+ dp->host->start_t = time(NULL) + 15;
+ amfree(dp->dataport_list);
+
+ taper->state |= TAPER_STATE_DUMP_TO_TAPE;
+
+ dumper->ev_read = event_register(dumper->fd, EV_READFD,
+ handle_dumper_result, dumper);
+ break;
case BOGUS:
- if (cmd == BOGUS) {
- log_add(L_WARNING, _("Taper protocol error"));
- taper_tape_error = newstralloc(taper_tape_error, "BOGUS");
- }
+ log_add(L_WARNING, _("Taper protocol error"));
/*
* Since we received a taper error, we can't send anything more
* to the taper. Go into degraded mode to try to get everthing
if(!nodump) {
log_add(L_WARNING,
_("going into degraded mode because of taper component error."));
- start_degraded_mode(&runq);
- }
- tapeq.head = tapeq.tail = NULL;
- taper_busy = 0;
+ }
+
+ for (taper = tapetable;
+ taper < tapetable + conf_taper_parallel_write;
+ taper++) {
+ if (taper && taper->disk && taper->result != LAST_TOK) {
+ taper->tape_error = newstralloc(taper->tape_error,"BOGUS");
+ taper->result = cmd;
+ if (taper->dumper) {
+ if (taper->dumper->result != LAST_TOK) {
+ // Dumper already returned it's result
+ dumper_taper_result(taper->disk);
+ }
+ } else {
+ file_taper_result(taper->disk);
+ }
+ }
+
+ }
+ taper = NULL;
+
if(taper_ev_read != NULL) {
event_release(taper_ev_read);
taper_ev_read = NULL;
+ taper_nb_wait_reply = 0;
}
- if(cmd != TAPE_ERROR) aclose(taper);
- taper_result = cmd;
+ start_degraded_mode(&runq);
+ tapeq.head = tapeq.tail = NULL;
+ aclose(taper_fd);
break;
g_strfreev(result_argv);
- if (taper_result != LAST_TOK) {
- if(taper_dumper) {
- if (taper_dumper->result != LAST_TOK) {
+ if (taper && taper->disk && taper->result != LAST_TOK) {
+ if(taper->dumper) {
+ if (taper->dumper->result != LAST_TOK) {
// Dumper already returned it's result
- dumper_taper_result(taper_disk);
+ dumper_taper_result(taper->disk);
}
} else {
- file_taper_result(taper_disk);
+ file_taper_result(taper->disk);
}
}
-
- } while(areads_dataready(taper));
+
+ } while(areads_dataready(taper_fd));
+ startaflush();
}
file_taper_result(
disk_t *dp)
{
+ taper_t *taper;
char *qname = quote_string(dp->name);
- if (taper_result == DONE) {
- update_info_taper(dp, taper_first_label, taper_first_fileno,
+ taper = sched(dp)->taper;
+ if (taper->result == DONE) {
+ update_info_taper(dp, taper->first_label, taper->first_fileno,
sched(dp)->level);
}
sched(dp)->taper_attempted += 1;
- if (taper_input_error) {
+ if (taper->input_error) {
g_printf("driver: taper failed %s %s: %s\n",
- dp->host->hostname, qname, taper_input_error);
+ dp->host->hostname, qname, taper->input_error);
if (strcmp(sched(dp)->datestamp, driver_timestamp) == 0) {
if(sched(dp)->taper_attempted >= 2) {
log_add(L_FAIL, _("%s %s %s %d [too many taper retries after holding disk error: %s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_input_error);
+ sched(dp)->level, taper->input_error);
g_printf("driver: taper failed %s %s, too many taper retry after holding disk error\n",
dp->host->hostname, qname);
amfree(sched(dp)->destname);
} else {
log_add(L_INFO, _("%s %s %s %d [Will retry dump because of holding disk error: %s]"),
dp->host->hostname, qname, sched(dp)->datestamp,
- sched(dp)->level, taper_input_error);
+ sched(dp)->level, taper->input_error);
g_printf("driver: taper will retry %s %s because of holding disk error\n",
dp->host->hostname, qname);
if (dp->to_holdingdisk != HOLD_REQUIRED) {
amfree(sched(dp)->datestamp);
amfree(dp->up);
}
- } else if (taper_tape_error) {
+ } else if (taper->tape_error) {
g_printf("driver: taper failed %s %s with tape error: %s\n",
- dp->host->hostname, qname, taper_tape_error);
+ dp->host->hostname, qname, taper->tape_error);
if(sched(dp)->taper_attempted >= 2) {
log_add(L_FAIL, _("%s %s %s %d [too many taper retries]"),
dp->host->hostname, qname, sched(dp)->datestamp,
/* Re-insert into taper queue. */
headqueue_disk(&tapeq, dp);
}
- } else if (taper_result != DONE) {
+ } else if (taper->result != DONE) {
g_printf("driver: taper failed %s %s without error\n",
dp->host->hostname, qname);
} else {
amfree(qname);
- taper_busy = 0;
- amfree(taper_input_error);
- amfree(taper_tape_error);
- taper_disk = NULL;
-
+ taper->state &= ~TAPER_STATE_FILE_TO_TAPE;
+ taper->state |= TAPER_STATE_IDLE;
+ amfree(taper->input_error);
+ amfree(taper->tape_error);
+ taper->disk = NULL;
+ taper_nb_wait_reply--;
+ if (taper_nb_wait_reply == 0) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
+ }
+
/* continue with those dumps waiting for diskspace */
continue_port_dumps();
start_some_dumps(&runq);
disk_t *dp)
{
dumper_t *dumper;
+ taper_t *taper;
int is_partial;
char *qname;
dumper = sched(dp)->dumper;
+ taper = sched(dp)->taper;
free_serial_dp(dp);
- if(dumper->result == DONE && taper_result == DONE) {
+ if(dumper->result == DONE && taper->result == DONE) {
update_info_dumper(dp, sched(dp)->origsize,
sched(dp)->dumpsize, sched(dp)->dumptime);
- update_info_taper(dp, taper_first_label, taper_first_fileno,
+ update_info_taper(dp, taper->first_label, taper->first_fileno,
sched(dp)->level);
qname = quote_string(dp->name); /*quote to take care of spaces*/
sched(dp)->est_kps);
amfree(qname);
} else {
- update_failed_dump_to_tape(dp);
+ update_failed_dump(dp);
}
- is_partial = dumper->result != DONE || taper_result != DONE;
+ is_partial = dumper->result != DONE || taper->result != DONE;
sched(dp)->dump_attempted += 1;
sched(dp)->taper_attempted += 1;
- if((dumper->result != DONE || taper_result != DONE) &&
+ if((dumper->result != DONE || taper->result != DONE) &&
sched(dp)->dump_attempted <= 1 &&
sched(dp)->taper_attempted <= 1) {
enqueue_disk(&directq, dp);
event_release(dumper->ev_read);
dumper->ev_read = NULL;
}
- if(taper_ev_read != NULL) {
+ taper_nb_wait_reply--;
+ if (taper_nb_wait_reply == 0 && taper_ev_read != NULL) {
event_release(taper_ev_read);
taper_ev_read = NULL;
}
- taper_busy = 0;
- amfree(taper_input_error);
- amfree(taper_tape_error);
+ taper->state &= ~TAPER_STATE_DUMP_TO_TAPE;
+ taper->state |= TAPER_STATE_IDLE;
+ amfree(taper->input_error);
+ amfree(taper->tape_error);
dumper->busy = 0;
dp->host->inprogress -= 1;
dp->inprogress = 0;
deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
+ taper->dumper = NULL;
+ sched(dp)->dumper = NULL;
+ sched(dp)->taper = NULL;
+ start_some_dumps(&runq);
}
-static dumper_t *
-idle_dumper(void)
+static taper_t *
+idle_taper(void)
{
- dumper_t *dumper;
+ taper_t *taper;
+
+ /* Use an already started taper first */
+ for (taper = tapetable; taper < tapetable + conf_taper_parallel_write;
+ taper++) {
+ if ((taper->state & TAPER_STATE_IDLE) &&
+ (taper->state & TAPER_STATE_TAPE_STARTED) &&
+ !(taper->state & TAPER_STATE_DONE) &&
+ !(taper->state & TAPER_STATE_FILE_TO_TAPE) &&
+ !(taper->state & TAPER_STATE_FILE_TO_TAPE))
+ return taper;
+ }
+ for (taper = tapetable; taper < tapetable + conf_taper_parallel_write;
+ taper++) {
+ if ((taper->state & TAPER_STATE_IDLE) &&
+ (taper->state & TAPER_STATE_RESERVATION) &&
+ !(taper->state & TAPER_STATE_DONE) &&
+ !(taper->state & TAPER_STATE_FILE_TO_TAPE) &&
+ !(taper->state & TAPER_STATE_FILE_TO_TAPE))
+ return taper;
+ }
+ return NULL;
+}
+
+static taper_t *
+taper_from_name(
+ char *name)
+{
+ taper_t *taper;
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
- if(!dumper->busy && !dumper->down) return dumper;
+ for (taper = tapetable; taper < tapetable+conf_taper_parallel_write;
+ taper++)
+ if (strcmp(taper->name, name) == 0) return taper;
return NULL;
}
(long long)sched(dp)->est_csize,
sched(dp)->est_kps);
amfree(qname);
+ } else {
+ update_failed_dump(dp);
}
deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
is_partial = dumper->result != DONE || chunker->result != DONE;
rename_tmp_holding(sched(dp)->destname, !is_partial);
+ holding_set_origsize(sched(dp)->destname, sched(dp)->origsize);
dummy = (off_t)0;
for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
aclose(chunker->fd);
chunker->fd = -1;
chunker->down = 1;
-
+
dp = NULL;
if (chunker->result == ABORT_FINISHED)
pending_aborts--;
{
/* uses global pending_aborts */
dumper_t *dumper = cookie;
+ taper_t *taper;
disk_t *dp, *sdp, *dp1;
cmd_t cmd;
int result_argc;
if (cmd != BOGUS) {
int last_dump = 1;
+ dumper_t *dumper;
+
run_server_scripts(EXECUTE_ON_POST_DLE_BACKUP,
get_config_name(), dp, sched(dp)->level);
+ /* check dump not yet started */
for (dp1=runq.head; dp1 != NULL; dp1 = dp1->next) {
- if (dp1 != dp) last_dump = 0;
+ if (dp1->host == dp->host)
+ last_dump = 0;
+ }
+ /* check direct to tape dump */
+ for (dp1=directq.head; dp1 != NULL; dp1 = dp1->next) {
+ if (dp1->host == dp->host)
+ last_dump = 0;
+ }
+ /* check dumping dle */
+ for (dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
+ if (dumper->busy && dumper->dp != dp &&
+ dumper->dp->host == dp->host)
+ last_dump = 0;
}
if (last_dump && dp->host->post_script == 0) {
if (dp->host->post_script == 0) {
}
}
+ taper = sched(dp)->taper;
/* send the dumper result to the chunker */
if (dumper->chunker) {
if (dumper->chunker->down == 0 && dumper->chunker->fd != -1 &&
dumper->chunker->result != LAST_TOK)
dumper_chunker_result(dp);
} else { /* send the dumper result to the taper */
- if (taper_sendresult) {
+ if (taper->sendresult) {
if (cmd == DONE) {
- taper_cmd(DONE, driver_timestamp, NULL, 0, NULL);
+ taper_cmd(DONE, dp, NULL, 0, NULL);
} else {
- taper_cmd(FAILED, driver_timestamp, NULL, 0, NULL);
+ taper_cmd(FAILED, dp, NULL, 0, NULL);
}
- taper_sendresult = 0;
+ taper->sendresult = 0;
+ }
+ if (taper->dumper && taper->result != LAST_TOK) {
+ dumper_taper_result(dp);
}
- }
- if (taper_dumper && taper_result != LAST_TOK) {
- dumper_taper_result(dp);
}
} while(areads_dataready(dumper->fd));
}
* determined in continue_port_dumps(). */
enqueue_disk( &roomq, dp );
continue_port_dumps();
+ /* continue flush waiting for new tape */
+ startaflush();
} else {
/* OK, allocate space for disk and have chunker continue */
sched(dp)->activehd = assign_holdingdisk( h, dp );
}
-static disklist_t
-read_flush(void)
+static void
+read_flush(
+ void * cookie)
{
sched_t *sp;
disk_t *dp;
char *command;
char *s;
int ch;
- disklist_t tq;
char *qname = NULL;
char *qdestname = NULL;
+ char *conf_infofile;
- tq.head = tq.tail = NULL;
+ (void)cookie; /* Quiet unused parameter warning */
+
+ event_release(flush_ev_read);
+ flush_ev_read = NULL;
+
+ /* read schedule from stdin */
+ conf_infofile = config_dir_relative(getconf_str(CNF_INFOFILE));
+ if (open_infofile(conf_infofile)) {
+ error(_("could not open info db \"%s\""), conf_infofile);
+ /*NOTREACHED*/
+ }
+ amfree(conf_infofile);
for(line = 0; (inpline = agets(stdin)) != NULL; free(inpline)) {
dumpfile_t file;
sp->est_csize = (off_t)0;
sp->est_time = 0;
sp->est_kps = 10;
+ sp->origsize = file.orig_size;
sp->priority = 0;
sp->degr_level = -1;
sp->dump_attempted = 0;
sp->holdp = build_diskspace(destname);
if(sp->holdp == NULL) continue;
sp->dumper = NULL;
+ sp->taper = NULL;
sp->timestamp = (time_t)0;
dp1->up = (char *)sp;
- enqueue_disk(&tq, dp1);
+ enqueue_disk(&tapeq, dp1);
dumpfile_free_data(&file);
}
amfree(inpline);
+ close_infofile();
- /*@i@*/ return tq;
+ startaflush();
+ if (!nodump) {
+ schedule_ev_read = event_register((event_id_t)0, EV_READFD,
+ read_schedule, NULL);
+ }
}
static void
long long csize_;
long long degr_nsize_;
long long degr_csize_;
+ GPtrArray *errarray;
(void)cookie; /* Quiet unused parameter warning */
event_release(schedule_ev_read);
+ schedule_ev_read = NULL;
/* read schedule from stdin */
sp->holdp = NULL;
sp->activehd = -1;
sp->dumper = NULL;
+ sp->taper = NULL;
sp->timestamp = (time_t)0;
sp->destname = NULL;
sp->no_space = 0;
dp->up = (char *) sp;
if(dp->host->features == NULL) {
dp->host->features = am_string_to_feature(features);
+ if (!dp->host->features) {
+ log_add(L_WARNING,
+ _("Invalid feature string from client '%s'"),
+ features);
+ dp->host->features = am_set_default_feature_set();
+ }
}
remove_disk(&waitq, dp);
- if (dp->to_holdingdisk == HOLD_NEVER) {
- enqueue_disk(&directq, dp);
+
+ errarray = validate_optionstr(dp);
+ if (errarray->len > 0) {
+ guint i;
+ for (i=0; i < errarray->len; i++) {
+ log_add(L_FAIL, _("%s %s %s 0 [%s]"),
+ dp->host->hostname, qname,
+ sp->datestamp,
+ (char *)g_ptr_array_index(errarray, i));
+ }
+ amfree(qname);
} else {
- enqueue_disk(&runq, dp);
+
+ if (dp->data_path == DATA_PATH_DIRECTTCP &&
+ dp->to_holdingdisk == HOLD_AUTO) {
+ dp->to_holdingdisk = HOLD_NEVER;
+ }
+
+ if (dp->to_holdingdisk == HOLD_NEVER) {
+ enqueue_disk(&directq, dp);
+ } else {
+ enqueue_disk(&runq, dp);
+ }
+ flush_size += sp->act_size;
}
- flush_size += sp->act_size;
amfree(diskname);
}
g_printf(_("driver: flush size %lld\n"), (long long)flush_size);
if(need_degraded==1) start_degraded_mode(&runq);
schedule_done = 1;
start_some_dumps(&runq);
+ startaflush();
}
static unsigned long
}
static void
-update_failed_dump_to_tape(
+update_failed_dump(
disk_t * dp)
{
-/* JLM
- * should simply set no_bump
- */
-
time_t save_timestamp = sched(dp)->timestamp;
/* setting timestamp to 0 removes the current level from the
* database, so that we ensure that it will not be bumped to the
}
/* ------------------- */
-static void
-dump_to_tape(
- disk_t * dp)
-{
- dumper_t *dumper;
- cmd_t cmd;
- int result_argc;
- char **result_argv;
- char *qname;
- disk_t *dp1;
-
- qname = quote_string(dp->name);
- g_printf(_("driver: dumping %s:%s directly to tape\n"),
- dp->host->hostname, qname);
- fflush(stdout);
-
- /* pick a dumper and fail if there are no idle dumpers */
-
- dumper = idle_dumper();
- if (!dumper) {
- g_printf(_("driver: no idle dumpers for %s:%s.\n"),
- dp->host->hostname, qname);
- fflush(stdout);
- log_add(L_WARNING, _("no idle dumpers for %s:%s.\n"),
- dp->host->hostname, qname);
- amfree(qname);
- return; /* fatal problem */
- }
-
- /* tell the taper to read from a port number of its choice */
-
- taper_cmd(PORT_WRITE, dp, NULL, sched(dp)->level, sched(dp)->datestamp);
- cmd = getresult(taper, 1, &result_argc, &result_argv);
- if(cmd != PORT) {
- g_printf(_("driver: did not get PORT from taper for %s:%s\n"),
- dp->host->hostname, qname);
- fflush(stdout);
- log_add(L_WARNING, _("driver: did not get PORT from taper for %s:%s.\n"),
- dp->host->hostname, qname);
- amfree(qname);
- return; /* fatal problem */
- }
- amfree(qname);
-
- /* copy port number */
- dumper->output_port = atoi(result_argv[1]);
-
- dumper->dp = dp;
- dumper->chunker = NULL;
- dumper->result = LAST_TOK;
- taper_result = LAST_TOK;
- sched(dp)->dumper = dumper;
-
- if (dp->host->pre_script == 0) {
- for (dp1=dp->host->disks; dp1 != NULL; dp1 = dp1->hostnext) {
- run_server_scripts(EXECUTE_ON_PRE_HOST_BACKUP,
- get_config_name(), dp1, -1);
- }
- dp->host->pre_script = 1;
- }
- run_server_scripts(EXECUTE_ON_PRE_DLE_BACKUP, get_config_name(), dp,
- sched(dp)->level);
-
- /* tell the dumper to dump to a port */
- dumper_cmd(dumper, PORT_DUMP, dp, NULL);
- dp->host->start_t = time(NULL) + 15;
-
- /* update statistics & print state */
-
- taper_busy = dumper->busy = 1;
- taper_input_error = NULL;
- taper_tape_error = NULL;
- taper_dumper = dumper;
- taper_disk = dp;
- taper_first_label = NULL;
- taper_written = 0;
- taper_state |= TAPER_STATE_DUMP_TO_TAPE;
- sched(dp)->act_size = sched(dp)->est_size;
- dp->host->inprogress += 1;
- dp->inprogress = 1;
- sched(dp)->timestamp = time((time_t *)0);
- allocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- idle_reason = NOT_IDLE;
-
- short_dump_state();
-
- dumper->ev_read = event_register(dumper->fd, EV_READFD,
- handle_dumper_result, dumper);
- taper_ev_read = event_register(taper, EV_READFD,
- handle_taper_result, NULL);
-
- g_strfreev(result_argv);
-}
static int
queue_length(
free_kps(NULL),
(long long)free_space());
if(degraded_mode) g_printf(_("DOWN"));
- else if(!taper_busy) g_printf(_("idle"));
- else g_printf(_("writing"));
+ else {
+ taper_t *taper;
+ int writing = 0;
+ for(taper = tapetable; taper < tapetable+conf_taper_parallel_write;
+ taper++) {
+ if (taper->state & TAPER_STATE_DUMP_TO_TAPE ||
+ taper->state & TAPER_STATE_FILE_TO_TAPE)
+ writing = 1;
+ }
+ if(writing)
+ g_printf(_("writing"));
+ else
+ g_printf(_("idle"));
+ }
nidle = 0;
for(i = 0; i < inparallel; i++) if(!dmptable[i].busy) nidle++;
g_printf(_(" idle-dumpers: %d"), nidle);
fflush(stdout);
}
-static TapeAction tape_action(char **why_no_new_tape)
+static TapeAction
+tape_action(
+ taper_t *taper,
+ char **why_no_new_tape)
{
TapeAction result = TAPE_ACTION_NO_ACTION;
dumper_t *dumper;
+ taper_t *taper1;
disk_t *dp;
off_t dumpers_size;
off_t runq_size;
off_t sched_size;
off_t dump_to_disk_size;
int dump_to_disk_terminated;
+ off_t my_flush_threshold_dumped;
+ off_t my_flush_threshold_scheduled;
+ off_t my_taperflush;
+ int nb_taper_active = nb_sent_new_tape;
+ int nb_taper_flushing = 0;
dumpers_size = 0;
for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
- if (dumper->busy)
+ if (dumper->busy && !sched(dumper->dp)->taper)
dumpers_size += sched(dumper->dp)->est_size;
}
driver_debug(1, _("dumpers_size: %lld\n"), (long long)dumpers_size);
}
driver_debug(1, _("directq_size: %lld\n"), (long long)directq_size);
- tapeq_size = 0;
+ tapeq_size = directq_size;
for(dp = tapeq.head; dp != NULL; dp = dp->next) {
tapeq_size += sched(dp)->act_size;
}
- if (taper_disk) {
- tapeq_size += sched(taper_disk)->act_size - taper_written;
+
+ for (taper1 = tapetable; taper1 < tapetable+conf_taper_parallel_write;
+ taper1++) {
+ if (taper1->state & TAPER_STATE_FILE_TO_TAPE ||
+ taper1->state & TAPER_STATE_DUMP_TO_TAPE) {
+ nb_taper_flushing++;
+ }
+ }
+
+ /* Add what is currently written to tape and in the go. */
+ for (taper1 = tapetable; taper1 < tapetable+conf_taper_parallel_write;
+ taper1++) {
+ if (taper1->state & TAPER_STATE_TAPE_STARTED) {
+ tapeq_size -= taper1->left;
+ }
+ if (taper1->disk) {
+ if (taper1->dumper) {
+ tapeq_size += sched(taper1->disk)->est_size - taper1->written;
+ } else {
+ tapeq_size += sched(taper1->disk)->act_size - taper1->written;
+ }
+ }
}
driver_debug(1, _("tapeq_size: %lld\n"), (long long)tapeq_size);
- sched_size = runq_size + tapeq_size + dumpers_size;
+ sched_size = runq_size + directq_size + tapeq_size + dumpers_size;
driver_debug(1, _("sched_size: %lld\n"), (long long)sched_size);
- dump_to_disk_size = dumpers_size + runq_size;
+ dump_to_disk_size = dumpers_size + runq_size + directq_size;
driver_debug(1, _("dump_to_disk_size: %lld\n"), (long long)dump_to_disk_size);
dump_to_disk_terminated = schedule_done && dump_to_disk_size == 0;
+ for (taper1 = tapetable; taper1 < tapetable + conf_taper_parallel_write;
+ taper1++) {
+ if (taper1->state & TAPER_STATE_TAPE_STARTED) {
+ nb_taper_active++;
+ }
+ }
+ if (nb_taper_active >= 1) {
+ my_flush_threshold_dumped = flush_threshold_dumped +
+ (nb_taper_active-nb_taper_active) * tape_length;
+ my_flush_threshold_scheduled = flush_threshold_scheduled +
+ (nb_taper_active-nb_taper_active) * tape_length;
+ my_taperflush = taperflush + (nb_taper_active-nb_taper_active) * tape_length;
+ } else {
+ my_flush_threshold_dumped = flush_threshold_dumped +
+ nb_taper_active * tape_length;
+ my_flush_threshold_scheduled = flush_threshold_scheduled +
+ nb_taper_active * tape_length;
+ my_taperflush = taperflush + nb_taper_active * tape_length;
+ }
+
// Changing conditionals can produce a driver hang, take care.
//
// when to start writting to a new tape
- if ((taper_state & TAPER_STATE_WAIT_FOR_TAPE) &&
- ((taper_state & TAPER_STATE_DUMP_TO_TAPE) || // for dump to tape
+ if (taper->state & TAPER_STATE_TAPE_REQUESTED) {
+ if (current_tape >= conf_runtapes && taper_nb_scan_volume == 0 &&
+ nb_taper_active == 0) {
+ *why_no_new_tape = g_strdup_printf(_("%d tapes filled; runtapes=%d "
+ "does not allow additional tapes"), current_tape, conf_runtapes);
+ result |= TAPE_ACTION_NO_NEW_TAPE;
+ } else if (current_tape < conf_runtapes &&
+ taper_nb_scan_volume == 0 &&
+ ((my_flush_threshold_dumped < tapeq_size &&
+ my_flush_threshold_scheduled < sched_size) ||
+ nb_taper_active == 0) &&
+ (last_started_taper == NULL ||
+ last_started_taper == taper)) {
+ result |= TAPE_ACTION_SCAN;
+ } else {
+ result |= TAPE_ACTION_MOVE;
+ }
+ } else if ((taper->state & TAPER_STATE_WAIT_FOR_TAPE) &&
+ ((taper->state & TAPER_STATE_DUMP_TO_TAPE) || // for dump to tape
!empty(directq) || // if a dle is waiting for a dump to tape
!empty(roomq) || // holding disk constraint
idle_reason == IDLE_NO_DISKSPACE || // holding disk constraint
- (flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
- flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
- (taperflush < tapeq_size && // taperflush
+ (my_flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
+ my_flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
+ (my_taperflush < tapeq_size && // taperflush
(force_flush == 1 || // if force_flush
dump_to_disk_terminated)) // or all dump to disk terminated
)) {
result |= TAPE_ACTION_NEW_TAPE;
// when to stop using new tape
- } else if ((taper_state & TAPER_STATE_WAIT_FOR_TAPE) &&
- (taperflush >= tapeq_size && // taperflush criteria not meet
- (force_flush == 1 || // if force_flush
- dump_to_disk_terminated)) // or all dump to disk terminated
+ } else if ((taper->state & TAPER_STATE_WAIT_FOR_TAPE) &&
+ (my_taperflush >= tapeq_size && // taperflush criteria
+ (force_flush == 1 || // if force_flush
+ dump_to_disk_terminated)) // or all dump to disk
) {
- result |= TAPE_ACTION_NO_NEW_TAPE;
- if (flush_threshold_dumped >= tapeq_size) {
- *why_no_new_tape = _("flush-threshold-dumped criteria not met");
- } else if (flush_threshold_scheduled >= sched_size) {
- *why_no_new_tape = _("flush-threshold-scheduled criteria not met");
- } else {
- *why_no_new_tape = _("taperflush criteria not met");
+ if (nb_taper_active <= 0) {
+ if (current_tape >= conf_runtapes) {
+ *why_no_new_tape = g_strdup_printf(_("%d tapes filled; runtapes=%d "
+ "does not allow additional tapes"), current_tape, conf_runtapes);
+ } else {
+ *why_no_new_tape = _("taperflush criteria not met");
+ }
+ result |= TAPE_ACTION_NO_NEW_TAPE;
}
}
// We don't start a flush if taper_tape_started == 1 && dump_to_disk_terminated && force_flush == 0,
// it is a criteria need to exit the first event_loop without flushing everything to tape,
// they will be flush in another event_loop.
- if (!degraded_mode && !taper_busy && !empty(tapeq) &&
- (!((taper_state & TAPER_STATE_TAPE_STARTED) &&
- dump_to_disk_terminated && force_flush == 0) || // if tape already started and dump to disk not terminated
- ((taper_state & TAPER_STATE_TAPE_STARTED) &&
- force_flush == 1) || // if tape already started and force_flush
- !empty(roomq) || // holding disk constraint
- idle_reason == IDLE_NO_DISKSPACE || // holding disk constraint
- (flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
- flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
- (force_flush == 1 && taperflush < tapeq_size))) { // taperflush if force_flush
- result |= TAPE_ACTION_START_A_FLUSH;
+ if (taper->state & TAPER_STATE_IDLE) {
+ if (!degraded_mode && (!empty(tapeq) || !empty(directq)) &&
+ (((taper->state & TAPER_STATE_TAPE_STARTED) &&
+ force_flush == 1) || // if tape already started and force_flush
+ !empty(roomq) || // holding disk constraint
+ idle_reason == IDLE_NO_DISKSPACE || // holding disk constraint
+ (my_flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
+ my_flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
+ (force_flush == 1 && my_taperflush < tapeq_size))) { // taperflush if force_flush
+
+ if (nb_taper_flushing == 0) {
+ result |= TAPE_ACTION_START_A_FLUSH;
+ } else {
+ result |= TAPE_ACTION_START_A_FLUSH_FIT;
+ }
+ }
}
return result;
}
+static int
+all_taper_idle(void)
+{
+ taper_t *taper;
+
+ for (taper = tapetable; taper < tapetable + conf_taper_parallel_write;
+ taper++) {
+ if (taper->state & TAPER_STATE_DUMP_TO_TAPE ||
+ taper->state & TAPER_STATE_FILE_TO_TAPE)
+ return 0;
+ }
+ return 1;
+}
+
#if 0
static void
dump_state(
free_kps(NULL),
(long long)free_space());
if(degraded_mode) g_printf(_("taper: DOWN\n"));
- else if(!taper_busy) g_printf(_("taper: idle\n"));
+ else if(taper->status == TAPER_IDLE) g_printf(_("taper: idle\n"));
else g_printf(_("taper: writing %s:%s.%d est size %lld\n"),
- taper_disk->host->hostname, taper_disk->name,
- sched(taper_disk)->level,
- (long long)sched(taper_disk)->est_size);
+ taper->disk->host->hostname, taper->disk->name,
+ sched(taper->disk)->level,
+ (long long)sched(taper->disk)->est_size);
for(i = 0; i < inparallel; i++) {
dp = dmptable[i].dp;
if(!dmptable[i].busy)