2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1998 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
27 * $Id: driver.c 6512 2007-05-24 17:00:24Z ian $
29 * controlling process for the Amanda backup system
33 * XXX possibly modify tape queue to be cognizant of how much room is left on
34 * tape. Probably not effective though, should do this in planner.
48 #include "server_util.h"
49 #include "timestamp.h"
51 #define driver_debug(i, ...) do { \
52 if ((i) <= debug_driver) { \
53 dbprintf(__VA_ARGS__); \
57 #define hold_debug(i, ...) do { \
58 if ((i) <= debug_holding) { \
59 dbprintf(__VA_ARGS__); \
63 static disklist_t waitq; // dle waiting estimate result
64 static disklist_t runq; // dle waiting to be dumped to holding disk
65 static disklist_t directq; // dle waiting to be dumped directly to tape
66 static disklist_t tapeq; // dle on holding disk waiting to be written
68 static disklist_t roomq; // dle waiting for more space on holding disk
69 static int pending_aborts;
70 static disk_t *taper_disk;
71 static int degraded_mode;
72 static off_t reserved_space;
73 static off_t total_disksize;
74 static char *dumper_program;
75 static char *chunker_program;
76 static int inparallel;
77 static int nodump = 0;
78 static off_t tape_length = (off_t)0;
79 static off_t tape_left = (off_t)0;
80 static int current_tape = 0;
81 static int conf_taperalgo;
82 static int conf_runtapes;
83 static time_t sleep_time;
84 static int idle_reason;
85 static char *driver_timestamp;
86 static char *hd_driver_timestamp;
87 static am_host_t *flushhost = NULL;
88 static int need_degraded=0;
89 static holdalloc_t *holdalloc;
90 static int num_holdalloc;
91 static event_handle_t *dumpers_ev_time = NULL;
92 static event_handle_t *schedule_ev_read = NULL;
93 static int conf_flush_threshold_dumped;
94 static int conf_flush_threshold_scheduled;
95 static int conf_taperflush;
96 static off_t flush_threshold_dumped;
97 static off_t flush_threshold_scheduled;
98 static off_t taperflush;
99 static int schedule_done; // 1 if we don't wait for a
100 // schedule from the planner
101 static int force_flush; // All dump are terminated, we
102 // must now respect taper_flush
104 static int wait_children(int count);
105 static void wait_for_children(void);
106 static void allocate_bandwidth(netif_t *ip, unsigned long kps);
107 static int assign_holdingdisk(assignedhd_t **holdp, disk_t *diskp);
108 static void adjust_diskspace(disk_t *diskp, cmd_t cmd);
109 static void delete_diskspace(disk_t *diskp);
110 static assignedhd_t **build_diskspace(char *destname);
111 static int client_constrained(disk_t *dp);
112 static void deallocate_bandwidth(netif_t *ip, unsigned long kps);
113 static void dump_schedule(disklist_t *qp, char *str);
114 static void dump_to_tape(disk_t *dp);
115 static assignedhd_t **find_diskspace(off_t size, int *cur_idle,
116 assignedhd_t *preferred);
117 static unsigned long free_kps(netif_t *ip);
118 static off_t free_space(void);
119 static void dumper_chunker_result(disk_t *dp);
120 static void dumper_taper_result(disk_t *dp);
121 static void file_taper_result(disk_t *dp);
122 static void handle_dumper_result(void *);
123 static void handle_chunker_result(void *);
124 static void handle_dumpers_time(void *);
125 static void handle_taper_result(void *);
127 static void holdingdisk_state(char *time_str);
128 static dumper_t *idle_dumper(void);
129 static void interface_state(char *time_str);
130 static int queue_length(disklist_t q);
131 static disklist_t read_flush(void);
132 static void read_schedule(void *cookie);
133 static void short_dump_state(void);
134 static void startaflush(void);
135 static void start_degraded_mode(disklist_t *queuep);
136 static void start_some_dumps(disklist_t *rq);
137 static void continue_port_dumps(void);
138 static void update_failed_dump_to_tape(disk_t *);
141 TAPE_ACTION_NO_ACTION = 0,
142 TAPE_ACTION_NEW_TAPE = (1 << 0),
143 TAPE_ACTION_NO_NEW_TAPE = (1 << 1),
144 TAPE_ACTION_START_A_FLUSH = (1 << 2)
147 static TapeAction tape_action(void);
149 static const char *idle_strings[] = {
152 #define IDLE_NO_DUMPERS 1
154 #define IDLE_START_WAIT 2
156 #define IDLE_NO_HOLD 3
158 #define IDLE_CLIENT_CONSTRAINED 4
159 T_("client-constrained"),
160 #define IDLE_NO_BANDWIDTH 5
162 #define IDLE_NO_DISKSPACE 6
176 struct fs_usage fsusage;
178 unsigned long reserve = 100;
182 char *result_argv[MAX_ARGS+1];
189 config_overwrites_t *cfg_ovr = NULL;
190 char *cfg_opt = NULL;
191 holdalloc_t *ha, *ha_last;
194 * Configure program for internationalization:
195 * 1) Only set the message locale for now.
196 * 2) Set textdomain for all amanda related programs to "amanda"
197 * We don't want to be forced to support dozens of message catalogs.
199 setlocale(LC_MESSAGES, "C");
200 textdomain("amanda");
204 setvbuf(stdout, (char *)NULL, (int)_IOLBF, 0);
205 setvbuf(stderr, (char *)NULL, (int)_IOLBF, 0);
209 dbopen(DBG_SUBDIR_SERVER);
211 atexit(wait_for_children);
213 /* Don't die when child closes pipe */
214 signal(SIGPIPE, SIG_IGN);
216 malloc_size_1 = malloc_inuse(&malloc_hist_1);
218 erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
219 set_logerror(logerror);
223 cfg_ovr = extract_commandline_config_overwrites(&argc, &argv);
227 config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD | CONFIG_INIT_FATAL,
229 apply_config_overwrites(cfg_ovr);
231 g_printf(_("%s: pid %ld executable %s version %s\n"),
232 get_pname(), (long) getpid(), argv[0], version());
235 if(strncmp(argv[2], "nodump", 6) == 0) {
240 safe_cd(); /* do this *after* config_init */
242 check_running_as(RUNNING_AS_DUMPUSER);
244 dbrename(config_name, DBG_SUBDIR_SERVER);
246 amfree(driver_timestamp);
247 /* read timestamp from stdin */
248 while ((line = agets(stdin)) != NULL) {
253 if ( line == NULL ) {
254 error(_("Did not get DATE line from planner"));
257 driver_timestamp = alloc(15);
258 strncpy(driver_timestamp, &line[5], 14);
259 driver_timestamp[14] = '\0';
261 log_add(L_START,_("date %s"), driver_timestamp);
263 gethostname(hostname, SIZEOF(hostname));
264 log_add(L_STATS,_("hostname %s"), hostname);
266 /* warn if this is not the first dump run today while usetimestamps is off */
267 if(strlen(driver_timestamp) == 8) {
269 char *conf_logdir = getconf_str(CNF_LOGDIR);
270 char *logfile = vstralloc(conf_logdir, "/log.",
271 driver_timestamp, ".0", NULL);
272 char *oldlogfile = vstralloc(conf_logdir, "/oldlog/log.",
273 driver_timestamp, ".0", NULL);
274 if(access(logfile, F_OK) == 0 || access(oldlogfile, F_OK) == 0) {
275 log_add(L_WARNING, _("WARNING: This is not the first amdump run today. Enable the usetimestamps option in the configuration file if you want to run amdump more than once per calendar day."));
280 hd_driver_timestamp = get_timestamp_from_time(0);
283 hd_driver_timestamp = stralloc(driver_timestamp);
286 taper_program = vstralloc(amlibexecdir, "/", "taper", versionsuffix(), NULL);
287 dumper_program = vstralloc(amlibexecdir, "/", "dumper", versionsuffix(),
289 chunker_program = vstralloc(amlibexecdir, "/", "chunker", versionsuffix(),
292 conf_taperalgo = getconf_taperalgo(CNF_TAPERALGO);
293 conf_tapetype = getconf_str(CNF_TAPETYPE);
294 conf_runtapes = getconf_int(CNF_RUNTAPES);
295 tape = lookup_tapetype(conf_tapetype);
296 tape_length = tapetype_get_length(tape);
297 conf_flush_threshold_dumped = getconf_int(CNF_FLUSH_THRESHOLD_DUMPED);
298 conf_flush_threshold_scheduled = getconf_int(CNF_FLUSH_THRESHOLD_SCHEDULED);
299 conf_taperflush = getconf_int(CNF_TAPERFLUSH);
301 flush_threshold_dumped = (conf_flush_threshold_dumped * tape_length) / 100;
302 flush_threshold_scheduled = (conf_flush_threshold_scheduled * tape_length) / 100;
303 taperflush = (conf_taperflush *tape_length) / 100;
305 driver_debug(1, _("flush_threshold_dumped: %lld\n"), (long long)flush_threshold_dumped);
306 driver_debug(1, _("flush_threshold_scheduled: %lld\n"), (long long)flush_threshold_scheduled);
307 driver_debug(1, _("taperflush: %lld\n"), (long long)taperflush);
309 /* start initializing: read in databases */
311 conf_diskfile = config_dir_relative(getconf_str(CNF_DISKFILE));
312 if (read_diskfile(conf_diskfile, &origq) < 0) {
313 error(_("could not load disklist \"%s\""), conf_diskfile);
316 amfree(conf_diskfile);
318 /* set up any configuration-dependent variables */
320 inparallel = getconf_int(CNF_INPARALLEL);
322 reserve = (unsigned long)getconf_int(CNF_RESERVE);
324 total_disksize = (off_t)0;
327 for(hdp = getconf_holdingdisks(), dsk = 0; hdp != NULL; hdp = holdingdisk_next(hdp), dsk++) {
328 ha = alloc(SIZEOF(holdalloc_t));
331 /* link the list in the same order as getconf_holdingdisks's results */
340 ha->allocated_dumpers = 0;
341 ha->allocated_space = (off_t)0;
342 ha->disksize = holdingdisk_get_disksize(hdp);
345 if(get_fs_usage(holdingdisk_get_diskdir(hdp), NULL, &fsusage) == -1
346 || access(holdingdisk_get_diskdir(hdp), W_OK) == -1) {
347 log_add(L_WARNING, _("WARNING: ignoring holding disk %s: %s\n"),
348 holdingdisk_get_diskdir(hdp), strerror(errno));
353 /* do the division first to avoid potential integer overflow */
354 if (fsusage.fsu_bavail_top_bit_set)
357 kb_avail = fsusage.fsu_bavail / 1024 * fsusage.fsu_blocksize;
359 if(ha->disksize > (off_t)0) {
360 if(ha->disksize > kb_avail) {
362 _("WARNING: %s: %lld KB requested, "
363 "but only %lld KB available."),
364 holdingdisk_get_diskdir(hdp),
365 (long long)ha->disksize,
366 (long long)kb_avail);
367 ha->disksize = kb_avail;
370 /* ha->disksize is negative; use all but that amount */
371 else if(kb_avail < -ha->disksize) {
373 _("WARNING: %s: not %lld KB free."),
374 holdingdisk_get_diskdir(hdp),
375 (long long)-ha->disksize);
376 ha->disksize = (off_t)0;
380 ha->disksize += kb_avail;
382 g_printf(_("driver: adding holding disk %d dir %s size %lld chunksize %lld\n"),
383 dsk, holdingdisk_get_diskdir(hdp),
384 (long long)ha->disksize,
385 (long long)(holdingdisk_get_chunksize(hdp)));
387 newdir = newvstralloc(newdir,
388 holdingdisk_get_diskdir(hdp), "/", hd_driver_timestamp,
390 if(!mkholdingdir(newdir)) {
391 ha->disksize = (off_t)0;
393 total_disksize += ha->disksize;
396 reserved_space = (total_disksize / (off_t)100) * (off_t)reserve; /* reserve is applied as a percentage of total_disksize; the former (reserve / 100) was integer division and yielded 0 for every value below 100. Divide first to avoid overflow. */
398 g_printf(_("reserving %lld out of %lld for degraded-mode dumps\n"),
399 (long long)reserved_space, (long long)free_space());
403 if(inparallel > MAX_DUMPERS) inparallel = MAX_DUMPERS;
405 /* taper takes a while to get going, so start it up right away */
408 if(conf_runtapes > 0) {
409 startup_tape_process(taper_program);
410 taper_cmd(START_TAPER, driver_timestamp, NULL, 0, NULL);
413 /* fire up the dumpers now while we are waiting */
414 if(!nodump) startup_dump_processes(dumper_program, inparallel, driver_timestamp);
417 * Read schedule from stdin. Usually, this is a pipe from planner,
418 * so the effect is that we wait here for the planner to
419 * finish, but meanwhile the taper is rewinding the tape, reading
420 * the label, checking it, writing a new label and all that jazz
421 * in parallel with the planner.
429 taper_state = TAPER_STATE_DEFAULT;
430 tapeq = read_flush();
432 roomq.head = roomq.tail = NULL;
434 log_add(L_STATS, _("startup time %s"), walltime_str(curclock()));
436 g_printf(_("driver: start time %s inparallel %d bandwidth %lu diskspace %lld "), walltime_str(curclock()), inparallel,
437 free_kps(NULL), (long long)free_space());
438 g_printf(_(" dir %s datestamp %s driver: drain-ends tapeq %s big-dumpers %s\n"),
439 "OBSOLETE", driver_timestamp, taperalgo2str(conf_taperalgo),
440 getconf_str(CNF_DUMPORDER));
443 /* ok, planner is done, now lets see if the tape is ready */
445 if (conf_runtapes > 0) {
446 cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
447 if (cmd != TAPER_OK) {
448 /* no tape, go into degraded mode: dump to holding disk */
455 tape_left = tape_length;
457 taper_input_error = NULL;
458 taper_tape_error = NULL;
460 taper_ev_read = NULL;
462 schedule_done = nodump;
465 if(!need_degraded) startaflush();
468 schedule_ev_read = event_register((event_id_t)0, EV_READFD, read_schedule, NULL);
475 /* mv runq to directq */
476 while (!empty(runq)) {
477 diskp = dequeue_disk(&runq);
478 headqueue_disk(&directq, diskp);
481 /* handle any remaining dumps by dumping directly to tape, if possible */
482 while(!empty(directq) && taper > 0) {
483 diskp = dequeue_disk(&directq);
484 if (diskp->to_holdingdisk == HOLD_REQUIRED) {
485 char *qname = quote_string(diskp->name);
486 log_add(L_FAIL, _("%s %s %s %d [%s]"),
487 diskp->host->hostname, qname, sched(diskp)->datestamp,
489 _("can't dump required holdingdisk"));
492 else if (!degraded_mode) {
493 taper_state |= TAPER_STATE_DUMP_TO_TAPE;
496 taper_state &= ~TAPER_STATE_DUMP_TO_TAPE; /* bitwise complement clears only this flag; logical "!FLAG" is 0 and wiped every state bit */
499 char *qname = quote_string(diskp->name);
500 log_add(L_FAIL, _("%s %s %s %d [%s]"),
501 diskp->host->hostname, qname, sched(diskp)->datestamp,
503 diskp->to_holdingdisk == HOLD_AUTO ?
504 _("no more holding disk space") :
505 _("can't dump no-hold disk in degraded mode"));
510 /* fill up the tape or start new one for taperflush */
514 short_dump_state(); /* for amstatus */
516 g_printf(_("driver: QUITTING time %s telling children to quit\n"),
517 walltime_str(curclock()));
521 for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
523 dumper_cmd(dumper, QUIT, NULL);
528 taper_cmd(QUIT, NULL, NULL, 0, NULL);
531 /* wait for all to die */
535 holding_cleanup(NULL, NULL);
539 check_unfree_serial();
540 g_printf(_("driver: FINISHED time %s\n"), walltime_str(curclock()));
542 log_add(L_FINISH,_("date %s time %s"), driver_timestamp, walltime_str(curclock()));
543 amfree(driver_timestamp);
545 free_new_argv(new_argc, new_argv);
546 amfree(dumper_program);
547 amfree(taper_program);
554 /* sleep up to count seconds, and wait for terminating child processes */
555 /* if count is negative, this function will not time out */
556 /* exit once all child processes are finished or the timeout expired */
557 /* return 0 if there are no more children to wait for */
558 /* return 1 if some children are still alive */
560 wait_children(int count)
572 pid = waitpid((pid_t)-1, &retstat, WNOHANG);
576 if (! WIFEXITED(retstat)) {
578 code = WTERMSIG(retstat);
579 } else if (WEXITSTATUS(retstat) != 0) {
581 code = WEXITSTATUS(retstat);
584 for (dumper = dmptable; dumper < dmptable + inparallel;
586 if (pid == dumper->pid) {
587 who = stralloc(dumper->name);
591 if (dumper->chunker && pid == dumper->chunker->pid) {
592 who = stralloc(dumper->chunker->name);
593 dumper->chunker->pid = -1;
597 if (who == NULL && pid == taper_pid) {
598 who = stralloc("taper");
601 if(what != NULL && who == NULL) {
602 who = stralloc("unknown");
605 log_add(L_WARNING, _("%s pid %u exited with %s %d\n"), who,
606 (unsigned)pid, what, code);
607 g_printf(_("driver: %s pid %u exited with %s %d\n"), who,
608 (unsigned)pid, what, code);
612 } while (pid > 0 || wait_errno == EINTR);
617 } while ((errno != ECHILD) && (count != 0));
618 return (errno != ECHILD);
622 kill_children(int signal)
627 for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
628 if (!dumper->down && dumper->pid > 1) {
629 g_printf(_("driver: sending signal %d to %s pid %u\n"), signal,
630 dumper->name, (unsigned)dumper->pid);
631 if (kill(dumper->pid, signal) == -1 && errno == ESRCH) {
633 dumper->chunker->pid = 0;
635 if (dumper->chunker && dumper->chunker->pid > 1) {
636 g_printf(_("driver: sending signal %d to %s pid %u\n"), signal,
637 dumper->chunker->name,
638 (unsigned)dumper->chunker->pid);
639 if (kill(dumper->chunker->pid, signal) == -1 &&
641 dumper->chunker->pid = 0;
648 g_printf(_("driver: sending signal %d to %s pid %u\n"), signal,
649 "taper", (unsigned)taper_pid);
650 if (kill(taper_pid, signal) == -1 && errno == ESRCH)
655 wait_for_children(void)
660 for(dumper = dmptable; dumper < dmptable + inparallel; dumper++) {
661 if (dumper->pid > 1 && dumper->fd >= 0) {
662 dumper_cmd(dumper, QUIT, NULL);
663 if (dumper->chunker && dumper->chunker->pid > 1 &&
664 dumper->chunker->fd >= 0)
665 chunker_cmd(dumper->chunker, QUIT, NULL);
670 if(taper_pid > 1 && taper > 0) {
671 taper_cmd(QUIT, NULL, NULL, 0, NULL);
674 if(wait_children(60) == 0)
677 kill_children(SIGHUP);
678 if(wait_children(60) == 0)
681 kill_children(SIGKILL);
682 if(wait_children(-1) == 0)
695 TapeAction result_tape_action;
697 result_tape_action = tape_action();
699 if (result_tape_action & TAPE_ACTION_NEW_TAPE) {
700 taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; /* clear only the wait-for-tape bit; logical "!FLAG" is 0 and reset every state bit */
701 taper_cmd(NEW_TAPE, NULL, NULL, 0, NULL);
702 } else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) {
703 taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; /* bitwise complement, not logical not: keep the other state bits */
704 taper_cmd(NO_NEW_TAPE, NULL, NULL, 0, NULL);
707 if (!degraded_mode && !taper_busy && !empty(tapeq) &&
708 (result_tape_action & TAPE_ACTION_START_A_FLUSH)) {
710 datestamp = sched(tapeq.head)->datestamp;
711 switch(conf_taperalgo) {
713 dp = dequeue_disk(&tapeq);
717 while (fit != NULL) {
718 extra_tapes = (fit->tape_splitsize > (off_t)0) ?
719 conf_runtapes - current_tape : 0;
720 if(sched(fit)->act_size <= (tape_left +
721 tape_length * (off_t)extra_tapes) &&
722 strcmp(sched(fit)->datestamp, datestamp) <= 0) {
730 if(dp) remove_disk(&tapeq, dp);
733 fit = dp = tapeq.head;
734 while (fit != NULL) {
735 if(sched(fit)->act_size > sched(dp)->act_size &&
736 strcmp(sched(fit)->datestamp, datestamp) <= 0) {
741 if(dp) remove_disk(&tapeq, dp);
743 case ALGO_LARGESTFIT:
745 while (fit != NULL) {
746 extra_tapes = (fit->tape_splitsize > (off_t)0) ?
747 conf_runtapes - current_tape : 0;
748 if(sched(fit)->act_size <=
749 (tape_left + tape_length * (off_t)extra_tapes) &&
750 (!dp || sched(fit)->act_size > sched(dp)->act_size) &&
751 strcmp(sched(fit)->datestamp, datestamp) <= 0) {
756 if(dp) remove_disk(&tapeq, dp);
762 remove_disk(&tapeq, dp);
765 if(!dp) { /* ALGO_SMALLEST, or default if nothing fit. */
766 if(conf_taperalgo != ALGO_SMALLEST) {
768 _("driver: startaflush: Using SMALLEST because nothing fit\n"));
770 fit = dp = tapeq.head;
771 while (fit != NULL) {
772 if(sched(fit)->act_size < sched(dp)->act_size &&
773 strcmp(sched(fit)->datestamp, datestamp) <= 0) {
778 if(dp) remove_disk(&tapeq, dp);
780 if(taper_ev_read == NULL) {
781 taper_ev_read = event_register((event_id_t)taper, EV_READFD,
782 handle_taper_result, NULL);
787 taper_input_error = NULL;
788 taper_tape_error = NULL;
789 taper_result = LAST_TOK;
790 taper_sendresult = 0;
791 taper_first_label = NULL;
793 taper_state &= ~TAPER_STATE_DUMP_TO_TAPE; /* a flush from holding disk is not a dump-to-tape: clear just that bit (logical "!FLAG" zeroed the whole state) */
795 qname = quote_string(dp->name);
796 taper_cmd(FILE_WRITE, dp, sched(dp)->destname, sched(dp)->level,
797 sched(dp)->datestamp);
798 g_fprintf(stderr,_("driver: startaflush: %s %s %s %lld %lld\n"),
799 taperalgo2str(conf_taperalgo), dp->host->hostname, qname,
800 (long long)sched(taper_disk)->act_size,
801 (long long)tape_left);
802 if(sched(dp)->act_size <= tape_left)
803 tape_left -= sched(dp)->act_size;
805 tape_left = (off_t)0;
808 error(_("FATAL: Taper marked busy and no work found."));
811 } else if(!taper_busy && taper_ev_read != NULL) {
812 event_release(taper_ev_read);
813 taper_ev_read = NULL;
823 /* first, check if host is too busy */
825 if(dp->host->inprogress >= dp->host->maxdumps) {
829 /* next, check conflict with other dumps on same spindle */
831 if(dp->spindle == -1) { /* but spindle -1 never conflicts by def. */
835 for(dp2 = dp->host->disks; dp2 != NULL; dp2 = dp2->hostnext)
836 if(dp2->inprogress && dp2->spindle == dp->spindle) {
848 disk_t *diskp, *delayed_diskp, *diskp_accept;
849 assignedhd_t **holdp=NULL, **holdp_accept;
850 const time_t now = time(NULL);
853 char *result_argv[MAX_ARGS+1];
858 int busy_dumpers = 0;
860 idle_reason = IDLE_NO_DUMPERS;
863 if(dumpers_ev_time != NULL) {
864 event_release(dumpers_ev_time);
865 dumpers_ev_time = NULL;
868 for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
874 for (dumper = dmptable; dumper < dmptable+inparallel; dumper++) {
876 if( dumper->busy || dumper->down) {
880 if (dumper->ev_read != NULL) {
881 event_release(dumper->ev_read);
882 dumper->ev_read = NULL;
886 * A potential problem with starting from the bottom of the dump time
887 * distribution is that a slave host will have both one of the shortest
888 * and one of the longest disks, so starting its shortest disk first will
889 * tie up the host and eliminate its longest disk from consideration the
890 * first pass through. This could cause a big delay in starting that long
891 * disk, which could drag out the whole night's dumps.
893 * While starting from the top of the dump time distribution solves the
894 * above problem, this turns out to be a bad idea, because the big dumps
895 * will almost certainly pack the holding disk completely, leaving no
896 * room for even one small dump to start. This ends up shutting out the
897 * small-end dumpers completely (they stay idle).
899 * The introduction of multiple simultaneous dumps to one host alleviates
900 * the biggest&smallest dumps problem: both can be started at the
906 delayed_diskp = NULL;
910 dumporder = getconf_str(CNF_DUMPORDER);
911 if(strlen(dumporder) > (size_t)(dumper-dmptable)) {
912 dumptype = dumporder[dumper-dmptable];
915 if(dumper-dmptable < 3)
921 for(diskp = rq->head; diskp != NULL; diskp = diskp->next) {
922 assert(diskp->host != NULL && sched(diskp) != NULL);
924 if (diskp->host->start_t > now) {
925 cur_idle = max(cur_idle, IDLE_START_WAIT);
926 if (delayed_diskp == NULL || sleep_time > diskp->host->start_t) {
927 delayed_diskp = diskp;
928 sleep_time = diskp->host->start_t;
930 } else if(diskp->start_t > now) {
931 cur_idle = max(cur_idle, IDLE_START_WAIT);
932 if (delayed_diskp == NULL || sleep_time > diskp->start_t) {
933 delayed_diskp = diskp;
934 sleep_time = diskp->start_t;
936 } else if (diskp->host->netif->curusage > 0 &&
937 sched(diskp)->est_kps > free_kps(diskp->host->netif)) {
938 cur_idle = max(cur_idle, IDLE_NO_BANDWIDTH);
939 } else if(sched(diskp)->no_space) {
940 cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
941 } else if (diskp->to_holdingdisk == HOLD_NEVER) {
942 cur_idle = max(cur_idle, IDLE_NO_HOLD);
944 find_diskspace(sched(diskp)->est_size, &cur_idle, NULL)) == NULL) {
945 cur_idle = max(cur_idle, IDLE_NO_DISKSPACE);
946 if (empty(tapeq) && busy_dumpers == 0) {
947 remove_disk(rq, diskp);
948 enqueue_disk(&directq, diskp);
950 } else if (client_constrained(diskp)) {
951 free_assignedhd(holdp);
952 cur_idle = max(cur_idle, IDLE_CLIENT_CONSTRAINED);
955 /* disk fits, dump it */
956 int accept = !diskp_accept;
959 case 's': accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
961 case 'S': accept = (sched(diskp)->est_size > sched(diskp_accept)->est_size);
963 case 't': accept = (sched(diskp)->est_time < sched(diskp_accept)->est_time);
965 case 'T': accept = (sched(diskp)->est_time > sched(diskp_accept)->est_time);
967 case 'b': accept = (sched(diskp)->est_kps < sched(diskp_accept)->est_kps);
969 case 'B': accept = (sched(diskp)->est_kps > sched(diskp_accept)->est_kps);
971 default: log_add(L_WARNING, _("Unknown dumporder character \'%c\', using 's'.\n"),
973 accept = (sched(diskp)->est_size < sched(diskp_accept)->est_size);
978 if( !diskp_accept || !degraded_mode || diskp->priority >= diskp_accept->priority) {
979 if(holdp_accept) free_assignedhd(holdp_accept);
980 diskp_accept = diskp;
981 holdp_accept = holdp;
984 free_assignedhd(holdp);
988 free_assignedhd(holdp);
993 diskp = diskp_accept;
994 holdp = holdp_accept;
996 idle_reason = max(idle_reason, cur_idle);
999 * If we have no disk at this point, and there are disks that
1000 * are delayed, then schedule a time event to call this dumper
1001 * with the disk with the shortest delay.
1003 if (diskp == NULL && delayed_diskp != NULL) {
1004 assert(sleep_time > now);
1006 dumpers_ev_time = event_register((event_id_t)sleep_time, EV_TIME,
1007 handle_dumpers_time, &runq);
1009 } else if (diskp != NULL) {
1010 sched(diskp)->act_size = (off_t)0;
1011 allocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
1012 sched(diskp)->activehd = assign_holdingdisk(holdp, diskp);
1014 sched(diskp)->destname = newstralloc(sched(diskp)->destname,
1015 sched(diskp)->holdp[0]->destname);
1016 diskp->host->inprogress++; /* host is now busy */
1017 diskp->inprogress = 1;
1018 sched(diskp)->dumper = dumper;
1019 sched(diskp)->timestamp = now;
1021 dumper->busy = 1; /* dumper is now busy */
1022 dumper->dp = diskp; /* link disk to dumper */
1023 remove_disk(rq, diskp); /* take it off the run queue */
1025 sched(diskp)->origsize = (off_t)-1;
1026 sched(diskp)->dumpsize = (off_t)-1;
1027 sched(diskp)->dumptime = (time_t)0;
1028 sched(diskp)->tapetime = (time_t)0;
1029 chunker = dumper->chunker;
1030 chunker->result = LAST_TOK;
1031 dumper->result = LAST_TOK;
1032 startup_chunk_process(chunker,chunker_program);
1033 chunker_cmd(chunker, START, (void *)driver_timestamp);
1034 chunker->dumper = dumper;
1035 chunker_cmd(chunker, PORT_WRITE, diskp);
1036 cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
1038 assignedhd_t **h=NULL;
1040 char *qname = quote_string(diskp->name);
1042 g_printf(_("driver: did not get PORT from %s for %s:%s\n"),
1043 chunker->name, diskp->host->hostname, qname);
1047 deallocate_bandwidth(diskp->host->netif, sched(diskp)->est_kps);
1048 h = sched(diskp)->holdp;
1049 activehd = sched(diskp)->activehd;
1050 h[activehd]->used = 0;
1051 h[activehd]->disk->allocated_dumpers--;
1052 adjust_diskspace(diskp, DONE);
1053 delete_diskspace(diskp);
1054 diskp->host->inprogress--;
1055 diskp->inprogress = 0;
1056 sched(diskp)->dumper = NULL;
1059 sched(diskp)->dump_attempted++;
1060 free_serial_dp(diskp);
1061 if(sched(diskp)->dump_attempted < 2)
1062 enqueue_disk(rq, diskp);
1065 dumper->ev_read = event_register((event_id_t)dumper->fd, EV_READFD,
1066 handle_dumper_result, dumper);
1067 chunker->ev_read = event_register((event_id_t)chunker->fd, EV_READFD,
1068 handle_chunker_result, chunker);
1069 dumper->output_port = atoi(result_argv[2]);
1071 dumper_cmd(dumper, PORT_DUMP, diskp);
1073 diskp->host->start_t = now + 15;
1079 * This gets called when a dumper is delayed for some reason. It may
1080 * be because a disk has a delayed start, or amanda is constrained
1081 * by network or disk limits.
1085 handle_dumpers_time(
1088 disklist_t *runq = cookie;
1089 event_release(dumpers_ev_time);
1090 dumpers_ev_time = NULL;
1091 start_some_dumps(runq);
1102 g_printf(_("dump of driver schedule %s:\n--------\n"), str);
1104 for(dp = qp->head; dp != NULL; dp = dp->next) {
1105 qname = quote_string(dp->name);
1106 g_printf(" %-20s %-25s lv %d t %5lu s %lld p %d\n",
1107 dp->host->hostname, qname, sched(dp)->level,
1108 sched(dp)->est_time,
1109 (long long)sched(dp)->est_size, sched(dp)->priority);
1112 g_printf("--------\n");
1116 start_degraded_mode(
1117 /*@keep@*/ disklist_t *queuep)
1121 off_t est_full_size;
1124 if (taper_ev_read != NULL) {
1125 event_release(taper_ev_read);
1126 taper_ev_read = NULL;
1129 newq.head = newq.tail = 0;
1131 dump_schedule(queuep, _("before start degraded mode"));
1133 est_full_size = (off_t)0;
1134 while(!empty(*queuep)) {
1135 dp = dequeue_disk(queuep);
1137 qname = quote_string(dp->name);
1138 if(sched(dp)->level != 0)
1139 /* go ahead and do the disk as-is */
1140 enqueue_disk(&newq, dp);
1142 if (reserved_space + est_full_size + sched(dp)->est_size
1143 <= total_disksize) {
1144 enqueue_disk(&newq, dp);
1145 est_full_size += sched(dp)->est_size;
1147 else if(sched(dp)->degr_level != -1) {
1148 sched(dp)->level = sched(dp)->degr_level;
1149 sched(dp)->dumpdate = sched(dp)->degr_dumpdate;
1150 sched(dp)->est_nsize = sched(dp)->degr_nsize;
1151 sched(dp)->est_csize = sched(dp)->degr_csize;
1152 sched(dp)->est_time = sched(dp)->degr_time;
1153 sched(dp)->est_kps = sched(dp)->degr_kps;
1154 enqueue_disk(&newq, dp);
1157 log_add(L_FAIL,_("%s %s %s %d [can't switch to incremental dump]"),
1158 dp->host->hostname, qname, sched(dp)->datestamp,
1165 /*@i@*/ *queuep = newq;
1168 dump_schedule(queuep, _("after start degraded mode"));
1173 continue_port_dumps(void)
1177 int active_dumpers=0, busy_dumpers=0, i;
1180 /* First we try to grant diskspace to some dumps waiting for it. */
1181 for( dp = roomq.head; dp; dp = ndp ) {
1183 /* find last holdingdisk used by this dump */
1184 for( i = 0, h = sched(dp)->holdp; h[i+1]; i++ ) {
1185 (void)h; /* Quiet lint */
1187 /* find more space */
1188 h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
1189 &active_dumpers, h[i] );
1191 for(dumper = dmptable; dumper < dmptable + inparallel &&
1192 dumper->dp != dp; dumper++) {
1193 (void)dp; /* Quiet lint */
1195 assert( dumper < dmptable + inparallel );
1196 sched(dp)->activehd = assign_holdingdisk( h, dp );
1197 chunker_cmd( dumper->chunker, CONTINUE, dp );
1199 remove_disk( &roomq, dp );
1203 /* So for some disks there is less holding diskspace available than
1204 * was asked for. Possible reasons are
1205 * a) diskspace has been allocated for other dumps which are
1206 * still running or already being written to tape
1207 * b) all other dumps have been suspended due to lack of diskspace
1208 * c) this dump doesn't fit on all the holding disks
1209 * Case a) is not a problem. We just wait for the diskspace to
1210 * be freed by moving the current disk to a queue.
1211 * If case b) occurs, we have a deadlock situation. We select
1212 * a dump from the queue to be aborted and abort it. It will
1213 * be retried later dumping to disk.
1214 * If case c) is detected, the dump is aborted. Next time
1215 * it will be dumped directly to tape. Actually, case c is a special
1216 * manifestation of case b) where only one dumper is busy.
1218 for(dp=NULL, dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
1219 if( dumper->busy ) {
1221 if( !find_disk(&roomq, dumper->dp) ) {
1224 sched(dp)->est_size > sched(dumper->dp)->est_size ) {
1229 if((dp != NULL) && (active_dumpers == 0) && (busy_dumpers > 0) &&
1230 ((!taper_busy && empty(tapeq)) || degraded_mode) &&
1231 pending_aborts == 0 ) { /* not case a */
1232 if( busy_dumpers == 1 ) { /* case c */
1233 sched(dp)->no_space = 1;
1236 /* At this time, dp points to the dump with the smallest est_size.
1237 * We abort that dump, hopefully not wasting too much time retrying it.
1239 remove_disk( &roomq, dp );
1240 chunker_cmd( sched(dp)->dumper->chunker, ABORT, NULL);
1241 dumper_cmd( sched(dp)->dumper, ABORT, NULL );
1248 handle_taper_result(
1254 char *result_argv[MAX_ARGS+1];
1257 (void)cookie; /* Quiet unused parameter warning */
1259 assert(cookie == NULL);
1265 cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
1269 case FAILED: /* FAILED <handle> INPUT-* TAPE-* <input err mesg> <tape err mesg> */
1270 if(result_argc != 6) {
1271 error(_("error: [taper FAILED result_argc != 6: %d"), result_argc);
1275 dp = serial2disk(result_argv[2]);
1276 assert(dp == taper_disk);
1278 free_serial(result_argv[2]);
1280 qname = quote_string(dp->name);
1281 g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"),
1282 walltime_str(curclock()), dp->host->hostname, qname);
1286 if (strcmp(result_argv[3], "INPUT-ERROR") == 0) {
1287 taper_input_error = stralloc(result_argv[5]);
1289 if (strcmp(result_argv[4], "TAPE-ERROR") == 0) {
1290 taper_tape_error = stralloc(result_argv[6]);
1297 case PARTIAL: /* PARTIAL <handle> INPUT-* TAPE-* <stat mess> <input err mesg> <tape err mesg>*/
1298 case DONE: /* DONE <handle> INPUT-GOOD TAPE-GOOD <stat mess> <input err mesg> <tape err mesg> */
1299 if(result_argc != 7) { /* both PARTIAL and DONE carry seven tokens */
1300 error(_("error: [taper %s result_argc != 7: %d"), cmdstr[cmd], result_argc); /* report the command actually received, not always PARTIAL */
1304 dp = serial2disk(result_argv[2]);
1305 assert(dp == taper_disk);
1307 free_serial(result_argv[2]);
1309 qname = quote_string(dp->name);
1310 g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"),
1311 walltime_str(curclock()), dp->host->hostname, qname);
1315 if (strcmp(result_argv[3], "INPUT-ERROR") == 0) {
1316 taper_input_error = stralloc(result_argv[6]); /* 7-token form: [5] is <stat mess>, [6] is <input err mesg> */
1318 if (strcmp(result_argv[4], "TAPE-ERROR") == 0) {
1319 taper_tape_error = stralloc(result_argv[7]); /* 7-token form: [7] is <tape err mesg> */
1326 case PARTDONE: /* PARTDONE <handle> <label> <fileno> ... -- six tokens are required below and result_argv[5] is parsed as the amount written; NOTE(review): confirm the exact field list against the taper */
1327 dp = serial2disk(result_argv[2]);
1328 assert(dp == taper_disk);
1329 if (result_argc != 6) {
1330 error(_("error [taper PARTDONE result_argc != 6: %d]"),
1334 if (!taper_first_label) {
1335 taper_first_label = stralloc(result_argv[3]);
1336 taper_first_fileno = OFF_T_ATOI(result_argv[4]);
1338 taper_written = OFF_T_ATOI(result_argv[5]);
1339 if (taper_written > sched(taper_disk)->act_size)
1340 sched(taper_disk)->act_size = taper_written;
1344 case REQUEST_NEW_TAPE: /* REQUEST-NEW-TAPE */
1345 if (result_argc != 2) {
1346 error(_("error [taper REQUEST_NEW_TAPE result_argc != 2: %d]"),
1350 taper_state &= ~TAPER_STATE_TAPE_STARTED; /* clear only this flag; '!' would evaluate to 0 and wipe every state bit */
1352 if (current_tape >= conf_runtapes) {
1353 taper_cmd(NO_NEW_TAPE, NULL, NULL, 0, NULL);
1355 _("Out of tapes; going into degraded mode."));
1356 start_degraded_mode(&runq);
1358 TapeAction result_tape_action;
1360 taper_state |= TAPER_STATE_WAIT_FOR_TAPE;
1361 result_tape_action = tape_action();
1362 if (result_tape_action & TAPE_ACTION_NEW_TAPE) {
1363 taper_cmd(NEW_TAPE, NULL, NULL, 0, NULL);
1364 taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; /* request answered: drop just the wait flag */
1365 } else if (result_tape_action & TAPE_ACTION_NO_NEW_TAPE) {
1366 taper_cmd(NO_NEW_TAPE, NULL, NULL, 0, NULL);
1367 taper_state &= ~TAPER_STATE_WAIT_FOR_TAPE; /* request answered: drop just the wait flag */
1372 case NEW_TAPE: /* NEW-TAPE <handle> <label> */
1373 if (result_argc != 3) {
1374 error(_("error [taper NEW_TAPE result_argc != 3: %d]"),
1379 /* Update our tape counter and reset tape_left */
1381 tape_left = tape_length;
1382 taper_state |= TAPER_STATE_TAPE_STARTED;
1385 case NO_NEW_TAPE: /* NO-NEW-TAPE <handle> */
1386 if (result_argc != 2) {
1387 error(_("error [taper NO_NEW_TAPE result_argc != 2: %d]"),
1393 case DUMPER_STATUS: /* DUMPER-STATUS <handle> */
1394 if (result_argc != 2) {
1395 error(_("error [taper DUMPER_STATUS result_argc != 2: %d]"),
1399 if (taper_dumper->result == LAST_TOK) {
1400 taper_sendresult = 1;
1402 if( taper_dumper->result == DONE) {
1403 taper_cmd(DONE, NULL, NULL, 0, NULL);
1405 taper_cmd(FAILED, NULL, NULL, 0, NULL);
1410 case TAPE_ERROR: /* TAPE-ERROR <handle> <err mess> */
1411 dp = serial2disk(result_argv[2]);
1413 free_serial(result_argv[2]);
1414 qname = quote_string(dp->name);
1415 g_printf(_("driver: finished-cmd time %s taper wrote %s:%s\n"),
1416 walltime_str(curclock()), dp->host->hostname, qname);
1419 log_add(L_WARNING, _("Taper error: %s"), result_argv[3]);
1420 taper_tape_error = stralloc(result_argv[3]);
1425 log_add(L_WARNING, _("Taper protocol error"));
1426 taper_tape_error = stralloc("BOGUS");
1429 * Since we received a taper error, we can't send anything more
1430 * to the taper. Go into degraded mode to try to get everything
1431 * onto disk. Later, these dumps can be flushed to a new tape.
1432 * The tape queue is zapped so that it appears empty in future
1433 * checks. If there are dumps waiting for diskspace to be freed,
1438 _("going into degraded mode because of taper component error."));
1439 start_degraded_mode(&runq);
1441 tapeq.head = tapeq.tail = NULL;
1443 if(taper_ev_read != NULL) {
1444 event_release(taper_ev_read);
1445 taper_ev_read = NULL;
1447 if(cmd != TAPE_ERROR) aclose(taper);
1453 error(_("driver received unexpected token (%s) from taper"),
1458 if (taper_result != LAST_TOK) {
1460 if (taper_dumper->result != LAST_TOK) {
1461 // Dumper already returned its result
1462 dumper_taper_result(taper_disk);
1465 file_taper_result(taper_disk);
1469 } while(areads_dataready(taper));
1477 char *qname = quote_string(dp->name);
1479 if (taper_result == DONE) {
1480 update_info_taper(dp, taper_first_label, taper_first_fileno,
1484 sched(dp)->taper_attempted += 1;
1486 if (taper_input_error) {
1487 g_printf("driver: taper failed %s %s: %s\n",
1488 dp->host->hostname, qname, taper_input_error);
1489 if (strcmp(sched(dp)->datestamp, driver_timestamp) == 0) {
1490 if(sched(dp)->taper_attempted >= 2) {
1491 log_add(L_FAIL, _("%s %s %s %d [too many taper retries after holding disk error: %s]"),
1492 dp->host->hostname, qname, sched(dp)->datestamp,
1493 sched(dp)->level, taper_input_error);
1494 g_printf("driver: taper failed %s %s, too many taper retry after holding disk error\n",
1495 dp->host->hostname, qname);
1496 amfree(sched(dp)->destname);
1497 amfree(sched(dp)->dumpdate);
1498 amfree(sched(dp)->degr_dumpdate);
1499 amfree(sched(dp)->datestamp);
1502 log_add(L_INFO, _("%s %s %s %d [Will retry dump because of holding disk error: %s]"),
1503 dp->host->hostname, qname, sched(dp)->datestamp,
1504 sched(dp)->level, taper_input_error);
1505 g_printf("driver: taper will retry %s %s because of holding disk error\n",
1506 dp->host->hostname, qname);
1507 if (dp->to_holdingdisk != HOLD_REQUIRED) {
1508 dp->to_holdingdisk = HOLD_NEVER;
1509 sched(dp)->dump_attempted -= 1;
1510 headqueue_disk(&directq, dp);
1512 amfree(sched(dp)->destname);
1513 amfree(sched(dp)->dumpdate);
1514 amfree(sched(dp)->degr_dumpdate);
1515 amfree(sched(dp)->datestamp);
1520 amfree(sched(dp)->destname);
1521 amfree(sched(dp)->dumpdate);
1522 amfree(sched(dp)->degr_dumpdate);
1523 amfree(sched(dp)->datestamp);
1526 } else if (taper_tape_error) {
1527 if(sched(dp)->taper_attempted >= 2) {
1528 log_add(L_FAIL, _("%s %s %s %d [too many taper retries]"),
1529 dp->host->hostname, qname, sched(dp)->datestamp,
1531 g_printf("driver: taper failed %s %s, too many taper retry\n",
1532 dp->host->hostname, qname);
1533 amfree(sched(dp)->destname);
1534 amfree(sched(dp)->dumpdate);
1535 amfree(sched(dp)->degr_dumpdate);
1536 amfree(sched(dp)->datestamp);
1539 g_printf("driver: taper will retry %s %s\n",
1540 dp->host->hostname, qname);
1541 /* Re-insert into taper queue. */
1542 headqueue_disk(&tapeq, dp);
1545 delete_diskspace(dp);
1546 amfree(sched(dp)->destname);
1547 amfree(sched(dp)->dumpdate);
1548 amfree(sched(dp)->degr_dumpdate);
1549 amfree(sched(dp)->datestamp);
1556 taper_input_error = NULL;
1557 taper_tape_error = NULL;
1560 /* continue with those dumps waiting for diskspace */
1561 continue_port_dumps();
1562 start_some_dumps(&runq);
1567 dumper_taper_result(
1574 dumper = sched(dp)->dumper;
1577 if(dumper->result == DONE && taper_result == DONE) {
1578 update_info_dumper(dp, sched(dp)->origsize,
1579 sched(dp)->dumpsize, sched(dp)->dumptime);
1580 update_info_taper(dp, taper_first_label, taper_first_fileno,
1582 qname = quote_string(dp->name); /*quote to take care of spaces*/
1584 log_add(L_STATS, _("estimate %s %s %s %d [sec %ld nkb %lld ckb %lld kps %lu]"),
1585 dp->host->hostname, qname, sched(dp)->datestamp,
1587 sched(dp)->est_time, (long long)sched(dp)->est_nsize,
1588 (long long)sched(dp)->est_csize,
1589 sched(dp)->est_kps);
1592 update_failed_dump_to_tape(dp);
1595 is_partial = dumper->result != DONE || taper_result != DONE;
1597 sched(dp)->dump_attempted += 1;
1598 sched(dp)->taper_attempted += 1;
1600 if((dumper->result != DONE || taper_result != DONE) &&
1601 sched(dp)->dump_attempted <= 1 &&
1602 sched(dp)->taper_attempted <= 1) {
1603 enqueue_disk(&directq, dp);
1606 if(dumper->ev_read != NULL) {
1607 event_release(dumper->ev_read);
1608 dumper->ev_read = NULL;
1610 if(taper_ev_read != NULL) {
1611 event_release(taper_ev_read);
1612 taper_ev_read = NULL;
1615 taper_input_error = NULL;
1616 taper_tape_error = NULL;
1618 dp->host->inprogress -= 1;
1620 deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
1629 for(dumper = dmptable; dumper < dmptable+inparallel; dumper++)
1630 if(!dumper->busy && !dumper->down) return dumper;
1636 dumper_chunker_result(
1641 assignedhd_t **h=NULL;
1648 dumper = sched(dp)->dumper;
1649 chunker = dumper->chunker;
1653 h = sched(dp)->holdp;
1654 activehd = sched(dp)->activehd;
1656 if(dumper->result == DONE && chunker->result == DONE) {
1657 update_info_dumper(dp, sched(dp)->origsize,
1658 sched(dp)->dumpsize, sched(dp)->dumptime);
1659 qname = quote_string(dp->name);/*quote to take care of spaces*/
1661 log_add(L_STATS, _("estimate %s %s %s %d [sec %ld nkb %lld ckb %lld kps %lu]"),
1662 dp->host->hostname, qname, sched(dp)->datestamp,
1664 sched(dp)->est_time, (long long)sched(dp)->est_nsize,
1665 (long long)sched(dp)->est_csize,
1666 sched(dp)->est_kps);
1670 deallocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
1672 is_partial = dumper->result != DONE || chunker->result != DONE;
1673 rename_tmp_holding(sched(dp)->destname, !is_partial);
1676 for( i = 0, h = sched(dp)->holdp; i < activehd; i++ ) {
1677 dummy += h[i]->used;
1680 size = holding_file_size(sched(dp)->destname, 0);
1681 h[activehd]->used = size - dummy;
1682 h[activehd]->disk->allocated_dumpers--;
1683 adjust_diskspace(dp, DONE);
1685 sched(dp)->dump_attempted += 1;
1687 if((dumper->result != DONE || chunker->result != DONE) &&
1688 sched(dp)->dump_attempted <= 1) {
1689 delete_diskspace(dp);
1690 if (sched(dp)->no_space) {
1691 enqueue_disk(&directq, dp);
1693 enqueue_disk(&runq, dp);
1696 else if(size > (off_t)DISK_BLOCK_KB) {
1697 enqueue_disk(&tapeq, dp);
1700 delete_diskspace(dp);
1704 dp->host->inprogress -= 1;
1707 waitpid(chunker->pid, NULL, 0 );
1708 aclose(chunker->fd);
1713 if (chunker->result == ABORT_FINISHED)
1715 continue_port_dumps();
1717 * Wakeup any dumpers that are sleeping because of network
1718 * or disk constraints.
1720 start_some_dumps(&runq);
1726 handle_dumper_result(
1729 /*static int pending_aborts = 0;*/
1730 dumper_t *dumper = cookie;
1735 char *result_argv[MAX_ARGS+1];
1737 assert(dumper != NULL);
1740 assert(sched(dp) != NULL);
1745 cmd = getresult(dumper->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
1748 /* result_argv[2] always contains the serial number */
1749 sdp = serial2disk(result_argv[2]);
1751 error(_("Invalid serial number %s"), result_argv[2]);
1752 g_assert_not_reached();
1756 qname = quote_string(dp->name);
1759 case DONE: /* DONE <handle> <origsize> <dumpsize> <dumptime> <errstr> */
1760 if(result_argc != 6) {
1761 error(_("error [dumper DONE result_argc != 6: %d]"), result_argc);
1765 sched(dp)->origsize = OFF_T_ATOI(result_argv[3]);
1766 sched(dp)->dumptime = TIME_T_ATOI(result_argv[5]);
1768 g_printf(_("driver: finished-cmd time %s %s dumped %s:%s\n"),
1769 walltime_str(curclock()), dumper->name,
1770 dp->host->hostname, qname);
1773 dumper->result = cmd;
1777 case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
1779 * Requeue this disk, and fall through to the FAILED
1782 if(sched(dp)->dump_attempted) {
1783 char *qname = quote_string(dp->name);
1784 log_add(L_FAIL, _("%s %s %s %d [too many dumper retry: %s]"),
1785 dp->host->hostname, qname, sched(dp)->datestamp,
1786 sched(dp)->level, result_argv[3]);
1787 g_printf(_("driver: dump failed %s %s %s, too many dumper retry: %s\n"),
1788 result_argv[2], dp->host->hostname, qname,
1793 case FAILED: /* FAILED <handle> <errstr> */
1794 /*free_serial(result_argv[2]);*/
1795 dumper->result = cmd;
1798 case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
1800 * We sent an ABORT from the NO-ROOM case because this dump
1801 * wasn't going to fit onto the holding disk. We now need to
1802 * clean up the remains of this image, and try to finish
1803 * other dumps that are waiting on disk space.
1805 assert(pending_aborts);
1806 /*free_serial(result_argv[2]);*/
1807 dumper->result = cmd;
1811 /* either EOF or garbage from dumper. Turn it off */
1812 log_add(L_WARNING, _("%s pid %ld is messed up, ignoring it.\n"),
1813 dumper->name, (long)dumper->pid);
1814 if (dumper->ev_read) {
1815 event_release(dumper->ev_read);
1816 dumper->ev_read = NULL;
1820 dumper->down = 1; /* mark it down so it isn't used again */
1822 /* if it was dumping something, zap it and try again */
1823 if(sched(dp)->dump_attempted) {
1824 log_add(L_FAIL, _("%s %s %s %d [%s died]"),
1825 dp->host->hostname, qname, sched(dp)->datestamp,
1826 sched(dp)->level, dumper->name);
1829 log_add(L_WARNING, _("%s died while dumping %s:%s lev %d."),
1830 dumper->name, dp->host->hostname, qname,
1834 dumper->result = cmd;
1842 /* send the dumper result to the chunker */
1843 if (dumper->chunker) {
1844 if (dumper->chunker->down == 0 && dumper->chunker->fd != -1 &&
1845 dumper->chunker->result == LAST_TOK) {
1847 chunker_cmd(dumper->chunker, DONE, dp);
1850 chunker_cmd(dumper->chunker, FAILED, dp);
1853 if( dumper->result != LAST_TOK &&
1854 dumper->chunker->result != LAST_TOK)
1855 dumper_chunker_result(dp);
1856 } else { /* send the dumper result to the taper */
1857 if (taper_sendresult) {
1859 taper_cmd(DONE, driver_timestamp, NULL, 0, NULL);
1861 taper_cmd(FAILED, driver_timestamp, NULL, 0, NULL);
1863 taper_sendresult = 0;
1866 if (taper_dumper && taper_result != LAST_TOK) {
1867 dumper_taper_result(dp);
1869 } while(areads_dataready(dumper->fd));
1874 handle_chunker_result(
1877 /*static int pending_aborts = 0;*/
1878 chunker_t *chunker = cookie;
1879 assignedhd_t **h=NULL;
1884 char *result_argv[MAX_ARGS+1];
1889 assert(chunker != NULL);
1890 dumper = chunker->dumper;
1891 assert(dumper != NULL);
1894 assert(sched(dp) != NULL);
1895 assert(sched(dp)->destname != NULL);
1896 assert(dp != NULL && sched(dp) != NULL && sched(dp)->destname);
1898 if(dp && sched(dp) && sched(dp)->holdp) {
1899 h = sched(dp)->holdp;
1900 activehd = sched(dp)->activehd;
1907 cmd = getresult(chunker->fd, 1, &result_argc, result_argv, MAX_ARGS+1);
1910 /* result_argv[2] always contains the serial number */
1911 sdp = serial2disk(result_argv[2]);
1913 error(_("Invalid serial number %s"), result_argv[2]);
1914 g_assert_not_reached();
1920 case PARTIAL: /* PARTIAL <handle> <dumpsize> <errstr> */
1921 case DONE: /* DONE <handle> <dumpsize> <errstr> */
1922 if(result_argc != 4) {
1923 error(_("error [chunker %s result_argc != 4: %d]"), cmdstr[cmd],
1927 /*free_serial(result_argv[2]);*/
1929 sched(dp)->dumpsize = (off_t)atof(result_argv[3]);
1931 qname = quote_string(dp->name);
1932 g_printf(_("driver: finished-cmd time %s %s chunked %s:%s\n"),
1933 walltime_str(curclock()), chunker->name,
1934 dp->host->hostname, qname);
1938 event_release(chunker->ev_read);
1940 chunker->result = cmd;
1944 case TRYAGAIN: /* TRY-AGAIN <handle> <errstr> */
1945 event_release(chunker->ev_read);
1947 chunker->result = cmd;
1950 case FAILED: /* FAILED <handle> <errstr> */
1951 /*free_serial(result_argv[2]);*/
1953 event_release(chunker->ev_read);
1955 chunker->result = cmd;
1959 case NO_ROOM: /* NO-ROOM <handle> <missing_size> */
1960 if (!h || activehd < 0) { /* should never happen */
1961 error(_("!h || activehd < 0"));
1964 h[activehd]->used -= OFF_T_ATOI(result_argv[3]);
1965 h[activehd]->reserved -= OFF_T_ATOI(result_argv[3]);
1966 h[activehd]->disk->allocated_space -= OFF_T_ATOI(result_argv[3]);
1967 h[activehd]->disk->disksize -= OFF_T_ATOI(result_argv[3]);
1970 case RQ_MORE_DISK: /* RQ-MORE-DISK <handle> */
1971 if (!h || activehd < 0) { /* should never happen */
1972 error(_("!h || activehd < 0"));
1975 h[activehd]->disk->allocated_dumpers--;
1976 h[activehd]->used = h[activehd]->reserved;
1977 if( h[++activehd] ) { /* There's still some allocated space left.
1978 * Tell the dumper about it. */
1979 sched(dp)->activehd++;
1980 chunker_cmd( chunker, CONTINUE, dp );
1981 } else { /* !h[++activehd] - must allocate more space */
1982 sched(dp)->act_size = sched(dp)->est_size; /* not quite true */
1983 sched(dp)->est_size = (sched(dp)->act_size/(off_t)20) * (off_t)21; /* +5% */
1984 sched(dp)->est_size = am_round(sched(dp)->est_size, (off_t)DISK_BLOCK_KB);
1985 if (sched(dp)->est_size < sched(dp)->act_size + 2*DISK_BLOCK_KB)
1986 sched(dp)->est_size += 2 * DISK_BLOCK_KB;
1987 h = find_diskspace( sched(dp)->est_size - sched(dp)->act_size,
1991 /* No diskspace available. The reason for this will be
1992 * determined in continue_port_dumps(). */
1993 enqueue_disk( &roomq, dp );
1994 continue_port_dumps();
1996 /* OK, allocate space for disk and have chunker continue */
1997 sched(dp)->activehd = assign_holdingdisk( h, dp );
1998 chunker_cmd( chunker, CONTINUE, dp );
2004 case ABORT_FINISHED: /* ABORT-FINISHED <handle> */
2006 * We sent an ABORT from the NO-ROOM case because this dump
2007 * wasn't going to fit onto the holding disk. We now need to
2008 * clean up the remains of this image, and try to finish
2009 * other dumps that are waiting on disk space.
2011 /*assert(pending_aborts);*/
2013 /*free_serial(result_argv[2]);*/
2015 event_release(chunker->ev_read);
2017 chunker->result = cmd;
2022 /* either EOF or garbage from chunker. Turn it off */
2023 log_add(L_WARNING, _("%s pid %ld is messed up, ignoring it.\n"),
2024 chunker->name, (long)chunker->pid);
2027 /* if it was dumping something, zap it and try again */
2028 if (!h || activehd < 0) { /* should never happen */
2029 error(_("!h || activehd < 0"));
2032 qname = quote_string(dp->name);
2033 if(sched(dp)->dump_attempted) {
2034 log_add(L_FAIL, _("%s %s %s %d [%s died]"),
2035 dp->host->hostname, qname, sched(dp)->datestamp,
2036 sched(dp)->level, chunker->name);
2039 log_add(L_WARNING, _("%s died while dumping %s:%s lev %d."),
2040 chunker->name, dp->host->hostname, qname,
2047 event_release(chunker->ev_read);
2049 chunker->result = cmd;
2057 if(chunker->result != LAST_TOK && chunker->dumper->result != LAST_TOK)
2058 dumper_chunker_result(dp);
2060 } while(areads_dataready(chunker->fd));
2071 char *hostname, *diskname, *datestamp;
2075 char *inpline = NULL;
2081 char *qdestname = NULL;
2083 tq.head = tq.tail = NULL;
2085 for(line = 0; (inpline = agets(stdin)) != NULL; free(inpline)) {
2087 if (inpline[0] == '\0')
2093 skip_whitespace(s, ch); /* find the command */
2095 error(_("flush line %d: syntax error (no command)"), line);
2099 skip_non_whitespace(s, ch);
2102 if(strcmp(command,"ENDFLUSH") == 0) {
2106 if(strcmp(command,"FLUSH") != 0) {
2107 error(_("flush line %d: syntax error (%s != FLUSH)"), line, command);
2111 skip_whitespace(s, ch); /* find the hostname */
2113 error(_("flush line %d: syntax error (no hostname)"), line);
2117 skip_non_whitespace(s, ch);
2120 skip_whitespace(s, ch); /* find the diskname */
2122 error(_("flush line %d: syntax error (no diskname)"), line);
2126 skip_quoted_string(s, ch);
2127 s[-1] = '\0'; /* terminate the disk name */
2128 diskname = unquote_string(qname);
2130 skip_whitespace(s, ch); /* find the datestamp */
2132 error(_("flush line %d: syntax error (no datestamp)"), line);
2136 skip_non_whitespace(s, ch);
2139 skip_whitespace(s, ch); /* find the level number */
2140 if(ch == '\0' || sscanf(s - 1, "%d", &level) != 1) {
2141 error(_("flush line %d: syntax error (bad level)"), line);
2144 skip_integer(s, ch);
2146 skip_whitespace(s, ch); /* find the filename */
2148 error(_("flush line %d: syntax error (no filename)"), line);
2152 skip_quoted_string(s, ch);
2154 destname = unquote_string(qdestname);
2156 holding_file_get_dumpfile(destname, &file);
2157 if( file.type != F_DUMPFILE) {
2158 if( file.type != F_CONT_DUMPFILE )
2159 log_add(L_INFO, _("%s: ignoring cruft file."), destname);
2165 if(strcmp(hostname, file.name) != 0 ||
2166 strcmp(diskname, file.disk) != 0 ||
2167 strcmp(datestamp, file.datestamp) != 0) {
2168 log_add(L_INFO, _("disk %s:%s not consistent with file %s"),
2169 hostname, diskname, destname);
2176 dp = lookup_disk(file.name, file.disk);
2179 log_add(L_INFO, _("%s: disk %s:%s not in database, skipping it."),
2180 destname, file.name, file.disk);
2185 if(file.dumplevel < 0 || file.dumplevel > 9) {
2186 log_add(L_INFO, _("%s: ignoring file with bogus dump level %d."),
2187 destname, file.dumplevel);
2192 if (holding_file_size(destname,1) <= 0) {
2193 log_add(L_INFO, "%s: removing file with no data.", destname);
2194 holding_file_unlink(destname);
2199 dp1 = (disk_t *)alloc(SIZEOF(disk_t));
2201 dp1->next = dp1->prev = NULL;
2203 /* add it to the flushhost list */
2205 flushhost = alloc(SIZEOF(am_host_t));
2206 flushhost->next = NULL;
2207 flushhost->hostname = stralloc("FLUSHHOST");
2208 flushhost->up = NULL;
2209 flushhost->features = NULL;
2211 dp1->hostnext = flushhost->disks;
2212 flushhost->disks = dp1;
2214 sp = (sched_t *) alloc(SIZEOF(sched_t));
2215 sp->destname = destname;
2216 sp->level = file.dumplevel;
2217 sp->dumpdate = NULL;
2218 sp->degr_dumpdate = NULL;
2219 sp->datestamp = stralloc(file.datestamp);
2220 sp->est_nsize = (off_t)0;
2221 sp->est_csize = (off_t)0;
2225 sp->degr_level = -1;
2226 sp->dump_attempted = 0;
2227 sp->taper_attempted = 0;
2228 sp->act_size = holding_file_size(destname, 0);
2229 sp->holdp = build_diskspace(destname);
2230 if(sp->holdp == NULL) continue;
2232 sp->timestamp = (time_t)0;
2234 dp1->up = (char *)sp;
2236 enqueue_disk(&tq, dp1);
2249 int level, line, priority;
2250 char *dumpdate, *degr_dumpdate;
2252 time_t time, degr_time;
2253 time_t *time_p = &time;
2254 time_t *degr_time_p = &degr_time; /* written through when the degraded estimated-time field is parsed */
2255 off_t nsize, csize, degr_nsize, degr_csize;
2256 unsigned long kps, degr_kps;
2257 char *hostname, *features, *diskname, *datestamp, *inpline = NULL;
2261 off_t flush_size = (off_t)0;
2266 long long degr_nsize_;
2267 long long degr_csize_;
2269 (void)cookie; /* Quiet unused parameter warning */
2271 event_release(schedule_ev_read);
2273 /* read schedule from stdin */
2275 for(line = 0; (inpline = agets(stdin)) != NULL; free(inpline)) {
2276 if (inpline[0] == '\0')
2283 skip_whitespace(s, ch); /* find the command */
2285 error(_("schedule line %d: syntax error (no command)"), line);
2289 skip_non_whitespace(s, ch);
2292 if(strcmp(command,"DUMP") != 0) {
2293 error(_("schedule line %d: syntax error (%s != DUMP)"), line, command);
2297 skip_whitespace(s, ch); /* find the host name */
2299 error(_("schedule line %d: syntax error (no host name)"), line);
2303 skip_non_whitespace(s, ch);
2306 skip_whitespace(s, ch); /* find the feature list */
2308 error(_("schedule line %d: syntax error (no feature list)"), line);
2312 skip_non_whitespace(s, ch);
2315 skip_whitespace(s, ch); /* find the disk name */
2317 error(_("schedule line %d: syntax error (no disk name)"), line);
2321 skip_quoted_string(s, ch);
2322 s[-1] = '\0'; /* terminate the disk name */
2323 diskname = unquote_string(qname);
2325 skip_whitespace(s, ch); /* find the datestamp */
2327 error(_("schedule line %d: syntax error (no datestamp)"), line);
2331 skip_non_whitespace(s, ch);
2334 skip_whitespace(s, ch); /* find the priority number */
2335 if(ch == '\0' || sscanf(s - 1, "%d", &priority) != 1) {
2336 error(_("schedule line %d: syntax error (bad priority)"), line);
2339 skip_integer(s, ch);
2341 skip_whitespace(s, ch); /* find the level number */
2342 if(ch == '\0' || sscanf(s - 1, "%d", &level) != 1) {
2343 error(_("schedule line %d: syntax error (bad level)"), line);
2346 skip_integer(s, ch);
2348 skip_whitespace(s, ch); /* find the dump date */
2350 error(_("schedule line %d: syntax error (bad dump date)"), line);
2354 skip_non_whitespace(s, ch);
2357 skip_whitespace(s, ch); /* find the native size */
2359 if(ch == '\0' || sscanf(s - 1, "%lld", &nsize_) != 1) {
2360 error(_("schedule line %d: syntax error (bad nsize)"), line);
2363 nsize = (off_t)nsize_;
2364 skip_integer(s, ch);
2366 skip_whitespace(s, ch); /* find the compressed size */
2368 if(ch == '\0' || sscanf(s - 1, "%lld", &csize_) != 1) {
2369 error(_("schedule line %d: syntax error (bad csize)"), line);
2372 csize = (off_t)csize_;
2373 skip_integer(s, ch);
2375 skip_whitespace(s, ch); /* find the time number */
2376 if(ch == '\0' || sscanf(s - 1, "%lld", &time_) != 1) {
2377 error(_("schedule line %d: syntax error (bad estimated time)"), line);
2380 *time_p = (time_t)time_;
2381 skip_integer(s, ch);
2383 skip_whitespace(s, ch); /* find the kps number */
2384 if(ch == '\0' || sscanf(s - 1, "%lu", &kps) != 1) {
2385 error(_("schedule line %d: syntax error (bad kps)"), line);
2388 skip_integer(s, ch);
2390 degr_dumpdate = NULL; /* flag if degr fields found */
2391 skip_whitespace(s, ch); /* find the degr level number */
2393 if(sscanf(s - 1, "%d", &degr_level) != 1) {
2394 error(_("schedule line %d: syntax error (bad degr level)"), line);
2397 skip_integer(s, ch);
2399 skip_whitespace(s, ch); /* find the degr dump date */
2401 error(_("schedule line %d: syntax error (bad degr dump date)"), line);
2404 degr_dumpdate = s - 1;
2405 skip_non_whitespace(s, ch);
2408 skip_whitespace(s, ch); /* find the degr native size */
2409 degr_nsize_ = (off_t)0;
2410 if(ch == '\0' || sscanf(s - 1, "%lld", &degr_nsize_) != 1) {
2411 error(_("schedule line %d: syntax error (bad degr nsize)"), line);
2414 degr_nsize = (off_t)degr_nsize_;
2415 skip_integer(s, ch);
2417 skip_whitespace(s, ch); /* find the degr compressed size */
2418 degr_csize_ = (off_t)0;
2419 if(ch == '\0' || sscanf(s - 1, "%lld", &degr_csize_) != 1) {
2420 error(_("schedule line %d: syntax error (bad degr csize)"), line);
2423 degr_csize = (off_t)degr_csize_;
2424 skip_integer(s, ch);
2426 skip_whitespace(s, ch); /* find the degr time number */
2427 if(ch == '\0' || sscanf(s - 1, "%lld", &time_) != 1) {
2428 error(_("schedule line %d: syntax error (bad degr estimated time)"), line);
2431 *degr_time_p = (time_t)time_;
2432 skip_integer(s, ch);
2434 skip_whitespace(s, ch); /* find the degr kps number */
2435 if(ch == '\0' || sscanf(s - 1, "%lu", &degr_kps) != 1) {
2436 error(_("schedule line %d: syntax error (bad degr kps)"), line);
2439 skip_integer(s, ch);
2442 degr_nsize = (off_t)0;
2443 degr_csize = (off_t)0;
2444 degr_time = (time_t)0;
2448 dp = lookup_disk(hostname, diskname);
2451 _("schedule line %d: %s:'%s' not in disklist, ignored"),
2452 line, hostname, qname);
2457 sp = (sched_t *) alloc(SIZEOF(sched_t));
2460 sp->dumpdate = stralloc(dumpdate);
2461 sp->est_nsize = DISK_BLOCK_KB + nsize; /* include header */
2462 sp->est_csize = DISK_BLOCK_KB + csize; /* include header */
2463 /* round estimate to next multiple of DISK_BLOCK_KB */
2464 sp->est_csize = am_round(sp->est_csize, DISK_BLOCK_KB);
2465 sp->est_size = sp->est_csize;
2466 sp->est_time = time;
2468 sp->priority = priority;
2469 sp->datestamp = stralloc(datestamp);
2472 sp->degr_level = degr_level;
2473 sp->degr_dumpdate = stralloc(degr_dumpdate);
2474 sp->degr_nsize = DISK_BLOCK_KB + degr_nsize;
2475 sp->degr_csize = DISK_BLOCK_KB + degr_csize;
2476 /* round estimate to next multiple of DISK_BLOCK_KB */
2477 sp->degr_csize = am_round(sp->degr_csize, DISK_BLOCK_KB);
2478 sp->degr_time = degr_time;
2479 sp->degr_kps = degr_kps;
2481 sp->degr_level = -1;
2482 sp->degr_dumpdate = NULL;
2486 sp->dump_attempted = 0;
2487 sp->taper_attempted = 0;
2492 sp->timestamp = (time_t)0;
2493 sp->destname = NULL;
2496 dp->up = (char *) sp;
2497 if(dp->host->features == NULL) {
2498 dp->host->features = am_string_to_feature(features);
2500 remove_disk(&waitq, dp);
2501 if (dp->to_holdingdisk == HOLD_NEVER) {
2502 enqueue_disk(&directq, dp);
2504 enqueue_disk(&runq, dp);
2506 flush_size += sp->act_size;
2509 g_printf(_("driver: flush size %lld\n"), (long long)flush_size);
2512 log_add(L_WARNING, _("WARNING: got empty schedule from planner"));
2513 if(need_degraded==1) start_degraded_mode(&runq);
2515 start_some_dumps(&runq);
2518 static unsigned long
2526 unsigned long maxusage=0;
2527 unsigned long curusage=0;
2528 for(p = disklist_netifs(); p != NULL; p = p->next) {
2529 maxusage += interface_get_maxusage(p->config);
2530 curusage += p->curusage;
2532 if (maxusage >= curusage)
2533 res = maxusage - curusage;
2538 if ((unsigned long)interface_get_maxusage(ip->config) >= ip->curusage)
2539 res = interface_get_maxusage(ip->config) - ip->curusage;
2554 g_printf(_("driver: interface-state time %s"), time_str);
2556 for(ip = disklist_netifs(); ip != NULL; ip = ip->next) {
2557 g_printf(_(" if %s: free %lu"), interface_name(ip->config), free_kps(ip));
2567 ip->curusage += kps;
2571 deallocate_bandwidth(
2575 assert(kps <= ip->curusage);
2576 ip->curusage -= kps;
2587 total_free = (off_t)0;
2588 for(ha = holdalloc; ha != NULL; ha = ha->next) {
2589 diff = ha->disksize - ha->allocated_space;
2597 * We return an array of pointers to assignedhd_t. The array contains at
2598 * most one entry per holding disk. The list of pointers is terminated by
2599 * a NULL pointer. Each entry contains a pointer to a holdingdisk and
2600 * how much diskspace to use on that disk. Later on, assign_holdingdisk
2601 * will allocate the given amount of space.
2602 * If there is not enough room on the holdingdisks, NULL is returned.
2605 static assignedhd_t **
2609 assignedhd_t * pref)
2611 assignedhd_t **result = NULL;
2612 holdalloc_t *ha, *minp;
2616 off_t halloc, dalloc, hfree, dfree;
2618 (void)cur_idle; /* Quiet unused parameter warning */
2620 if (size < 2*DISK_BLOCK_KB)
2621 size = 2*DISK_BLOCK_KB;
2622 size = am_round(size, (off_t)DISK_BLOCK_KB);
2624 hold_debug(1, _("find_diskspace: want %lld K\n"),
2627 used = alloc(SIZEOF(*used) * num_holdalloc);/*disks used during this run*/
2628 memset( used, 0, SIZEOF(*used) * (size_t)num_holdalloc ); /* clear the whole array: byte count must match the allocation above */
2629 result = alloc(SIZEOF(assignedhd_t *) * (num_holdalloc + 1));
2632 while( i < num_holdalloc && size > (off_t)0 ) {
2633 /* find the holdingdisk with the fewest active dumpers and among
2634 * those the one with the biggest free space
2636 minp = NULL; minj = -1;
2637 for(j = 0, ha = holdalloc; ha != NULL; ha = ha->next, j++ ) {
2638 if( pref && pref->disk == ha && !used[j] &&
2639 ha->allocated_space <= ha->disksize - (off_t)DISK_BLOCK_KB) {
2644 else if( ha->allocated_space <= ha->disksize - (off_t)(2*DISK_BLOCK_KB) &&
2647 ha->allocated_dumpers < minp->allocated_dumpers ||
2648 (ha->allocated_dumpers == minp->allocated_dumpers &&
2649 ha->disksize-ha->allocated_space > minp->disksize-minp->allocated_space)) ) {
2656 if( !minp ) { break; } /* all holding disks are full */
2659 /* hfree = free space on the disk */
2660 hfree = minp->disksize - minp->allocated_space;
2662 /* dfree = free space for data, remove 1 header for each chunksize */
2663 dfree = hfree - (((hfree-(off_t)1)/holdingdisk_get_chunksize(minp->hdisk))+(off_t)1) * (off_t)DISK_BLOCK_KB;
2665 /* dalloc = space I can allocate for data */
2666 dalloc = ( dfree < size ) ? dfree : size;
2668 /* halloc = space to allocate, including 1 header for each chunksize */
2669 halloc = dalloc + (((dalloc-(off_t)1)/holdingdisk_get_chunksize(minp->hdisk))+(off_t)1) * (off_t)DISK_BLOCK_KB;
2671 hold_debug(1, _("find_diskspace: find diskspace: size %lld hf %lld df %lld da %lld ha %lld\n"),
2678 result[i] = alloc(SIZEOF(assignedhd_t));
2679 result[i]->disk = minp;
2680 result[i]->reserved = halloc;
2681 result[i]->used = (off_t)0;
2682 result[i]->destname = NULL;
2688 if(size != (off_t)0) { /* not enough space available */
2689 g_printf(_("find diskspace: not enough diskspace. Left with %lld K\n"), (long long)size);
2691 free_assignedhd(result);
2695 if (debug_holding > 1) {
2696 for( i = 0; result && result[i]; i++ ) {
2697 hold_debug(1, _("find_diskspace: find diskspace: selected %s free %lld reserved %lld dumpers %d\n"),
2698 holdingdisk_get_diskdir(result[i]->disk->hdisk),
2699 (long long)(result[i]->disk->disksize -
2700 result[i]->disk->allocated_space),
2701 (long long)result[i]->reserved,
2702 result[i]->disk->allocated_dumpers);
/* holdp: NULL-terminated array of reservations to attach to diskp's
 * schedule entry (the counting loop below stops at the first NULL). */
2711 assignedhd_t ** holdp,
2716 char *sfn = sanitise_filename(diskp->name);
2718 assignedhd_t **new_holdp;
/* Render the dump level as a decimal string. */
2721 g_snprintf( lvl, SIZEOF(lvl), "%d", sched(diskp)->level );
/* size = estimated size minus what is already recorded in act_size, passed
 * through am_round() with DISK_BLOCK_KB.  It is reduced (never below 0)
 * each time a reservation is attached further down. */
2723 size = am_round(sched(diskp)->est_size - sched(diskp)->act_size,
2724 (off_t)DISK_BLOCK_KB);
2726 for( c = 0; holdp[c]; c++ )
2727 (void)c; /* count number of disks */
2729 /* allocate memory for sched(diskp)->holdp */
/* j = number of entries already attached to this disk; stays 0 when the
 * list pointer is NULL. */
2730 for(j = 0; sched(diskp)->holdp && sched(diskp)->holdp[j]; j++)
2731 (void)j; /* Quiet lint */
/* Room for the j existing entries, the c new ones, and a NULL terminator. */
2732 new_holdp = (assignedhd_t **)alloc(SIZEOF(assignedhd_t*)*(j+c+1));
2733 if (sched(diskp)->holdp) {
/* Carry the existing entry pointers over into the new array, then release
 * the old array pointer. */
2734 memcpy(new_holdp, sched(diskp)->holdp, j * SIZEOF(*new_holdp));
2735 amfree(sched(diskp)->holdp);
2737 sched(diskp)->holdp = new_holdp;
2741 if( j > 0 ) { /* This is a request for additional diskspace. See if we can
2742 * merge assignedhd_t's */
/* A merge happens only when the last existing entry and the first new
 * entry refer to the same holding disk. */
2744 if( sched(diskp)->holdp[j-1]->disk == holdp[0]->disk ) { /* Yes! */
/* Grow the existing reservation and the disk's allocated total by the new
 * amount, and reduce the outstanding size, clamping it at zero. */
2745 sched(diskp)->holdp[j-1]->reserved += holdp[0]->reserved;
2746 holdp[0]->disk->allocated_space += holdp[0]->reserved;
2747 size = (holdp[0]->reserved>size) ? (off_t)0 : size-holdp[0]->reserved;
2748 qname = quote_string(diskp->name);
2749 hold_debug(1, _("assign_holdingdisk: merging holding disk %s to disk %s:%s, add %lld for reserved %lld, left %lld\n"),
2750 holdingdisk_get_diskdir(
2751 sched(diskp)->holdp[j-1]->disk->hdisk),
2752 diskp->host->hostname, qname,
2753 (long long)holdp[0]->reserved,
2754 (long long)sched(diskp)->holdp[j-1]->reserved,
2763 /* copy assignedhd_s to sched(diskp), adjust allocated_space */
2764 for( ; holdp[i]; i++ ) {
/* The destination name starts with <holding dir>/<hd_driver_timestamp>/
 * followed by the host name and a dot; newvstralloc() is handed the
 * previous destname as its first argument. */
2765 holdp[i]->destname = newvstralloc( holdp[i]->destname,
2766 holdingdisk_get_diskdir(holdp[i]->disk->hdisk), "/",
2767 hd_driver_timestamp, "/",
2768 diskp->host->hostname, ".",
/* Append the entry to the disk's list, charge its reservation to the
 * holding disk, and reduce the outstanding size (clamped at zero). */
2771 sched(diskp)->holdp[j++] = holdp[i];
2772 holdp[i]->disk->allocated_space += holdp[i]->reserved;
2773 size = (holdp[i]->reserved > size) ? (off_t)0 :
2774 (size - holdp[i]->reserved);
2775 qname = quote_string(diskp->name);
2777 _("assign_holdingdisk: %d assigning holding disk %s to disk %s:%s, reserved %lld, left %lld\n"),
2778 i, holdingdisk_get_diskdir(holdp[i]->disk->hdisk),
2779 diskp->host->hostname, qname,
2780 (long long)holdp[i]->reserved,
2783 holdp[i] = NULL; /* so it doesn't get free()d... */
/* Keep the disk's list NULL-terminated after the last appended entry. */
2785 sched(diskp)->holdp[j] = NULL;
/* Reconcile the holding-disk reservations of diskp with what was actually
 * used: each entry's reserved amount is brought to its used amount, the
 * owning disk's allocated_space is corrected by the same difference, and the
 * sum of the used amounts becomes sched(diskp)->act_size. */
2796 assignedhd_t **holdp;
2797 off_t total = (off_t)0;
2800 char *qname, *hqname, *qdest;
2802 (void)cmd; /* Quiet unused parameter warning */
2804 qname = quote_string(diskp->name);
2805 qdest = quote_string(sched(diskp)->destname);
2806 hold_debug(1, _("adjust_diskspace: %s:%s %s\n"),
2807 diskp->host->hostname, qname, qdest);
2809 holdp = sched(diskp)->holdp;
/* The disk must already have a reservation list. */
2811 assert(holdp != NULL);
2813 for( i = 0; holdp[i]; i++ ) { /* for each allocated disk */
/* diff is negative when less was used than reserved, positive otherwise. */
2814 diff = holdp[i]->used - holdp[i]->reserved;
2815 total += holdp[i]->used;
2816 holdp[i]->disk->allocated_space += diff;
/* NOTE(review): hqname is computed here, but the debug call below passes
 * holdingdisk_name(...) directly -- confirm hqname is used and released. */
2817 hqname = quote_string(holdingdisk_name(holdp[i]->disk->hdisk));
2818 hold_debug(1, _("adjust_diskspace: hdisk %s done, reserved %lld used %lld diff %lld alloc %lld dumpers %d\n"),
2819 holdingdisk_name(holdp[i]->disk->hdisk),
2820 (long long)holdp[i]->reserved,
2821 (long long)holdp[i]->used,
2823 (long long)holdp[i]->disk->allocated_space,
2824 holdp[i]->disk->allocated_dumpers );
/* After this, reserved equals used for the entry. */
2825 holdp[i]->reserved += diff;
/* Record the total actually used across all entries. */
2829 sched(diskp)->act_size = total;
2831 hold_debug(1, _("adjust_diskspace: after: disk %s:%s used %lld\n"),
2832 diskp->host->hostname, qname,
2833 (long long)sched(diskp)->act_size);
/* Give back the holding-disk space recorded for diskp: every entry's used
 * amount is taken off its disk's allocated_space, the holding file named by
 * the first entry is unlinked, the list is freed, and act_size is reset. */
2842 assignedhd_t **holdp;
2845 holdp = sched(diskp)->holdp;
/* The disk must have a reservation list to release. */
2847 assert(holdp != NULL);
2849 for( i = 0; holdp[i]; i++ ) { /* for each disk */
/* NOTE(review): the comment below speaks of reserved sizes, while the
 * statement subtracts ->used -- confirm which amount is intended. */
2850 /* find all files of this dump on that disk, and subtract their
2851 * reserved sizes from the disk's allocated space
2853 holdp[i]->disk->allocated_space -= holdp[i]->used;
2856 holding_file_unlink(holdp[0]->destname); /* no need for the entire list,
2857 * because holding_file_unlink
2858 * will walk through all files
2859 * using cont_filename */
/* Drop the bookkeeping list and clear the recorded actual size. */
2860 free_assignedhd(sched(diskp)->holdp);
2861 sched(diskp)->holdp = NULL;
2862 sched(diskp)->act_size = (off_t)0;
/* Rebuild the holding-disk usage list for an existing dump.  Starting at
 * destname, each holding file is stat()ed, its size (rounded up to whole KB)
 * is charged to the holding disk whose directory matches, and the file's
 * header is read to find the next file via cont_filename.  One entry is then
 * created per holding disk with a non-zero total, with reserved and used
 * both set to that total. */
2865 static assignedhd_t **
2872 char buffer[DISK_BLOCK_BYTES];
2874 assignedhd_t **result;
2877 char dirname[1000], *ch;
2879 char *filename = destname;
2881 memset(buffer, 0, sizeof(buffer));
/* used[] has one accumulator per holding disk (num_holdalloc of them). */
2882 used = alloc(SIZEOF(off_t) * num_holdalloc);
2883 for(i=0;i<num_holdalloc;i++)
2885 result = alloc(SIZEOF(assignedhd_t *) * (num_holdalloc + 1));
/* Walk the chain of holding files until there is no continuation name. */
2887 while(filename != NULL && filename[0] != '\0') {
/* NOTE(review): strncpy() does not NUL-terminate when the source is 999
 * bytes or longer -- confirm dirname[999] is set explicitly. */
2888 strncpy(dirname, filename, 999);
/* The last '/' is looked up twice; presumably each hit is cut off to strip
 * the file name and then the timestamp directory -- confirm, and confirm
 * that a path without two '/' separators cannot reach this point. */
2890 ch = strrchr(dirname,'/');
2892 ch = strrchr(dirname,'/');
/* Find the index j of the holding disk whose directory equals dirname. */
2895 for(j = 0, ha = holdalloc; ha != NULL; ha = ha->next, j++ ) {
2896 if(strcmp(dirname, holdingdisk_get_diskdir(ha->hdisk))==0) {
/* A file that cannot be stat()ed is reported and counted as zero bytes. */
2901 if(stat(filename, &finfo) == -1) {
2902 g_fprintf(stderr, _("stat %s: %s\n"), filename, strerror(errno));
2903 finfo.st_size = (off_t)0;
/* Bytes -> KB, rounding up.
 * NOTE(review): if no holding disk matched above, j is one past the last
 * list element -- confirm that cannot index beyond used[]. */
2905 used[j] += ((off_t)finfo.st_size+(off_t)1023)/(off_t)1024;
2906 if((fd = open(filename,O_RDONLY)) == -1) {
2907 g_fprintf(stderr,_("build_diskspace: open of %s failed: %s\n"),
2908 filename, strerror(errno));
/* Read one header block and parse it to learn the continuation file. */
2911 if ((buflen = fullread(fd, buffer, SIZEOF(buffer))) > 0) {;
2912 parse_file_header(buffer, &file, (size_t)buflen);
2915 filename = file.cont_filename;
/* Emit one entry for every holding disk that holds part of the dump. */
2918 for(j = 0, i=0, ha = holdalloc; ha != NULL; ha = ha->next, j++ ) {
2919 if(used[j] != (off_t)0) {
2920 result[i] = alloc(SIZEOF(assignedhd_t));
2921 result[i]->disk = ha;
2922 result[i]->reserved = used[j];
2923 result[i]->used = used[j];
/* Every entry gets its own copy of the first file's name. */
2924 result[i]->destname = stralloc(destname);
/* Print the holding-disk state: a "driver: hdisk-state" prefix with
 * time_str, followed for each holdalloc entry by its index, its free space
 * (disksize - allocated_space) and its allocated_dumpers count. */
2942 g_printf(_("driver: hdisk-state time %s"), time_str);
2944 for(ha = holdalloc, dsk = 0; ha != NULL; ha = ha->next, dsk++) {
2945 diff = ha->disksize - ha->allocated_space;
2946 g_printf(_(" hdisk %d: free %lld dumpers %d"), dsk,
2947 (long long)diff, ha->allocated_dumpers);
/* Record a failed direct-to-tape dump of dp: update_info_dumper() is called
 * with -1 for its size and time arguments while sched(dp)->timestamp is
 * temporarily forced to 0; the saved timestamp is restored afterwards. */
2953 update_failed_dump_to_tape(
2957 * should simply set no_bump
2960 time_t save_timestamp = sched(dp)->timestamp;
2961 /* setting timestamp to 0 removes the current level from the
2962 * database, so that we ensure that it will not be bumped to the
2963 * next level on the next run. If we didn't do this, dumpdates or
2964 * gnutar-lists might have been updated already, and a bumped
2965 * incremental might be created. */
2966 sched(dp)->timestamp = 0;
2967 update_info_dumper(dp, (off_t)-1, (off_t)-1, (time_t)-1);
/* Put the original timestamp back so later code sees it unchanged. */
2968 sched(dp)->timestamp = save_timestamp;
2971 /* ------------------- */
/* Start dumping dp straight to tape: pick an idle dumper, ask the taper for
 * a port (PORT_WRITE), tell the dumper to dump to that port (PORT_DUMP),
 * update the taper/dumper/host bookkeeping, and register read-event handlers
 * for both the dumper and the taper. */
2978 int result_argc, rc;
2979 char *result_argv[MAX_ARGS+1];
2982 qname = quote_string(dp->name);
2983 g_printf(_("driver: dumping %s:%s directly to tape\n"),
2984 dp->host->hostname, qname);
2987 /* pick a dumper and fail if there are no idle dumpers */
2989 dumper = idle_dumper();
2991 g_printf(_("driver: no idle dumpers for %s:%s.\n"),
2992 dp->host->hostname, qname);
2994 log_add(L_WARNING, _("no idle dumpers for %s:%s.\n"),
2995 dp->host->hostname, qname);
2997 return; /* fatal problem */
3000 /* tell the taper to read from a port number of its choice */
3002 taper_cmd(PORT_WRITE, dp, NULL, sched(dp)->level, sched(dp)->datestamp);
/* Wait for the taper's reply; its tokens land in result_argv. */
3003 cmd = getresult(taper, 1, &result_argc, result_argv, MAX_ARGS+1);
3005 g_printf(_("driver: did not get PORT from taper for %s:%s\n"),
3006 dp->host->hostname, qname);
3008 log_add(L_WARNING, _("driver: did not get PORT from taper for %s:%s.\n"),
3009 dp->host->hostname, qname);
3011 return; /* fatal problem */
3015 /* copy port number */
/* The port is taken from the third token of the taper's reply. */
3016 dumper->output_port = atoi(result_argv[2]);
/* No chunker is attached to this dumper; both result
 * slots are reset to LAST_TOK before the dump starts. */
3019 dumper->chunker = NULL;
3020 dumper->result = LAST_TOK;
3021 taper_result = LAST_TOK;
3022 sched(dp)->dumper = dumper;
3024 /* tell the dumper to dump to a port */
3025 dumper_cmd(dumper, PORT_DUMP, dp);
/* start_t is set 15 seconds ahead -- presumably delays the next dump start
 * on this host; confirm against the code that reads start_t. */
3026 dp->host->start_t = time(NULL) + 15;
3028 /* update statistics & print state */
3030 taper_busy = dumper->busy = 1;
3031 taper_input_error = NULL;
3032 taper_tape_error = NULL;
3033 taper_dumper = dumper;
/* The two error pointers were already cleared just above; these repeated
 * assignments have no additional effect. */
3035 taper_input_error = NULL;
3036 taper_tape_error = NULL;
3037 taper_first_label = NULL;
3039 taper_state |= TAPER_STATE_DUMP_TO_TAPE;
/* The actual size is seeded with the estimate. */
3040 sched(dp)->act_size = sched(dp)->est_size;
3041 dp->host->inprogress += 1;
3043 sched(dp)->timestamp = time((time_t *)0);
3044 allocate_bandwidth(dp->host->netif, sched(dp)->est_kps);
3045 idle_reason = NOT_IDLE;
/* Results from either side are delivered through the event loop. */
3049 dumper->ev_read = event_register(dumper->fd, EV_READFD,
3050 handle_dumper_result, dumper);
3051 taper_ev_read = event_register(taper, EV_READFD,
3052 handle_taper_result, NULL);
/* Count the entries of q by walking from q.head along the next pointers;
 * all the work happens in the loop header, the body is a no-op. */
3062 for(len = 0, p = q.head; p != NULL; len++, p = p->next)
3063 (void)len; /* Quiet lint */
/* Print a one-line summary of the driver state to stdout: wall-clock time,
 * free kps and space, taper status, number of idle dumpers, lengths of the
 * tape/run/room queues, the wakeup value and the current idle reason.  The
 * interface and holding-disk state printers are then called with the same
 * time string. */
3068 short_dump_state(void)
3073 wall_time = walltime_str(curclock());
3075 g_printf(_("driver: state time %s "), wall_time);
3076 g_printf(_("free kps: %lu space: %lld taper: "),
3078 (long long)free_space());
/* Taper status: DOWN in degraded mode, otherwise idle or writing. */
3079 if(degraded_mode) g_printf(_("DOWN"));
3080 else if(!taper_busy) g_printf(_("idle"));
3081 else g_printf(_("writing"));
/* Count the dumpers among dmptable[0..inparallel) that are not busy. */
3083 for(i = 0; i < inparallel; i++) if(!dmptable[i].busy) nidle++;
3084 g_printf(_(" idle-dumpers: %d"), nidle);
3085 g_printf(_(" qlen tapeq: %d"), queue_length(tapeq));
3086 g_printf(_(" runq: %d"), queue_length(runq));
3087 g_printf(_(" roomq: %d"), queue_length(roomq));
3088 g_printf(_(" wakeup: %d"), (int)sleep_time);
/* The idle reason is looked up in idle_strings[] and passed through _(). */
3089 g_printf(_(" driver-idle: %s\n"), _(idle_strings[idle_reason]));
3090 interface_state(wall_time);
3091 holdingdisk_state(wall_time);
/* Decide what the taper should do next.  The sizes of the work held by the
 * dumpers and by the run, direct and tape queues are summed up, and from
 * those totals, the taper state bits and the flush settings a bitmask of
 * TAPE_ACTION_* flags is assembled in result (NEW_TAPE, NO_NEW_TAPE and/or
 * START_A_FLUSH; NO_ACTION when nothing applies). */
3095 static TapeAction tape_action(void)
3097 TapeAction result = TAPE_ACTION_NO_ACTION;
3105 off_t dump_to_disk_size;
3106 int dump_to_disk_terminated;
/* Estimated size accumulated over the dumper table. */
3109 for(dumper = dmptable; dumper < (dmptable+inparallel); dumper++) {
3111 dumpers_size += sched(dumper->dp)->est_size;
3113 driver_debug(1, _("dumpers_size: %lld\n"), (long long)dumpers_size);
/* Estimated size of everything still waiting in runq. */
3116 for(dp = runq.head; dp != NULL; dp = dp->next) {
3117 runq_size += sched(dp)->est_size;
3119 driver_debug(1, _("runq_size: %lld\n"), (long long)runq_size);
/* Estimated size of everything waiting in directq (only logged here; it is
 * not part of sched_size or dump_to_disk_size below). */
3122 for(dp = directq.head; dp != NULL; dp = dp->next) {
3123 directq_size += sched(dp)->est_size;
3125 driver_debug(1, _("directq_size: %lld\n"), (long long)directq_size);
/* Actual size of everything waiting in tapeq ... */
3128 for(dp = tapeq.head; dp != NULL; dp = dp->next) {
3129 tapeq_size += sched(dp)->act_size;
/* ... plus the current taper disk's act_size minus taper_written
 * (presumably the part of that dump not yet on tape -- confirm). */
3132 tapeq_size += sched(taper_disk)->act_size - taper_written;
3134 driver_debug(1, _("tapeq_size: %lld\n"), (long long)tapeq_size);
3136 sched_size = runq_size + tapeq_size + dumpers_size;
3137 driver_debug(1, _("sched_size: %lld\n"), (long long)sched_size);
3139 dump_to_disk_size = dumpers_size + runq_size;
3140 driver_debug(1, _("dump_to_disk_size: %lld\n"), (long long)dump_to_disk_size);
/* True once the schedule is complete and no dump-to-disk work remains. */
3142 dump_to_disk_terminated = schedule_done && dump_to_disk_size == 0;
3144 // Changing conditionals can produce a driver hang, take care.
3146 // when to start writing to a new tape
3147 if ((taper_state & TAPER_STATE_WAIT_FOR_TAPE) &&
3148 ((taper_state & TAPER_STATE_DUMP_TO_TAPE) || // for dump to tape
3149 !empty(directq) || // if a dle is waiting for a dump to tape
3150 !empty(roomq) || // holding disk constraint
3151 idle_reason == IDLE_NO_DISKSPACE || // holding disk constraint
3152 (flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
3153 flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
3154 (taperflush < tapeq_size && // taperflush
3155 (force_flush == 1 || // if force_flush
3156 dump_to_disk_terminated)) // or all dump to disk terminated
3158 result |= TAPE_ACTION_NEW_TAPE;
3159 // when to stop using new tape
3160 } else if ((taper_state & TAPER_STATE_WAIT_FOR_TAPE) &&
3161 (taperflush >= tapeq_size && // taperflush criteria not met
3162 (force_flush == 1 || // if force_flush
3163 dump_to_disk_terminated)) // or all dump to disk terminated
3165 result |= TAPE_ACTION_NO_NEW_TAPE;
3168 // when to start a flush
3169 // We don't start a flush when the tape is already started (TAPER_STATE_TAPE_STARTED),
3170 // dump_to_disk_terminated is set and force_flush == 0: that state is needed to exit the
3171 // first event_loop without flushing everything to tape; the rest is flushed in another event_loop.
3172 if (!degraded_mode && !taper_busy && !empty(tapeq) &&
3173 (!((taper_state & TAPER_STATE_TAPE_STARTED) &&
3174 dump_to_disk_terminated && force_flush == 0) || // if tape already started and dump to disk not terminated
3175 ((taper_state & TAPER_STATE_TAPE_STARTED) &&
3176 force_flush == 1) || // if tape already started and force_flush
3177 !empty(roomq) || // holding disk constraint
3178 idle_reason == IDLE_NO_DISKSPACE || // holding disk constraint
3179 (flush_threshold_dumped < tapeq_size && // flush-threshold-dumped &&
3180 flush_threshold_scheduled < sched_size) || // flush-threshold-scheduled
3181 (force_flush == 1 && taperflush < tapeq_size))) { // taperflush if force_flush
3182 result |= TAPE_ACTION_START_A_FLUSH;
/* Print a multi-line snapshot of the driver state to stdout, framed by two
 * "================" rulers: the time and the caller's label str, free kps
 * and space, what the taper is doing, one line per dumper slot, and the
 * TAPE, ROOM and RUN queues. */
3196 g_printf("================\n");
3197 g_printf(_("driver state at time %s: %s\n"), walltime_str(curclock()), str);
3198 g_printf(_("free kps: %lu, space: %lld\n"),
3200 (long long)free_space());
/* Taper: DOWN in degraded mode, idle when not busy, otherwise the disk,
 * level and estimated size it is writing. */
3201 if(degraded_mode) g_printf(_("taper: DOWN\n"));
3202 else if(!taper_busy) g_printf(_("taper: idle\n"));
3203 else g_printf(_("taper: writing %s:%s.%d est size %lld\n"),
3204 taper_disk->host->hostname, taper_disk->name,
3205 sched(taper_disk)->level,
3206 (long long)sched(taper_disk)->est_size);
/* One line per dumper slot: idle, or the dump it is working on. */
3207 for(i = 0; i < inparallel; i++) {
3208 dp = dmptable[i].dp;
3209 if(!dmptable[i].busy)
3210 g_printf(_("%s: idle\n"), dmptable[i].name);
3212 qname = quote_string(dp->name);
3213 g_printf(_("%s: dumping %s:%s.%d est kps %d size %lld time %lu\n"),
3214 dmptable[i].name, dp->host->hostname, qname, sched(dp)->level,
3215 sched(dp)->est_kps, (long long)sched(dp)->est_size, sched(dp)->est_time);
/* Queue dumps go to stdout; the 5 presumably limits how many entries
 * dump_queue() prints -- confirm against its definition. */
3218 dump_queue("TAPE", tapeq, 5, stdout);
3219 dump_queue("ROOM", roomq, 5, stdout);
3220 dump_queue("RUN ", runq, 5, stdout);
3221 g_printf("================\n");