/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 1991-2000 University of Maryland at College Park
+ * Copyright (c) 1991-1998 University of Maryland at College Park
* All Rights Reserved.
*
* Permission to use, copy, modify, distribute, and sell this software and its
* file named AUTHORS, in the root directory of this distribution.
*/
/*
- * $Id: driver.c,v 1.58.2.31.2.8.2.20.2.16 2005/09/20 21:31:52 jrjackson Exp $
+ * $Id: driver.c,v 1.164 2006/03/22 15:07:08 martinea Exp $
*
* controlling process for the Amanda backup system
*/
* tape. Probably not effective though, should do this in planner.
*/
+/*#define HOLD_DEBUG*/
+
#include "amanda.h"
#include "clock.h"
#include "conffile.h"
#include "diskfile.h"
+#include "event.h"
#include "holding.h"
#include "infofile.h"
#include "logfile.h"
#include "driverio.h"
#include "server_util.h"
-disklist_t waitq, runq, tapeq, roomq;
-int pending_aborts, inside_dump_to_tape;
-disk_t *taper_disk;
-int degraded_mode;
-unsigned long reserved_space;
-unsigned long total_disksize;
-char *dumper_program;
-int inparallel;
-int nodump = 0;
-unsigned long tape_length, tape_left = 0;
-int conf_taperalgo;
-am_host_t *flushhost = NULL;
-
-int client_constrained P((disk_t *dp));
-int sort_by_priority_reversed P((disk_t *a, disk_t *b));
-int sort_by_time P((disk_t *a, disk_t *b));
-int start_some_dumps P((disklist_t *rq));
-void dump_schedule P((disklist_t *qp, char *str));
-void start_degraded_mode P((disklist_t *queuep));
-void handle_taper_result P((void));
-dumper_t *idle_dumper P((void));
-int some_dumps_in_progress P((void));
-int num_busy_dumpers P((void));
-dumper_t *lookup_dumper P((int fd));
-void handle_dumper_result P((int fd));
-void read_flush P((disklist_t *tapeqp));
-void read_schedule P((disklist_t *waitqp, disklist_t *runqp));
-int free_kps P((interface_t *ip));
-void interface_state P((char *time_str));
-void allocate_bandwidth P((interface_t *ip, int kps));
-void deallocate_bandwidth P((interface_t *ip, int kps));
-unsigned long free_space P((void));
-assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle, assignedhd_t *preferred));
-char *diskname2filename P((char *dname));
-int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp));
+static disklist_t waitq, runq, tapeq, roomq;
+static int pending_aborts;
+static disk_t *taper_disk;
+static int degraded_mode;
+static unsigned long reserved_space;
+static unsigned long total_disksize;
+static char *dumper_program;
+static char *chunker_program;
+static int inparallel;
+static int nodump = 0;
+static unsigned long tape_length, tape_left = 0;
+static int current_tape = 1;
+static int conf_taperalgo;
+static int conf_runtapes;
+static time_t sleep_time;
+static int idle_reason;
+static char *datestamp;
+static char *timestamp;
+static am_host_t *flushhost = NULL;
+static int need_degraded=0;
+
+static event_handle_t *dumpers_ev_time = NULL;
+static event_handle_t *schedule_ev_read = NULL;
+
+static void allocate_bandwidth P((interface_t *ip, int kps));
+static int assign_holdingdisk P((assignedhd_t **holdp, disk_t *diskp));
static void adjust_diskspace P((disk_t *diskp, cmd_t cmd));
static void delete_diskspace P((disk_t *diskp));
-assignedhd_t **build_diskspace P((char *destname));
-void holdingdisk_state P((char *time_str));
-int dump_to_tape P((disk_t *dp));
-int queue_length P((disklist_t q));
-void short_dump_state P((void));
-void dump_state P((char *str));
-void startaflush P((void));
+static assignedhd_t **build_diskspace P((char *destname));
+static int client_constrained P((disk_t *dp));
+static void deallocate_bandwidth P((interface_t *ip, int kps));
+static void dump_schedule P((disklist_t *qp, char *str));
+static int dump_to_tape P((disk_t *dp));
+static assignedhd_t **find_diskspace P((unsigned long size, int *cur_idle,
+ assignedhd_t *preferred));
+static int free_kps P((interface_t *ip));
+static unsigned long free_space P((void));
+static void dumper_result P((disk_t *dp));
+static void handle_dumper_result P((void *));
+static void handle_chunker_result P((void *));
+static void handle_dumpers_time P((void *));
+static void handle_taper_result P((void *));
+static void holdingdisk_state P((char *time_str));
+static dumper_t *idle_dumper P((void));
+static void interface_state P((char *time_str));
+static int num_busy_dumpers P((void));
+static int queue_length P((disklist_t q));
+static disklist_t read_flush P((void));
+static void read_schedule P((void *cookie));
+static void short_dump_state P((void));
+static void startaflush P((void));
+static void start_degraded_mode P((disklist_t *queuep));
+static void start_some_dumps P((disklist_t *rq));
+static void continue_port_dumps();
+static void update_failed_dump_to_tape P((disk_t *));
+#if 0
+static void dump_state P((const char *str));
+#endif
int main P((int main_argc, char **main_argv));
-static int idle_reason;
-char *datestamp;
-char *timestamp;
-
-char *idle_strings[] = {
+static const char *idle_strings[] = {
#define NOT_IDLE 0
"not-idle",
-#define IDLE_START_WAIT 1
- "start-wait",
-#define IDLE_NO_DUMPERS 2
+#define IDLE_NO_DUMPERS 1
"no-dumpers",
+#define IDLE_START_WAIT 2
+ "start-wait",
#define IDLE_NO_HOLD 3
"no-hold",
#define IDLE_CLIENT_CONSTRAINED 4
"taper-wait",
};
-#define SLEEP_MAX (24*3600)
-struct timeval sleep_time = { SLEEP_MAX, 0 };
-/* enabled if any disks are in start-wait: */
-int any_delayed_disk = 0;
-
-int main(main_argc, main_argv)
+int
+main(main_argc, main_argv)
int main_argc;
char **main_argv;
{
- disklist_t *origqp;
+ disklist_t origq;
disk_t *diskp;
- fd_set selectset;
- int fd, dsk;
+ int dsk;
dumper_t *dumper;
char *newdir = NULL;
generic_fs_stats_t fs;
set_pname("driver");
+ /* Don't die when child closes pipe */
signal(SIGPIPE, SIG_IGN);
malloc_size_1 = malloc_inuse(&malloc_hist_1);
set_logerror(logerror);
startclock();
- FD_ZERO(&readset);
printf("%s: pid %ld executable %s version %s\n",
get_pname(), (long) getpid(), main_argv[0], version());
taper_program = vstralloc(libexecdir, "/", "taper", versionsuffix(), NULL);
dumper_program = vstralloc(libexecdir, "/", "dumper", versionsuffix(),
NULL);
+ chunker_program = vstralloc(libexecdir, "/", "chunker", versionsuffix(),
+ NULL);
conf_taperalgo = getconf_int(CNF_TAPERALGO);
conf_tapetype = getconf_str(CNF_TAPETYPE);
+ conf_runtapes = getconf_int(CNF_RUNTAPES);
tape = lookup_tapetype(conf_tapetype);
tape_length = tape->length;
printf("driver: tape size %ld\n", tape_length);
/* taper takes a while to get going, so start it up right away */
init_driverio();
- startup_tape_process(taper_program);
- taper_cmd(START_TAPER, datestamp, NULL, 0, NULL);
+ if(conf_runtapes > 0) {
+ startup_tape_process(taper_program);
+ taper_cmd(START_TAPER, datestamp, NULL, 0, NULL);
+ }
/* start initializing: read in databases */
} else {
conf_diskfile = stralloc2(config_dir, conf_diskfile);
}
- if((origqp = read_diskfile(conf_diskfile)) == NULL) {
+ if (read_diskfile(conf_diskfile, &origq) < 0)
error("could not load disklist \"%s\"", conf_diskfile);
- }
amfree(conf_diskfile);
/* set up any configuration-dependent variables */
newdir = newvstralloc(newdir,
hdp->diskdir, "/", timestamp,
NULL);
- if(!mkholdingdir(newdir)) {
+ if(!mkholdingdir(newdir)) {
hdp->disksize = 0L;
}
total_disksize += hdp->disksize;
if(inparallel > MAX_DUMPERS) inparallel = MAX_DUMPERS;
/* fire up the dumpers now while we are waiting */
-
if(!nodump) startup_dump_processes(dumper_program, inparallel);
/*
* in parallel with the planner.
*/
- waitq = *origqp;
- tapeq.head = tapeq.tail = NULL;
- roomq.head = roomq.tail = NULL;
- runq.head = runq.tail = NULL;
+ runq.head = NULL;
+ runq.tail = NULL;
+ waitq = origq;
+ tapeq = read_flush();
- read_flush(&tapeq);
+ roomq.head = roomq.tail = NULL;
log_add(L_STATS, "startup time %s", walltime_str(curclock()));
getconf_str(CNF_DUMPORDER));
fflush(stdout);
- /* Let's see if the tape is ready */
+ /* ok, planner is done, now lets see if the tape is ready */
- cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+ if(conf_runtapes > 0) {
+ cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
- if(cmd != TAPER_OK) {
- /* no tape, go into degraded mode: dump to holding disk */
- start_degraded_mode(&runq);
- FD_CLR(taper,&readset);
+ if(cmd != TAPER_OK) {
+ /* no tape, go into degraded mode: dump to holding disk */
+ need_degraded=1;
+ }
+ }
+ else {
+ need_degraded=1;
}
-
- short_dump_state(); /* for amstatus */
tape_left = tape_length;
taper_busy = 0;
taper_disk = NULL;
+ taper_ev_read = NULL;
+ if(!need_degraded) startaflush();
- /* Start autoflush while waiting for dump schedule */
- if(!nodump) {
- /* Start any autoflush tape writes */
- if (!empty(tapeq)) {
- startaflush();
- short_dump_state(); /* for amstatus */
-
- /* Process taper results until the schedule arrives */
- while (1) {
- FD_ZERO(&selectset);
- FD_SET(0, &selectset);
- FD_SET(taper, &selectset);
-
- if(select(taper+1, (SELECT_ARG_TYPE *)(&selectset), NULL, NULL,
- &sleep_time) == -1)
- error("select: %s", strerror(errno));
- if (FD_ISSET(0, &selectset)) break; /* schedule arrived */
- if (FD_ISSET(taper, &selectset)) handle_taper_result();
- short_dump_state(); /* for amstatus */
- }
-
- }
-
- /* Read the dump schedule */
- read_schedule(&waitq, &runq);
- }
-
- /* Start any needed flushes */
- startaflush();
-
- while(start_some_dumps(&runq) || some_dumps_in_progress() ||
- any_delayed_disk) {
- short_dump_state();
-
- /* wait for results */
-
- memcpy(&selectset, &readset, sizeof(fd_set));
- if(select(maxfd+1, (SELECT_ARG_TYPE *)(&selectset),
- NULL, NULL, &sleep_time) == -1)
- error("select: %s", strerror(errno));
-
- /* handle any results that have come in */
-
- for(fd = 0; fd <= maxfd; fd++) {
- /*
- * The first pass through the following loop, we have
- * data ready for areads (called by getresult, called by
- * handle_.*_result). But that may read more than one record,
- * so we need to keep processing as long as areads has data.
- * We will get control back after each record and the buffer
- * will go empty (indicated by areads_dataready(fd) == 0)
- * after the last one available has been processed.
- */
- while(FD_ISSET(fd, &selectset) || areads_dataready(fd) > 0) {
- if(fd == taper) handle_taper_result();
- else handle_dumper_result(fd);
- FD_CLR(fd, &selectset);
- }
- }
+ if(!nodump)
+ schedule_ev_read = event_register(0, EV_READFD, read_schedule, NULL);
- }
+ short_dump_state();
+ event_loop(0);
/* handle any remaining dumps by dumping directly to tape, if possible */
- while(!empty(runq)) {
+ while(!empty(runq) && taper > 0) {
diskp = dequeue_disk(&runq);
if(!degraded_mode) {
int rc = dump_to_tape(diskp);
}
else
log_add(L_FAIL, "%s %s %s %d [%s]",
- diskp->host->hostname, diskp->name,
- sched(diskp)->datestamp, sched(diskp)->level,
+ diskp->host->hostname, diskp->name, sched(diskp)->datestamp,
+ sched(diskp)->level,
diskp->no_hold ?
"can't dump no-hold disk in degraded mode" :
"no more holding disk space");
who = stralloc("taper");
}
if(what != NULL && who == NULL) {
- ap_snprintf(number, sizeof(number), "%ld", (long)pid);
+ snprintf(number, sizeof(number), "%ld", (long)pid);
who = stralloc2("unknown pid ", number);
}
if(who && what) {
}
amfree(newdir);
+ check_unfree_serial();
printf("driver: FINISHED time %s\n", walltime_str(curclock()));
fflush(stdout);
log_add(L_FINISH,"date %s time %s", datestamp, walltime_str(curclock()));
return 0;
}
-void startaflush() {
+static void
+startaflush()
+{
disk_t *dp = NULL;
disk_t *fit = NULL;
char *datestamp;
-
+ unsigned int extra_tapes = 0;
if(!degraded_mode && !taper_busy && !empty(tapeq)) {
+
datestamp = sched(tapeq.head)->datestamp;
switch(conf_taperalgo) {
case ALGO_FIRST:
dp = dequeue_disk(&tapeq);
break;
- case ALGO_FIRSTFIT:
+ case ALGO_FIRSTFIT:
fit = tapeq.head;
while (fit != NULL) {
- if(sched(fit)->act_size <= tape_left &&
+ extra_tapes = (fit->tape_splitsize > 0) ?
+ conf_runtapes - current_tape : 0;
+ if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) &&
strcmp(sched(fit)->datestamp, datestamp) <= 0) {
dp = fit;
fit = NULL;
}
if(dp) remove_disk(&tapeq, dp);
break;
- case ALGO_LARGEST:
+ case ALGO_LARGEST:
fit = dp = tapeq.head;
while (fit != NULL) {
if(sched(fit)->act_size > sched(dp)->act_size &&
}
if(dp) remove_disk(&tapeq, dp);
break;
- case ALGO_LARGESTFIT:
+ case ALGO_LARGESTFIT:
fit = tapeq.head;
while (fit != NULL) {
- if(sched(fit)->act_size <= tape_left &&
+ extra_tapes = (fit->tape_splitsize > 0) ?
+ conf_runtapes - current_tape : 0;
+ if(sched(fit)->act_size <= (tape_left + tape_length*extra_tapes) &&
(!dp || sched(fit)->act_size > sched(dp)->act_size) &&
strcmp(sched(fit)->datestamp, datestamp) <= 0) {
dp = fit;
}
if(dp) remove_disk(&tapeq, dp);
break;
- case ALGO_SMALLEST:
+ case ALGO_SMALLEST:
break;
case ALGO_LAST:
dp = tapeq.tail;
strcmp(sched(fit)->datestamp, datestamp) <= 0) {
dp = fit;
}
- fit = fit->next;
+ fit = fit->next;
}
if(dp) remove_disk(&tapeq, dp);
}
- taper_disk = dp;
- taper_busy = 1;
- taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level,
+ if(taper_ev_read == NULL) {
+ taper_ev_read = event_register(taper, EV_READFD,
+ handle_taper_result, NULL);
+ }
+ if (dp != NULL) {
+ taper_disk = dp;
+ taper_busy = 1;
+ taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level,
sched(dp)->datestamp);
- fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n",
- taperalgo2str(conf_taperalgo), dp->host->hostname,
- dp->name, sched(taper_disk)->act_size, tape_left);
- if(sched(dp)->act_size <= tape_left)
- tape_left -= sched(dp)->act_size;
- else
- tape_left = 0;
+ fprintf(stderr,"driver: startaflush: %s %s %s %ld %ld\n",
+ taperalgo2str(conf_taperalgo), dp->host->hostname,
+ dp->name, sched(taper_disk)->act_size, tape_left);
+ if(sched(dp)->act_size <= tape_left)
+ tape_left -= sched(dp)->act_size;
+ else
+ tape_left = 0;
+ } else {
+ error("FATAL: Taper marked busy and no work found.");
+ /*NOTREACHED*/
+ }
+ } else if(!taper_busy && taper_ev_read != NULL) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
}
}
-int client_constrained(dp)
-disk_t *dp;
+static int
+client_constrained(dp)
+ disk_t *dp;
{
disk_t *dp2;
return 0;
}
-int start_some_dumps(rq)
-disklist_t *rq;
+static void
+start_some_dumps(rq)
+ disklist_t *rq;
{
- int total, cur_idle;
- disk_t *diskp, *diskp_accept;
- dumper_t *dumper;
+ int cur_idle;
+ disk_t *diskp, *delayed_diskp, *diskp_accept;
assignedhd_t **holdp=NULL, **holdp_accept;
- time_t now = time(NULL);
+ const time_t now = time(NULL);
+ cmd_t cmd;
+ int result_argc;
+ char *result_argv[MAX_ARGS+1];
+ chunker_t *chunker;
+ dumper_t *dumper;
+ char dumptype;
+ char *dumporder;
- total = 0;
idle_reason = IDLE_NO_DUMPERS;
- sleep_time.tv_sec = SLEEP_MAX;
- sleep_time.tv_usec = 0;
- any_delayed_disk = 0;
+ sleep_time = 0;
- if(rq->head == NULL) {
- idle_reason = 0;
- return 0;
+ if(dumpers_ev_time != NULL) {
+ event_release(dumpers_ev_time);
+ dumpers_ev_time = NULL;
}
- /*
- * A potential problem with starting from the bottom of the dump time
- * distribution is that a slave host will have both one of the shortest
- * and one of the longest disks, so starting its shortest disk first will
- * tie up the host and eliminate its longest disk from consideration the
- * first pass through. This could cause a big delay in starting that long
- * disk, which could drag out the whole night's dumps.
- *
- * While starting from the top of the dump time distribution solves the
- * above problem, this turns out to be a bad idea, because the big dumps
- * will almost certainly pack the holding disk completely, leaving no
- * room for even one small dump to start. This ends up shutting out the
- * small-end dumpers completely (they stay idle).
- *
- * The introduction of multiple simultaneous dumps to one host alleviates
- * the biggest&smallest dumps problem: both can be started at the
- * beginning.
- */
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
- if(dumper->busy || dumper->down) continue;
- /* found an idle dumper, now find a disk for it */
- diskp = rq->head;
+ for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
+
+ if( dumper->busy ) {
+ continue;
+ }
+
+ if (dumper->ev_read != NULL) {
+/* assert(dumper->ev_read == NULL);*/
+ event_release(dumper->ev_read);
+ dumper->ev_read = NULL;
+ }
+
+ /*
+ * A potential problem with starting from the bottom of the dump time
+ * distribution is that a slave host will have both one of the shortest
+ * and one of the longest disks, so starting its shortest disk first will
+ * tie up the host and eliminate its longest disk from consideration the
+ * first pass through. This could cause a big delay in starting that long
+ * disk, which could drag out the whole night's dumps.
+ *
+ * While starting from the top of the dump time distribution solves the
+ * above problem, this turns out to be a bad idea, because the big dumps
+ * will almost certainly pack the holding disk completely, leaving no
+ * room for even one small dump to start. This ends up shutting out the
+ * small-end dumpers completely (they stay idle).
+ *
+ * The introduction of multiple simultaneous dumps to one host alleviates
+ * the biggest&smallest dumps problem: both can be started at the
+ * beginning.
+ */
+
diskp_accept = NULL;
holdp_accept = NULL;
-
- if(idle_reason == IDLE_NO_DUMPERS)
- idle_reason = NOT_IDLE;
+ delayed_diskp = NULL;
cur_idle = NOT_IDLE;
- while(diskp) {
+ dumporder = getconf_str(CNF_DUMPORDER);
+ if(strlen(dumporder) > (dumper-dmptable)) {
+ dumptype = dumporder[dumper-dmptable];
+ }
+ else {
+ if(dumper-dmptable < 3)
+ dumptype = 't';
+ else
+ dumptype = 'T';
+ }
+
+ for(diskp = rq->head; diskp != NULL; diskp = diskp->next) {
assert(diskp->host != NULL && sched(diskp) != NULL);
/* round estimate to next multiple of DISK_BLOCK_KB */
sched(diskp)->est_size = am_round(sched(diskp)->est_size,
DISK_BLOCK_KB);
- if(diskp->host->start_t > now) {
+ if (diskp->host->start_t > now) {
cur_idle = max(cur_idle, IDLE_START_WAIT);
- sleep_time.tv_sec = min(diskp->host->start_t - now,
- sleep_time.tv_sec);
- any_delayed_disk = 1;
- }
- else if(diskp->start_t > now) {
+ if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) {
+ delayed_diskp = diskp;
+ sleep_time = diskp->host->start_t;
+ }
+ } else if(diskp->start_t > now) {
cur_idle = max(cur_idle, IDLE_START_WAIT);
- sleep_time.tv_sec = min(diskp->start_t - now,
- sleep_time.tv_sec);
- any_delayed_disk = 1;
- }
- else if(diskp->host->netif->curusage > 0 &&
- sched(diskp)->est_kps > free_kps(diskp->host->netif))
+ if (delayed_diskp == NULL || sleep_time > diskp->start_t) {
+ delayed_diskp = diskp;
+ sleep_time = diskp->start_t;
+ }
+ } else if (diskp->host->netif->curusage > 0 &&
+ sched(diskp)->est_kps > free_kps(diskp->host->netif)) {
cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH);
- else if(sched(diskp)->no_space)
+ } else if(sched(diskp)->no_space) {
cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
- else if((holdp = find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL)
+ } else if ((holdp =
+ find_diskspace(sched(diskp)->est_size,&cur_idle,NULL)) == NULL) {
cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
- else if(diskp->no_hold) {
+ } else if (diskp->no_hold) {
free_assignedhd(holdp);
cur_idle = max(cur_idle, IDLE_NO_HOLD);
- } else if(client_constrained(diskp)) {
+ } else if (client_constrained(diskp)) {
free_assignedhd(holdp);
cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED);
} else {
/* disk fits, dump it */
int accept = !diskp_accept;
if(!accept) {
- char dumptype;
- char *dumporder = getconf_str(CNF_DUMPORDER);
- if(strlen(dumporder) <= (dumper-dmptable)) {
- if(dumper-dmptable < 3)
- dumptype = 't';
- else
- dumptype = 'T';
- }
- else {
- dumptype = dumporder[dumper-dmptable];
- }
switch(dumptype) {
case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
break;
break;
case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps);
break;
- default: log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n",
+ default: log_add(L_WARNING, "Unknown dumporder character \'%c\', using 's'.\n",
dumptype);
accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
break;
free_assignedhd(holdp);
}
}
- diskp = diskp->next;
}
diskp = diskp_accept;
holdp = holdp_accept;
- if(diskp) {
- cur_idle = NOT_IDLE;
+
+ idle_reason = max(idle_reason, cur_idle);
+
+ /*
+ * If we have no disk at this point, and there are disks that
+ * are delayed, then schedule a time event to call this dumper
+ * with the disk with the shortest delay.
+ */
+ if (diskp == NULL && delayed_diskp != NULL) {
+ assert(sleep_time > now);
+ sleep_time -= now;
+ dumpers_ev_time = event_register(sleep_time, EV_TIME,
+ handle_dumpers_time, &runq);
+ return;
+ } else if (diskp != NULL) {
sched(diskp)->act_size = 0;
allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
sched(diskp)->activehd = assign_holdingdisk(holdp, diskp);
amfree(holdp);
- diskp->host->inprogress += 1; /* host is now busy */
+ sched(diskp)->destname = newstralloc(sched(diskp)->destname,
+ sched(diskp)->holdp[0]->destname);
+ diskp->host->inprogress++; /* host is now busy */
diskp->inprogress = 1;
sched(diskp)->dumper = dumper;
- sched(diskp)->timestamp = time((time_t *)0);
+ sched(diskp)->timestamp = now;
+ dumper->ev_read = event_register(dumper->fd, EV_READFD,
+ handle_dumper_result, dumper);
dumper->busy = 1; /* dumper is now busy */
dumper->dp = diskp; /* link disk to dumper */
- total++;
remove_disk(rq, diskp); /* take it off the run queue */
- dumper_cmd(dumper, FILE_DUMP, diskp);
- diskp->host->start_t = time(NULL) + 15;
+
+ sched(diskp)->origsize = -1;
+ sched(diskp)->dumpsize = -1;
+ sched(diskp)->dumptime = -1;
+ sched(diskp)->tapetime = -1;
+ chunker = dumper->chunker;
+ chunker->result = LAST_TOK;
+ dumper->result = LAST_TOK;
+ startup_chunk_process(chunker,chunker_program);
+ chunker->dumper = dumper;
+ chunker_cmd(chunker, PORT_WRITE, diskp);
+ cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
+ if(cmd != PORT) {
+ printf("driver: did not get PORT from %s for %s:%s\n",
+ chunker->name, diskp->host->hostname, diskp->name);
+ fflush(stdout);
+ return ; /* fatal problem */
+ }
+ chunker->ev_read = event_register(chunker->fd, EV_READFD,
+ handle_chunker_result, chunker);
+ dumper->output_port = atoi(result_argv[2]);
+
+ dumper_cmd(dumper, PORT_DUMP, diskp);
+
+ diskp->host->start_t = now + 15;
+ } else if (/* cur_idle != NOT_IDLE && */
+ (num_busy_dumpers() > 0 || taper_busy)) {
+ /*
+ * We are constrained.
+ */
}
- idle_reason = max(idle_reason, cur_idle);
}
- return total;
-}
-
-int sort_by_priority_reversed(a, b)
-disk_t *a, *b;
-{
- if(sched(b)->priority - sched(a)->priority != 0)
- return sched(b)->priority - sched(a)->priority;
- else
- return sort_by_time(a, b);
}
-int sort_by_time(a, b)
-disk_t *a, *b;
+/*
+ * This gets called when a dumper is delayed for some reason. It may
+ * be because a disk has a delayed start, or amanda is constrained
+ * by network or disk limits.
+ */
+static void
+handle_dumpers_time(cookie)
+ void *cookie;
{
- long diff;
-
- if ((diff = sched(a)->est_time - sched(b)->est_time) < 0) {
- return -1;
- } else if (diff > 0) {
- return 1;
- } else {
- return 0;
- }
+ disklist_t *runq = cookie;
+ event_release(dumpers_ev_time);
+ dumpers_ev_time = NULL;
+ start_some_dumps(runq);
}
-void dump_schedule(qp, str)
-disklist_t *qp;
-char *str;
+static void
+dump_schedule(qp, str)
+ disklist_t *qp;
+ char *str;
{
disk_t *dp;
printf("--------\n");
}
-
-void start_degraded_mode(queuep)
-disklist_t *queuep;
+static void
+start_degraded_mode(queuep)
+ disklist_t *queuep;
{
disk_t *dp;
disklist_t newq;
unsigned long est_full_size;
+ if (taper_ev_read != NULL) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
+ }
+
newq.head = newq.tail = 0;
dump_schedule(queuep, "before start degraded mode");
if(sched(dp)->level != 0)
/* go ahead and do the disk as-is */
- insert_disk(&newq, dp, sort_by_priority_reversed);
+ enqueue_disk(&newq, dp);
else {
if (reserved_space + est_full_size + sched(dp)->est_size
<= total_disksize) {
- insert_disk(&newq, dp, sort_by_priority_reversed);
+ enqueue_disk(&newq, dp);
est_full_size += sched(dp)->est_size;
}
else if(sched(dp)->degr_level != -1) {
sched(dp)->est_size = sched(dp)->degr_size;
sched(dp)->est_time = sched(dp)->degr_time;
sched(dp)->est_kps = sched(dp)->degr_kps;
- insert_disk(&newq, dp, sort_by_priority_reversed);
+ enqueue_disk(&newq, dp);
}
else {
- log_add(L_FAIL, "%s %s %s %d [can't switch to incremental dump]",
- dp->host->hostname, dp->name,
- sched(dp)->datestamp, sched(dp)->level);
+ log_add(L_FAIL,"%s %s %s %d [can't switch to incremental dump]",
+ dp->host->hostname, dp->name, sched(dp)->datestamp,
+ sched(dp)->level);
}
}
}
dump_schedule(queuep, "after start degraded mode");
}
-void continue_dumps()
+
+static void continue_port_dumps()
{
-disk_t *dp, *ndp;
-assignedhd_t **h;
-int active_dumpers=0, busy_dumpers=0, i;
-dumper_t *dumper;
+ disk_t *dp, *ndp;
+ assignedhd_t **h;
+ int active_dumpers=0, busy_dumpers=0, i;
+ dumper_t *dumper;
/* First we try to grant diskspace to some dumps waiting for it. */
for( dp = roomq.head; dp; dp = ndp ) {
/* find last holdingdisk used by this dump */
for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ );
/* find more space */
- h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size, &active_dumpers, h[i] );
+ h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
+ &active_dumpers, h[i] );
if( h ) {
for(dumper = dmptable; dumper < dmptable + inparallel &&
dumper->dp != dp; dumper++);
assert( dumper < dmptable + inparallel );
sched(dp)->activehd = assign_holdingdisk( h, dp );
- dumper_cmd( dumper, CONTINUE, dp );
+ chunker_cmd( dumper->chunker, CONTINUE, dp );
amfree(h);
remove_disk( &roomq, dp );
}
* it will be dumped directly to tape. Actually, case c is a special
* manifestation of case b) where only one dumper is busy.
*/
- for( dp=NULL, dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
+ for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
if( dumper->busy ) {
busy_dumpers++;
if( !find_disk(&roomq, dumper->dp) ) {
active_dumpers++;
- } else if( !dp || sched(dp)->est_size > sched(dumper->dp)->est_size ) {
+ } else if( !dp ||
+ sched(dp)->est_size > sched(dumper->dp)->est_size ) {
dp = dumper->dp;
}
}
}
- if( !active_dumpers && busy_dumpers > 0 &&
+ if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) &&
((!taper_busy && empty(tapeq)) || degraded_mode) &&
pending_aborts == 0 ) { /* not case a */
if( busy_dumpers == 1 ) { /* case c */
* We abort that dump, hopefully not wasting too much time retrying it.
*/
remove_disk( &roomq, dp );
+ chunker_cmd( sched(dp)->dumper->chunker, ABORT, NULL );
dumper_cmd( sched(dp)->dumper, ABORT, NULL );
pending_aborts++;
}
}
-void handle_taper_result()
+
+static void
+handle_taper_result(void *cookie)
{
disk_t *dp;
int filenum;
cmd_t cmd;
int result_argc;
char *result_argv[MAX_ARGS+1];
+ int avail_tapes = 0;
+
+ assert(cookie == NULL);
+
+ do {
+
+ short_dump_state();
+
+ cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+
+ switch(cmd) {
+
+ case PARTIAL:
+ case DONE: /* DONE <handle> <label> <tape file> <err mess> */
+ if(result_argc != 5) {
+ error("error: [taper DONE result_argc != 5: %d", result_argc);
+ }
+
+ dp = serial2disk(result_argv[2]);
+ free_serial(result_argv[2]);
+
+ filenum = atoi(result_argv[4]);
+ if(cmd == DONE) {
+ update_info_taper(dp, result_argv[3], filenum,
+ sched(dp)->level);
+ }
+
+ delete_diskspace(dp);
+
+ printf("driver: finished-cmd time %s taper wrote %s:%s\n",
+ walltime_str(curclock()), dp->host->hostname, dp->name);
+ fflush(stdout);
+
+ amfree(sched(dp)->destname);
+ amfree(sched(dp)->dumpdate);
+ amfree(sched(dp)->degr_dumpdate);
+ amfree(sched(dp)->datestamp);
+ amfree(dp->up);
+
+ taper_busy = 0;
+ taper_disk = NULL;
+ startaflush();
+
+ /* continue with those dumps waiting for diskspace */
+ continue_port_dumps();
+ break;
+
+ case TRYAGAIN: /* TRY-AGAIN <handle> <err mess> */
+ if (result_argc < 2) {
+ error("error [taper TRYAGAIN result_argc < 2: %d]",
+ result_argc);
+ }
+ dp = serial2disk(result_argv[2]);
+ free_serial(result_argv[2]);
+ printf("driver: taper-tryagain time %s disk %s:%s\n",
+ walltime_str(curclock()), dp->host->hostname, dp->name);
+ fflush(stdout);
+
+ /* See how many tapes we have left, but we alwyays
+ retry once (why?) */
+ current_tape++;
+ if(dp->tape_splitsize > 0)
+ avail_tapes = conf_runtapes - current_tape;
+ else
+ avail_tapes = 0;
+
+ if(sched(dp)->attempted > avail_tapes) {
+ log_add(L_FAIL, "%s %s %s %d [too many taper retries]",
+ dp->host->hostname, dp->name, sched(dp)->datestamp,
+ sched(dp)->level);
+ printf("driver: taper failed %s %s %s, too many taper retry\n",
+ result_argv[2], dp->host->hostname, dp->name);
+ }
+ else {
+ /* Re-insert into taper queue. */
+ sched(dp)->attempted++;
+ headqueue_disk(&tapeq, dp);
+ }
+
+ tape_left = tape_length;
+
+ /* run next thing from queue */
+
+ taper_busy = 0;
+ taper_disk = NULL;
+ startaflush();
+ continue_port_dumps();
+ break;
+
+ case SPLIT_CONTINUE: /* SPLIT_CONTINUE <handle> <new_label> */
+ if (result_argc != 3) {
+ error("error [taper SPLIT_CONTINUE result_argc != 3: %d]",
+ result_argc);
+ }
+
+ break;
+ case SPLIT_NEEDNEXT: /* SPLIT-NEEDNEXT <handle> <kb written> */
+ if (result_argc != 3) {
+ error("error [taper SPLIT_NEEDNEXT result_argc != 3: %d]",
+ result_argc);
+ }
+
+ /* Update our tape counter and reset tape_left */
+ current_tape++;
+ tape_left = tape_length;
+
+ /* Reduce the size of the dump by amount written and reduce
+ tape_left by the amount left over */
+ dp = serial2disk(result_argv[2]);
+ sched(dp)->act_size -= atoi(result_argv[3]);
+ if (sched(dp)->act_size < tape_left)
+ tape_left -= sched(dp)->act_size;
+ else
+ tape_length = 0;
+
+ break;
+
+ case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
+ dp = serial2disk(result_argv[2]);
+ free_serial(result_argv[2]);
+ printf("driver: finished-cmd time %s taper wrote %s:%s\n",
+ walltime_str(curclock()), dp->host->hostname, dp->name);
+ fflush(stdout);
+ log_add(L_WARNING, "Taper error: %s", result_argv[3]);
+ /* FALLSTHROUGH */
+
+ case BOGUS:
+ if (cmd == BOGUS) {
+ log_add(L_WARNING, "Taper protocol error");
+ }
+ /*
+ * Since we've gotten a taper error, we can't send anything more
+ * to the taper. Go into degraded mode to try to get everthing
+ * onto disk. Later, these dumps can be flushed to a new tape.
+ * The tape queue is zapped so that it appears empty in future
+ * checks. If there are dumps waiting for diskspace to be freed,
+ * cancel one.
+ */
+ if(!nodump) {
+ log_add(L_WARNING,
+ "going into degraded mode because of taper component error.");
+ start_degraded_mode(&runq);
+ }
+ tapeq.head = tapeq.tail = NULL;
+ taper_busy = 0;
+ taper_disk = NULL;
+ if(taper_ev_read != NULL) {
+ event_release(taper_ev_read);
+ taper_ev_read = NULL;
+ }
+ if(cmd != TAPE_ERROR) aclose(taper);
+ continue_port_dumps();
+ break;
+
+ default:
+ error("driver received unexpected token (%s) from taper",
+ cmdstr[cmd]);
+ }
+ /*
+ * Wakeup any dumpers that are sleeping because of network
+ * or disk constraints.
+ */
+ start_some_dumps(&runq);
+
+ } while(areads_dataready(taper));
+}
- cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
+static dumper_t *
+idle_dumper()
+{
+ dumper_t *dumper;
- switch(cmd) {
+ for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
+ if(!dumper->busy && !dumper->down) return dumper;
- case DONE: /* DONE <handle> <label> <tape file> <err mess> */
- if(result_argc != 5) {
- error("error: [taper DONE result_argc != 5: %d", result_argc);
- }
+ return NULL;
+}
- dp = serial2disk(result_argv[2]);
- free_serial(result_argv[2]);
+static int
+num_busy_dumpers()
+{
+ dumper_t *dumper;
+ int n;
- filenum = atoi(result_argv[4]);
- update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
+ n = 0;
+ for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
+ if(dumper->busy) n += 1;
- delete_diskspace(dp);
+ return n;
+}
- printf("driver: finished-cmd time %s taper wrote %s:%s\n",
- walltime_str(curclock()), dp->host->hostname, dp->name);
- fflush(stdout);
- amfree(sched(dp)->dumpdate);
- amfree(sched(dp)->degr_dumpdate);
- amfree(sched(dp)->datestamp);
- amfree(dp->up);
+static void
+dumper_result(dp)
+ disk_t *dp;
+{
+ dumper_t *dumper;
+ chunker_t *chunker;
+ assignedhd_t **h=NULL;
+ int activehd, i, dummy;
+ long size;
+ int is_partial;
- taper_busy = 0;
- taper_disk = NULL;
- startaflush();
- continue_dumps(); /* continue with those dumps waiting for diskspace */
- break;
+ dumper = sched(dp)->dumper;
+ chunker = dumper->chunker;
- case TRYAGAIN: /* TRY-AGAIN <handle> <err mess> */
- if (result_argc < 2) {
- error("error [taper TRYAGAIN result_argc < 2: %d]", result_argc);
- }
- dp = serial2disk(result_argv[2]);
- free_serial(result_argv[2]);
- printf("driver: taper-tryagain time %s disk %s:%s\n",
- walltime_str(curclock()), dp->host->hostname, dp->name);
- fflush(stdout);
+ free_serial_dp(dp);
- /* re-insert into taper queue */
+ h = sched(dp)->holdp;
+ activehd = sched(dp)->activehd;
- if(sched(dp)->attempted) {
- log_add(L_FAIL, "%s %s %d %s [too many taper retries]",
- dp->host->hostname, dp->name, sched(dp)->level,
- sched(dp)->datestamp);
- printf("driver: taper failed %s %s %s, too many taper retry\n", result_argv[2], dp->host->hostname, dp->name);
- }
- else {
- sched(dp)->attempted++;
- headqueue_disk(&tapeq, dp);
- }
+ if(dumper->result == DONE && chunker->result == DONE) {
+ update_info_dumper(dp, sched(dp)->origsize,
+ sched(dp)->dumpsize, sched(dp)->dumptime);
+ }
- tape_left = tape_length;
+ deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- /* run next thing from queue */
- taper_busy = 0;
- taper_disk = NULL;
- startaflush();
- continue_dumps(); /* continue with those dumps waiting for diskspace */
+ is_partial = dumper->result != DONE || chunker->result != DONE;
+ rename_tmp_holding(sched(dp)->destname, !is_partial);
- break;
+ dummy = 0;
+ for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
+ dummy += h[i]->used;
+ }
- case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
- dp = serial2disk(result_argv[2]);
- free_serial(result_argv[2]);
- printf("driver: finished-cmd time %s taper wrote %s:%s\n",
- walltime_str(curclock()), dp->host->hostname, dp->name);
- fflush(stdout);
- /* Note: fall through code... */
+ size = size_holding_files(sched(dp)->destname, 0);
+ h[activehd]->used = size - dummy;
+ holdalloc(h[activehd]->disk)->allocated_dumpers--;
+ adjust_diskspace(dp, DONE);
- case BOGUS:
- /*
- * Since we've gotten a tape error, we can't send anything more
- * to the taper. Go into degraded mode to try to get everthing
- * onto disk. Later, these dumps can be flushed to a new tape.
- * The tape queue is zapped so that it appears empty in future
- * checks. If there are dumps waiting for diskspace to be freed,
- * cancel one.
- */
- if(!nodump) {
- log_add(L_WARNING,
- "going into degraded mode because of tape error.");
- }
- start_degraded_mode(&runq);
- taper_busy = 0;
- taper_disk = NULL;
- tapeq.head = tapeq.tail = NULL;
- FD_CLR(taper,&readset);
- if(cmd != TAPE_ERROR) aclose(taper);
- continue_dumps();
- break;
- default:
- error("driver received unexpected token (%d) from taper", cmd);
+ sched(dp)->attempted += 1;
+
+ if((dumper->result != DONE || chunker->result != DONE) &&
+ sched(dp)->attempted <= 1) {
+ delete_diskspace(dp);
+ enqueue_disk(&runq, dp);
+ }
+ else if(size > DISK_BLOCK_KB) {
+ sched(dp)->attempted = 0;
+ enqueue_disk(&tapeq, dp);
+ startaflush();
+ }
+ else {
+ delete_diskspace(dp);
}
+
+ dumper->busy = 0;
+ dp->host->inprogress -= 1;
+ dp->inprogress = 0;
+
+ waitpid(chunker->pid, NULL, 0 );
+ aclose(chunker->fd);
+ chunker->fd = -1;
+ chunker->down = 1;
+
+ dp = NULL;
+ continue_port_dumps();
+ /*
+ * Wakeup any dumpers that are sleeping because of network
+ * or disk constraints.
+ */
+ start_some_dumps(&runq);
}
-dumper_t *idle_dumper()
+static void
+handle_dumper_result(cookie)
+ void *cookie;
{
- dumper_t *dumper;
+ /*static int pending_aborts = 0;*/
+ dumper_t *dumper = cookie;
+ disk_t *dp, *sdp;
+ cmd_t cmd;
+ int result_argc;
+ char *result_argv[MAX_ARGS+1];
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
- if(!dumper->busy && !dumper->down) return dumper;
+ assert(dumper != NULL);
+ dp = dumper->dp;
+ assert(dp != NULL && sched(dp) != NULL);
- return NULL;
-}
+ do {
-int some_dumps_in_progress()
-{
- dumper_t *dumper;
+ short_dump_state();
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
- if(dumper->busy) return 1;
+ cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
- return taper_busy;
-}
+ if(cmd != BOGUS) {
+ /* result_argv[2] always contains the serial number */
+ sdp = serial2disk(result_argv[2]);
+ assert(sdp == dp);
+ }
-int num_busy_dumpers()
-{
- dumper_t *dumper;
- int n;
+ switch(cmd) {
- n = 0;
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
- if(dumper->busy) n += 1;
+ case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <errstr> */
+ if(result_argc != 6) {
+ error("error [dumper DONE result_argc != 6: %d]", result_argc);
+ }
- return n;
-}
+ /*free_serial(result_argv[2]);*/
-dumper_t *lookup_dumper(fd)
-int fd;
-{
- dumper_t *dumper;
+ sched(dp)->origsize = (long)atof(result_argv[3]);
+ sched(dp)->dumptime = (long)atof(result_argv[5]);
- for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
- if(dumper->outfd == fd) return dumper;
+ printf("driver: finished-cmd time %s %s dumped %s:%s\n",
+ walltime_str(curclock()), dumper->name,
+ dp->host->hostname, dp->name);
+ fflush(stdout);
- return NULL;
+ dumper->result = cmd;
+
+ break;
+
+ case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
+ /*
+ * Requeue this disk, and fall through to the FAILED
+ * case for cleanup.
+ */
+ if(sched(dp)->attempted) {
+ log_add(L_FAIL, "%s %s %s %d [too many dumper retry: %s]",
+ dp->host->hostname, dp->name, sched(dp)->datestamp,
+ sched(dp)->level, result_argv[3]);
+ printf("driver: dump failed %s %s %s, too many dumper retry: %s\n",
+ result_argv[2], dp->host->hostname, dp->name,
+ result_argv[3]);
+ }
+ /* FALLTHROUGH */
+ case FAILED: /* FAILED <handle> <errstr> */
+ /*free_serial(result_argv[2]);*/
+ dumper->result = cmd;
+ break;
+
+ case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
+ /*
+ * We sent an ABORT from the NO-ROOM case because this dump
+ * wasn't going to fit onto the holding disk. We now need to
+ * clean up the remains of this image, and try to finish
+ * other dumps that are waiting on disk space.
+ */
+ assert(pending_aborts);
+ /*free_serial(result_argv[2]);*/
+ dumper->result = cmd;
+ break;
+
+ case BOGUS:
+ /* either EOF or garbage from dumper. Turn it off */
+ log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
+ dumper->name, (long)dumper->pid);
+ event_release(dumper->ev_read);
+ dumper->ev_read = NULL;
+ aclose(dumper->fd);
+ dumper->busy = 0;
+ dumper->down = 1; /* mark it down so it isn't used again */
+ if(dp) {
+ /* if it was dumping something, zap it and try again */
+ if(sched(dp)->attempted) {
+ log_add(L_FAIL, "%s %s %s %d [%s died]",
+ dp->host->hostname, dp->name, sched(dp)->datestamp,
+ sched(dp)->level, dumper->name);
+ }
+ else {
+ log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
+ dumper->name, dp->host->hostname, dp->name,
+ sched(dp)->level);
+ }
+ }
+ dumper->result = cmd;
+ break;
+
+ default:
+ assert(0);
+ }
+ /* send the dumper result to the chunker */
+ if(dumper->chunker->down == 0 && dumper->chunker->fd != -1) {
+ if(cmd == DONE) {
+ chunker_cmd(dumper->chunker, DONE, dp);
+ }
+ else {
+ chunker_cmd(dumper->chunker, FAILED, dp);
+ }
+ }
+
+ if(dumper->result != LAST_TOK && dumper->chunker->result != LAST_TOK)
+ dumper_result(dp);
+
+ } while(areads_dataready(dumper->fd));
}
-void handle_dumper_result(fd)
- int fd;
+static void
+handle_chunker_result(cookie)
+ void *cookie;
{
+ /*static int pending_aborts = 0;*/
+ chunker_t *chunker = cookie;
assignedhd_t **h=NULL;
dumper_t *dumper;
disk_t *dp, *sdp;
- long origsize;
- long dumpsize;
- long dumptime;
cmd_t cmd;
int result_argc;
char *result_argv[MAX_ARGS+1];
- int i, dummy;
+ int dummy;
int activehd = -1;
- dumper = lookup_dumper(fd);
+
+ assert(chunker != NULL);
+ dumper = chunker->dumper;
+ assert(dumper != NULL);
dp = dumper->dp;
- assert(dp && sched(dp) && sched(dp)->destname);
+ assert(dp != NULL);
+ assert(sched(dp) != NULL);
+ assert(sched(dp)->destname != NULL);
+ assert(dp != NULL && sched(dp) != NULL && sched(dp)->destname);
if(dp && sched(dp) && sched(dp)->holdp) {
h = sched(dp)->holdp;
activehd = sched(dp)->activehd;
}
- cmd = getresult(fd, 1, &result_argc, result_argv, MAX_ARGS+1);
+ do {
- if(cmd != BOGUS) {
- sdp = serial2disk(result_argv[2]); /* result_argv[2] always contains the serial number */
- assert(sdp == dp);
- }
+ short_dump_state();
- switch(cmd) {
+ cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
- case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <err str> */
- if(result_argc != 6) {
- error("error [dumper DONE result_argc != 6: %d]", result_argc);
+ if(cmd != BOGUS) {
+ /* result_argv[2] always contains the serial number */
+ sdp = serial2disk(result_argv[2]);
+ assert(sdp == dp);
}
- free_serial(result_argv[2]);
-
- origsize = (long)atof(result_argv[3]);
- dumpsize = (long)atof(result_argv[4]);
- dumptime = (long)atof(result_argv[5]);
- update_info_dumper(dp, origsize, dumpsize, dumptime);
+ switch(cmd) {
- /* adjust holdp[active]->used using the real dumpsize and all other
- * holdp[i]->used as an estimate.
- */
+ case PARTIAL: /* PARTIAL <handle> <dumpsize> <errstr> */
+ case DONE: /* DONE <handle> <dumpsize> <errstr> */
+ if(result_argc != 4) {
+ error("error [chunker %s result_argc != 4: %d]", cmdstr[cmd],
+ result_argc);
+ }
+ /*free_serial(result_argv[2]);*/
- dummy = 0;
- for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
- dummy += h[i]->used;
- }
+ sched(dp)->dumpsize = (long)atof(result_argv[3]);
- rename_tmp_holding(sched(dp)->destname, 1);
- assert( h && activehd >= 0 );
- h[activehd]->used = size_holding_files(sched(dp)->destname) - dummy;
- deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- holdalloc(h[activehd]->disk)->allocated_dumpers--;
- adjust_diskspace(dp, DONE);
- dumper->busy = 0;
- dp->host->inprogress -= 1;
- dp->inprogress = 0;
- sched(dp)->attempted = 0;
- printf("driver: finished-cmd time %s %s dumped %s:%s\n",
- walltime_str(curclock()), dumper->name,
- dp->host->hostname, dp->name);
- fflush(stdout);
+ printf("driver: finished-cmd time %s %s chunked %s:%s\n",
+ walltime_str(curclock()), chunker->name,
+ dp->host->hostname, dp->name);
+ fflush(stdout);
- enqueue_disk(&tapeq, dp);
- dp = NULL;
+ event_release(chunker->ev_read);
- startaflush();
- continue_dumps();
+ chunker->result = cmd;
- break;
+ break;
- case TRYAGAIN: /* TRY-AGAIN <handle> <err str> */
- case FATAL_TRYAGAIN:
- free_serial(result_argv[2]);
+ case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
+ assert(0);
+ event_release(chunker->ev_read);
- rename_tmp_holding(sched(dp)->destname, 0);
- deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- assert( h && activehd >= 0 );
- holdalloc(h[activehd]->disk)->allocated_dumpers--;
- /* Because we don't know how much was written to disk the
- * following functions *must* be called together!
- */
- adjust_diskspace(dp, DONE);
- delete_diskspace(dp);
- dumper->busy = 0;
- dp->host->inprogress -= 1;
- dp->inprogress = 0;
-
- if(sched(dp)->attempted) {
- log_add(L_FAIL, "%s %s %d %s [too many dumper retry]",
- dp->host->hostname, dp->name,
- sched(dp)->level, sched(dp)->datestamp);
- printf("driver: dump failed %s %s %s, too many dumper retry\n", result_argv[2], dp->host->hostname, dp->name);
- } else {
- sched(dp)->attempted++;
- enqueue_disk(&runq, dp);
- }
- continue_dumps();
+ break;
+ case FAILED: /* FAILED <handle> <errstr> */
+ /*free_serial(result_argv[2]);*/
- if(cmd == FATAL_TRYAGAIN) {
- /* dumper is confused, start another */
- log_add(L_WARNING, "%s (pid %ld) confused, restarting it.",
- dumper->name, (long)dumper->pid);
- FD_CLR(fd,&readset);
- aclose(fd);
- startup_dump_process(dumper, dumper_program);
- }
- /* sleep in case the dumper failed because of a temporary network
- problem, as NIS or NFS... */
- sleep(15);
- break;
+ event_release(chunker->ev_read);
- case FAILED: /* FAILED <handle> <errstr> */
- free_serial(result_argv[2]);
+ chunker->result = cmd;
- rename_tmp_holding(sched(dp)->destname, 0);
- deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- assert( h && activehd >= 0 );
- holdalloc(h[activehd]->disk)->allocated_dumpers--;
- /* Because we don't know how much was written to disk the
- * following functions *must* be called together!
- */
- adjust_diskspace(dp, DONE);
- delete_diskspace(dp);
- dumper->busy = 0;
- dp->host->inprogress -= 1;
- dp->inprogress = 0;
- continue_dumps();
-
- /* no need to log this, dumper will do it */
- /* sleep in case the dumper failed because of a temporary network
- problem, as NIS or NFS... */
- sleep(15);
- break;
+ break;
- case NO_ROOM: /* NO-ROOM <handle> <missing_size> */
- assert( h && activehd >= 0 );
- h[activehd]->used -= atoi(result_argv[3]);
- h[activehd]->reserved -= atoi(result_argv[3]);
- holdalloc(h[activehd]->disk)->allocated_space -= atoi(result_argv[3]);
- h[activehd]->disk->disksize -= atoi(result_argv[3]);
- break;
+ case NO_ROOM: /* NO-ROOM <handle> <missing_size> */
+ assert( h && activehd >= 0 );
+ h[activehd]->used -= atoi(result_argv[3]);
+ h[activehd]->reserved -= atoi(result_argv[3]);
+ holdalloc(h[activehd]->disk)->allocated_space -= atoi(result_argv[3]);
+ h[activehd]->disk->disksize -= atoi(result_argv[3]);
+ break;
- case RQ_MORE_DISK: /* RQ-MORE-DISK <handle> */
- assert( h && activehd >= 0 );
- holdalloc(h[activehd]->disk)->allocated_dumpers--;
- h[activehd]->used = h[activehd]->reserved;
- if( h[++activehd] ) { /* There's still some allocated space left. Tell
- * the dumper about it. */
- sched(dp)->activehd++;
- dumper_cmd( dumper, CONTINUE, dp );
- } else { /* !h[++activehd] - must allocate more space */
- sched(dp)->act_size = sched(dp)->est_size; /* not quite true */
- sched(dp)->est_size = sched(dp)->act_size * 21 / 20; /* +5% */
- sched(dp)->est_size = am_round(sched(dp)->est_size, DISK_BLOCK_KB);
- h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
- &dummy,
- h[activehd-1] );
- if( !h ) {
- /* cur_idle = max(cur_idle, IDLE_NO_DISKSPACE); */
- /* No diskspace available. The reason for this will be
- * determined in continue_dumps(). */
- enqueue_disk( &roomq, dp );
- continue_dumps();
- } else {
- /* OK, allocate space for disk and have dumper continue */
- sched(dp)->activehd = assign_holdingdisk( h, dp );
- dumper_cmd( dumper, CONTINUE, dp );
- amfree(h);
+ case RQ_MORE_DISK: /* RQ-MORE-DISK <handle> */
+ assert( h && activehd >= 0 );
+ holdalloc(h[activehd]->disk)->allocated_dumpers--;
+ h[activehd]->used = h[activehd]->reserved;
+ if( h[++activehd] ) { /* There's still some allocated space left.
+ * Tell the dumper about it. */
+ sched(dp)->activehd++;
+ chunker_cmd( chunker, CONTINUE, dp );
+ } else { /* !h[++activehd] - must allocate more space */
+ sched(dp)->act_size = sched(dp)->est_size; /* not quite true */
+ sched(dp)->est_size = sched(dp)->act_size * 21 / 20; /* +5% */
+ sched(dp)->est_size = am_round(sched(dp)->est_size, DISK_BLOCK_KB);
+ h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
+ &dummy,
+ h[activehd-1] );
+ if( !h ) {
+ /* No diskspace available. The reason for this will be
+ * determined in continue_port_dumps(). */
+ enqueue_disk( &roomq, dp );
+ continue_port_dumps();
+ } else {
+ /* OK, allocate space for disk and have chunker continue */
+ sched(dp)->activehd = assign_holdingdisk( h, dp );
+ chunker_cmd( chunker, CONTINUE, dp );
+ amfree(h);
+ }
}
- }
- break;
+ break;
- case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
- assert(pending_aborts);
- free_serial(result_argv[2]);
+ case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
+ /*
+ * We sent an ABORT from the NO-ROOM case because this dump
+ * wasn't going to fit onto the holding disk. We now need to
+ * clean up the remains of this image, and try to finish
+ * other dumps that are waiting on disk space.
+ */
+ /*assert(pending_aborts);*/
- rename_tmp_holding(sched(dp)->destname, 0);
- deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- /* Because we don't know how much was written to disk the
- * following functions *must* be called together!
- */
- adjust_diskspace(dp, DONE);
- delete_diskspace(dp);
- sched(dp)->attempted++;
- enqueue_disk(&runq, dp); /* we'll try again later */
- dumper->busy = 0;
- dp->host->inprogress -= 1;
- dp->inprogress = 0;
- dp = NULL;
- pending_aborts--;
- continue_dumps();
- break;
+ /*free_serial(result_argv[2]);*/
- case BOGUS:
- /* either EOF or garbage from dumper. Turn it off */
- log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
- dumper->name, (long)dumper->pid);
- FD_CLR(fd,&readset);
- aclose(fd);
- dumper->busy = 0;
- dumper->down = 1; /* mark it down so it isn't used again */
- if(dp) {
- /* if it was dumping something, zap it and try again */
- rename_tmp_holding(sched(dp)->destname, 0);
- deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- assert( h && activehd >= 0 );
- holdalloc(h[activehd]->disk)->allocated_dumpers--;
- /* Because we don't know how much was written to disk the
- * following functions *must* be called together!
- */
- adjust_diskspace(dp, DONE);
- delete_diskspace(dp);
- dp->host->inprogress -= 1;
- dp->inprogress = 0;
- if(sched(dp)->attempted) {
- log_add(L_FAIL, "%s %s %d %s [%s died]",
- dp->host->hostname, dp->name,
- sched(dp)->level, sched(dp)->datestamp, dumper->name);
- }
- else {
- log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
- dumper->name, dp->host->hostname, dp->name,
- sched(dp)->level);
- sched(dp)->attempted++;
- enqueue_disk(&runq, dp);
+ event_release(chunker->ev_read);
+
+ chunker->result = cmd;
+
+ break;
+
+ case BOGUS:
+ /* either EOF or garbage from chunker. Turn it off */
+ log_add(L_WARNING, "%s pid %ld is messed up, ignoring it.\n",
+ chunker->name, (long)chunker->pid);
+
+ if(dp) {
+ /* if it was dumping something, zap it and try again */
+ assert( h && activehd >= 0 );
+ if(sched(dp)->attempted) {
+ log_add(L_FAIL, "%s %s %s %d [%s died]",
+ dp->host->hostname, dp->name, sched(dp)->datestamp,
+ sched(dp)->level, chunker->name);
+ }
+ else {
+ log_add(L_WARNING, "%s died while dumping %s:%s lev %d.",
+ chunker->name, dp->host->hostname, dp->name,
+ sched(dp)->level);
+ }
+ dp = NULL;
}
- dp = NULL;
- continue_dumps();
+
+ event_release(chunker->ev_read);
+
+ chunker->result = cmd;
+
+ break;
+
+ default:
+ assert(0);
}
- break;
- default:
- assert(0);
- }
+ if(chunker->result != LAST_TOK && chunker->dumper->result != LAST_TOK)
+ dumper_result(dp);
- return;
+ } while(areads_dataready(chunker->fd));
}
-void read_flush(tapeqp)
-disklist_t *tapeqp;
+static disklist_t
+read_flush()
{
sched_t *sp;
disk_t *dp;
char *command;
char *s;
int ch;
- long flush_size = 0;
+ disklist_t tq;
- /* read schedule from stdin */
+ tq.head = tq.tail = NULL;
for(line = 0; (inpline = agets(stdin)) != NULL; free(inpline)) {
line++;
s = inpline;
ch = *s++;
- skip_whitespace(s, ch); /* find the command */
+ skip_whitespace(s, ch); /* find the command */
if(ch == '\0') {
- error("Aflush line %d: syntax error", line);
+ error("flush line %d: syntax error (no command)", line);
continue;
}
command = s - 1;
}
if(strcmp(command,"FLUSH") != 0) {
- error("Bflush line %d: syntax error", line);
+ error("flush line %d: syntax error (%s != FLUSH)", line, command);
continue;
}
skip_whitespace(s, ch); /* find the hostname */
if(ch == '\0') {
- error("Cflush line %d: syntax error", line);
+ error("flush line %d: syntax error (no hostname)", line);
continue;
}
hostname = s - 1;
skip_whitespace(s, ch); /* find the diskname */
if(ch == '\0') {
- error("Cflush line %d: syntax error", line);
+ error("flush line %d: syntax error (no diskname)", line);
continue;
}
diskname = s - 1;
skip_whitespace(s, ch); /* find the datestamp */
if(ch == '\0') {
- error("Cflush line %d: syntax error", line);
+ error("flush line %d: syntax error (no datestamp)", line);
continue;
}
datestamp = s - 1;
skip_whitespace(s, ch); /* find the level number */
if(ch == '\0' || sscanf(s - 1, "%d", &level) != 1) {
- error("Cflush line %d: syntax error", line);
+ error("flush line %d: syntax error (bad level)", line);
continue;
}
skip_integer(s, ch);
skip_whitespace(s, ch); /* find the filename */
if(ch == '\0') {
- error("Cflush line %d: syntax error", line);
+ error("flush line %d: syntax error (no filename)", line);
continue;
}
destname = s - 1;
sp->degr_level = -1;
sp->est_kps = 10;
sp->attempted = 0;
- sp->act_size = size_holding_files(destname);
- /*sp->holdp = NULL; JLM: must be build*/
+ sp->act_size = size_holding_files(destname, 0);
sp->holdp = build_diskspace(destname);
- if(sp->holdp == NULL) continue;
+ if(sp->holdp == NULL) continue;
sp->dumper = NULL;
sp->timestamp = (time_t)0;
dp1->up = (char *)sp;
- enqueue_disk(tapeqp, dp1);
- flush_size += sp->act_size;
+ enqueue_disk(&tq, dp1);
}
- printf("driver: flush size %ld\n", flush_size);
amfree(inpline);
-}
+ return tq;
+}
-void read_schedule(waitqp, runqp)
-disklist_t *waitqp, *runqp;
+static void
+read_schedule(cookie)
+ void *cookie;
{
sched_t *sp;
disk_t *dp;
+ disklist_t rq;
int level, line, priority;
char *dumpdate, *degr_dumpdate;
int degr_level;
char *command;
char *s;
int ch;
+ long flush_size = 0;
+
+ rq.head = rq.tail = NULL;
+
+ event_release(schedule_ev_read);
/* read schedule from stdin */
if(dp->host->features == NULL) {
dp->host->features = am_string_to_feature(features);
}
- remove_disk(waitqp, dp);
- insert_disk(&runq, dp, sort_by_time);
+ remove_disk(&waitq, dp);
+ enqueue_disk(&runq, dp);
+ flush_size += sp->act_size;
}
+ printf("driver: flush size %ld\n", flush_size);
amfree(inpline);
if(line == 0)
log_add(L_WARNING, "WARNING: got empty schedule from planner");
+ if(need_degraded==1) start_degraded_mode(&runq);
+ start_some_dumps(&runq);
}
-int free_kps(ip)
-interface_t *ip;
+static int
+free_kps(ip)
+ interface_t *ip;
{
int res;
return res;
}
-void interface_state(time_str)
-char *time_str;
+static void
+interface_state(time_str)
+ char *time_str;
{
interface_t *ip;
printf("\n");
}
-void allocate_bandwidth(ip, kps)
-interface_t *ip;
-int kps;
+static void
+allocate_bandwidth(ip, kps)
+ interface_t *ip;
+ int kps;
{
ip->curusage += kps;
}
-void deallocate_bandwidth(ip, kps)
-interface_t *ip;
-int kps;
+static void
+deallocate_bandwidth(ip, kps)
+ interface_t *ip;
+ int kps;
{
assert(kps <= ip->curusage);
ip->curusage -= kps;
}
/* ------------ */
-unsigned long free_space()
+static unsigned long
+free_space()
{
holdingdisk_t *hdp;
unsigned long total_free;
return total_free;
}
-assignedhd_t **find_diskspace(size, cur_idle, pref)
-unsigned long size;
-int *cur_idle;
-assignedhd_t *pref;
-/* Rewrite by Peter Conrad <conrad@opus5.de>, June '99:
- * - enable splitting a dump across several holding disks
- * - allocate only as much as size tells us, dumpers may request more later
- * We return an array of pointers to assignedhd_t. The array contains at
+static assignedhd_t **
+find_diskspace(size, cur_idle, pref)
+ unsigned long size;
+ int *cur_idle;
+ assignedhd_t *pref;
+/* We return an array of pointers to assignedhd_t. The array contains at
* most one entry per holding disk. The list of pointers is terminated by
* a NULL pointer. Each entry contains a pointer to a holdingdisk and
* how much diskspace to use on that disk. Later on, assign_holdingdisk
* will allocate the given amount of space.
* If there is not enough room on the holdingdisks, NULL is returned.
*/
+
{
-assignedhd_t **result = NULL;
+ assignedhd_t **result = NULL;
holdingdisk_t *minp, *hdp;
int i=0, num_holdingdisks=0; /* are we allowed to use the global thing? */
int j, minj;
size = am_round(size, DISK_BLOCK_KB);
#ifdef HOLD_DEBUG
- printf("find diskspace: want %lu K\n", size );
+ printf("%s: want %lu K\n", debug_prefix_time(": find_diskspace"), size);
fflush(stdout);
#endif
break;
}
else if( holdalloc(hdp)->allocated_space <= hdp->disksize - 2*DISK_BLOCK_KB &&
- !used[j] &&
+ !used[j] &&
(!minp ||
holdalloc(hdp)->allocated_dumpers < holdalloc(minp)->allocated_dumpers ||
(holdalloc(hdp)->allocated_dumpers == holdalloc(minp)->allocated_dumpers &&
minj = j;
}
}
+
pref = NULL;
if( !minp ) { break; } /* all holding disks are full */
used[minj] = 1;
halloc = dalloc + (((dalloc-1)/minp->chunksize)+1) * DISK_BLOCK_KB;
#ifdef HOLD_DEBUG
- fprintf(stdout,"find diskspace: size %ld hf %ld df %ld da %ld ha %ld\n", size, hfree, dfree, dalloc, halloc);
+ printf("%s: find diskspace: size %ld hf %ld df %ld da %ld ha %ld\n",
+ debug_prefix_time(": find_diskspace"),
+ size, hfree, dfree, dalloc, halloc);
fflush(stdout);
#endif
size -= dalloc;
amfree(used);
if( size ) { /* not enough space available */
-#ifdef HOLD_DEBUG
printf("find diskspace: not enough diskspace. Left with %lu K\n", size);
fflush(stdout);
-#endif
free_assignedhd(result);
result = NULL;
}
#ifdef HOLD_DEBUG
for( i = 0; result && result[i]; i++ ) {
- printf("find diskspace: selected %s free %ld reserved %ld dumpers %d\n",
- result[i]->disk->diskdir,
- result[i]->disk->disksize - holdalloc(result[i]->disk)->allocated_space,
- result[i]->reserved,
- holdalloc(result[i]->disk)->allocated_dumpers);
+ printf("%s: find diskspace: selected %s free %ld reserved %ld dumpers %d\n",
+ debug_prefix_time(": find_diskspace"),
+ result[i]->disk->diskdir,
+ result[i]->disk->disksize - holdalloc(result[i]->disk)->allocated_space,
+ result[i]->reserved,
+ holdalloc(result[i]->disk)->allocated_dumpers);
}
fflush(stdout);
#endif
return result;
}
-int assign_holdingdisk(holdp, diskp)
-assignedhd_t **holdp;
-disk_t *diskp;
+static int
+assign_holdingdisk(holdp, diskp)
+ assignedhd_t **holdp;
+ disk_t *diskp;
{
-/* Modified by Peter Conrad <conrad@opus5.de>, June '99
- * Modifications for splitting dumps across holding disks:
- * sched(diskp)->holdp now contains an array of pointers to assignedhd_t.
- */
int i, j, c, l=0;
unsigned long size;
char *sfn = sanitise_filename(diskp->name);
char lvl[64];
assignedhd_t **new_holdp;
- ap_snprintf( lvl, sizeof(lvl), "%d", sched(diskp)->level );
+ snprintf( lvl, sizeof(lvl), "%d", sched(diskp)->level );
size = am_round(sched(diskp)->est_size - sched(diskp)->act_size,
DISK_BLOCK_KB);
holdalloc(holdp[0]->disk)->allocated_space += holdp[0]->reserved;
size = (holdp[0]->reserved>size) ? 0 : size-holdp[0]->reserved;
#ifdef HOLD_DEBUG
- printf("merging holding disk %s to disk %s:%s, add %lu for reserved %lu, left %lu\n",
+ printf("%s: merging holding disk %s to disk %s:%s, add %lu for reserved %lu, left %lu\n",
+ debug_prefix_time(": assign_holdingdisk"),
sched(diskp)->holdp[j-1]->disk->diskdir,
diskp->host->hostname, diskp->name,
holdp[0]->reserved, sched(diskp)->holdp[j-1]->reserved,
holdalloc(holdp[i]->disk)->allocated_space += holdp[i]->reserved;
size = (holdp[i]->reserved>size) ? 0 : size-holdp[i]->reserved;
#ifdef HOLD_DEBUG
- printf("assigning holding disk %s to disk %s:%s, reserved %lu, left %lu\n",
- holdp[i]->disk->diskdir, diskp->host->hostname, diskp->name,
- holdp[i]->reserved, size );
- fflush(stdout);
+ printf("%s: %d assigning holding disk %s to disk %s:%s, reserved %lu, left %lu\n",
+ debug_prefix_time(": assign_holdingdisk"),
+ i, holdp[i]->disk->diskdir, diskp->host->hostname, diskp->name,
+ holdp[i]->reserved, size );
+ fflush(stdout);
#endif
holdp[i] = NULL; /* so it doesn't get free()d... */
}
sched(diskp)->holdp[j] = NULL;
- sched(diskp)->destname = newstralloc(sched(diskp)->destname,sched(diskp)->holdp[0]->destname);
amfree(sfn);
return l;
}
-static void adjust_diskspace(diskp, cmd)
-disk_t *diskp;
-cmd_t cmd;
+static void
+adjust_diskspace(diskp, cmd)
+ disk_t *diskp;
+ cmd_t cmd;
{
-/* Re-write by Peter Conrad <conrad@opus5.de>, March '99
- * Modifications for splitting dumps across holding disks:
- * Dumpers no longer write more than they've allocated, therefore an
- * adjustment may only free some allocated space.
- * 08/99: Jean-Louis suggested that dumpers tell us how much they've written.
- * We just believe them and don't stat all the files but rely on the used
- * field.
- */
-
assignedhd_t **holdp;
unsigned long total=0;
long diff;
int i;
#ifdef HOLD_DEBUG
- printf("adjust: %s:%s %s\n", diskp->host->hostname, diskp->name,
- sched(diskp)->destname );
+ printf("%s: %s:%s %s\n",
+ debug_prefix_time(": adjust_diskspace"),
+ diskp->host->hostname, diskp->name, sched(diskp)->destname);
fflush(stdout);
#endif
diff = holdp[i]->used - holdp[i]->reserved;
total += holdp[i]->used;
holdalloc(holdp[i]->disk)->allocated_space += diff;
+
#ifdef HOLD_DEBUG
- printf("adjust: hdisk %s done, reserved %ld used %ld diff %ld alloc %ld dumpers %d\n",
+ printf("%s: hdisk %s done, reserved %ld used %ld diff %ld alloc %ld dumpers %d\n",
+ debug_prefix_time(": adjust_diskspace"),
holdp[i]->disk->name, holdp[i]->reserved, holdp[i]->used, diff,
holdalloc(holdp[i]->disk)->allocated_space,
holdalloc(holdp[i]->disk)->allocated_dumpers );
- fflush(stdout);
+ fflush(stdout);
#endif
holdp[i]->reserved += diff;
}
sched(diskp)->act_size = total;
+
#ifdef HOLD_DEBUG
- printf("adjust: after: disk %s:%s used %ld\n", diskp->host->hostname,
- diskp->name, sched(diskp)->act_size );
+ printf("%s: after: disk %s:%s used %ld\n",
+ debug_prefix_time(": adjust_diskspace"),
+ diskp->host->hostname, diskp->name, sched(diskp)->act_size );
fflush(stdout);
#endif
+
}
-static void delete_diskspace(diskp)
-disk_t *diskp;
+static void
+delete_diskspace(diskp)
+ disk_t *diskp;
{
-/* Re-write by Peter Conrad <conrad@opus5.de>, March '99
- * Modifications for splitting dumps across holding disks:
- * After implementing Jean-Louis' suggestion (see above) this looks much
- * simpler... again, we rely on assignedhd_s containing correct info
- */
assignedhd_t **holdp;
int i;
assert(holdp);
for( i = 0; holdp[i]; i++ ) { /* for each disk */
- /* find all files of this dump on that disk, and subtract their
- * reserved sizes from the disk's allocated space
- */
+ /* find all files of this dump on that disk, and subtract their
+ * reserved sizes from the disk's allocated space
+ */
holdalloc(holdp[i]->disk)->allocated_space -= holdp[i]->used;
}
- unlink_holding_files(holdp[0]->destname); /* no need for the entire list,
- because unlink_holding_files
- will walk through all files
- using cont_filename */
-
+ unlink_holding_files(holdp[0]->destname); /* no need for the entire list,
+ * because unlink_holding_files
+ * will walk through all files
+ * using cont_filename */
free_assignedhd(sched(diskp)->holdp);
sched(diskp)->holdp = NULL;
sched(diskp)->act_size = 0;
- amfree(sched(diskp)->destname);
}
-assignedhd_t **build_diskspace(destname)
+static assignedhd_t **build_diskspace(destname)
char *destname;
{
int i, j;
filename, strerror(errno));
return NULL;
}
- buflen = fullread(fd, buffer, sizeof(buffer));
- parse_file_header(buffer, &file, buflen);
+ if ((buflen = fullread(fd, buffer, sizeof(buffer))) > 0) {;
+ parse_file_header(buffer, &file, buflen);
+ }
close(fd);
filename = file.cont_filename;
}
return result;
}
-
-void holdingdisk_state(time_str)
-char *time_str;
+static void
+holdingdisk_state(time_str)
+ char *time_str;
{
holdingdisk_t *hdp;
int dsk;
printf("\n");
}
-static void update_failed_dump_to_tape(dp)
-disk_t *dp;
+static void
+update_failed_dump_to_tape(dp)
+ disk_t *dp;
{
+/* JLM
+ * should simply set no_bump
+ */
+
time_t save_timestamp = sched(dp)->timestamp;
/* setting timestamp to 0 removes the current level from the
* database, so that we ensure that it will not be bumped to the
}
/* ------------------- */
-int dump_to_tape(dp)
- disk_t *dp;
+static int
+dump_to_tape(dp)
+ disk_t *dp;
{
dumper_t *dumper;
int failed = 0;
long origsize = 0;
long dumpsize = 0;
long dumptime = 0;
+ float tapetime = 0;
cmd_t cmd;
int result_argc;
char *result_argv[MAX_ARGS+1];
int dumper_tryagain = 0;
- inside_dump_to_tape = 1; /* for simulator */
-
printf("driver: dumping %s:%s directly to tape\n",
dp->host->hostname, dp->name);
fflush(stdout);
fflush(stdout);
log_add(L_WARNING, "no idle dumpers for %s:%s.\n",
dp->host->hostname, dp->name);
- inside_dump_to_tape = 0;
return 2; /* fatal problem */
}
printf("driver: did not get PORT from taper for %s:%s\n",
dp->host->hostname, dp->name);
fflush(stdout);
- inside_dump_to_tape = 0;
return 2; /* fatal problem */
}
/* copy port number */
- sched(dp)->destname = newvstralloc(sched(dp)->destname, result_argv[2], NULL );
+ dumper->output_port = atoi(result_argv[2]);
/* tell the dumper to dump to a port */
dp->inprogress = 1;
sched(dp)->timestamp = time((time_t *)0);
allocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- idle_reason = 0;
+ idle_reason = NOT_IDLE;
short_dump_state();
/* wait for result from dumper */
- cmd = getresult(dumper->outfd, 1, &result_argc, result_argv, MAX_ARGS+1);
+ cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
if(cmd != BOGUS)
free_serial(result_argv[2]);
failed = 1; /* dump failed, must still finish up with taper */
break;
- case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <err str> */
+ case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <errstr> */
/* everything went fine */
origsize = (long)atof(result_argv[3]);
- dumpsize = (long)atof(result_argv[4]);
+ /*dumpsize = (long)atof(result_argv[4]);*/
dumptime = (long)atof(result_argv[5]);
break;
case NO_ROOM: /* NO-ROOM <handle> */
dumper_cmd(dumper, ABORT, dp);
- cmd = getresult(dumper->outfd, 1, &result_argc, result_argv, MAX_ARGS+1);
+ cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
if(cmd != BOGUS)
free_serial(result_argv[2]);
assert(cmd == ABORT_FINISHED);
- case TRYAGAIN: /* TRY-AGAIN <handle> <err str> */
+ case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
default:
/* dump failed, but we must still finish up with taper */
/* problem with dump, possibly nonfatal, retry one time */
* "no space on device", etc., since taper closed the port first.
*/
+ continue_port_dump:
+
cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
switch(cmd) {
+ case PARTIAL:
case DONE: /* DONE <handle> <label> <tape file> <err mess> */
if(result_argc != 5) {
error("error [dump to tape DONE result_argc != 5: %d]", result_argc);
free_serial(result_argv[2]);
- /* every thing went fine */
- update_info_dumper(dp, origsize, dumpsize, dumptime);
- filenum = atoi(result_argv[4]);
- update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
- /* note that update_info_dumper() must be run before
- update_info_taper(), since update_info_dumper overwrites
- tape information. */
+ sscanf(result_argv[5],"[sec %f kb %ld ", &tapetime, &dumpsize);
+
+ if(cmd == DONE) {
+ /* every thing went fine */
+ update_info_dumper(dp, origsize, dumpsize, dumptime);
+ filenum = atoi(result_argv[4]);
+ update_info_taper(dp, result_argv[3], filenum, sched(dp)->level);
+ /* note that update_info_dumper() must be run before
+ update_info_taper(), since update_info_dumper overwrites
+ tape information. */
+ }
break;
case TRYAGAIN: /* TRY-AGAIN <handle> <err mess> */
+ tape_left = tape_length;
+ current_tape++;
if(dumper_tryagain == 0) {
sched(dp)->attempted++;
if(sched(dp)->attempted > failed)
failed_dumper:
update_failed_dump_to_tape(dp);
free_serial(result_argv[2]);
- tape_left = tape_length;
break;
-
+ case SPLIT_CONTINUE: /* SPLIT_CONTINUE <handle> <new_label> */
+ if (result_argc != 3) {
+ error("error [taper SPLIT_CONTINUE result_argc != 3: %d]", result_argc);
+ }
+ fprintf(stderr, "driver: Got SPLIT_CONTINUE %s %s\n", result_argv[2], result_argv[3]);
+
+ goto continue_port_dump;
+ break;
+ case SPLIT_NEEDNEXT:
+ fprintf(stderr, "driver: Got SPLIT_NEEDNEXT %s %s\n", result_argv[2], result_argv[3]);
+
+ goto continue_port_dump;
+ break;
case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
case BOGUS:
default:
dp->inprogress = 0;
deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
- inside_dump_to_tape = 0;
return failed;
}
-int queue_length(q)
-disklist_t q;
+static int
+queue_length(q)
+ disklist_t q;
{
disk_t *p;
int len;
return len;
}
-
-void short_dump_state()
+static void
+short_dump_state()
{
int i, nidle;
char *wall_time;
printf(" qlen tapeq: %d", queue_length(tapeq));
printf(" runq: %d", queue_length(runq));
printf(" roomq: %d", queue_length(roomq));
- printf(" wakeup: %d", (int)sleep_time.tv_sec);
+ printf(" wakeup: %d", (int)sleep_time);
printf(" driver-idle: %s\n", idle_strings[idle_reason]);
interface_state(wall_time);
holdingdisk_state(wall_time);
fflush(stdout);
}
-void dump_state(str)
-char *str;
+#if 0
+static void
+dump_state(str)
+ const char *str;
{
int i;
disk_t *dp;
printf("================\n");
fflush(stdout);
}
+#endif