2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
26 /* $Id: chunker.c,v 1.25.2.1 2006/04/23 18:52:04 martinea Exp $
28 * receives a dump stream on a network port and writes it to holding disk files in chunks
42 #include "fileheader.h"
43 #include "amfeatures.h"
44 #include "server_util.h"
/*
 * Timeouts (presumably in seconds -- TODO confirm against stream_accept()).
 * CONNECT_TIMEOUT is what startup_chunker() passes to stream_accept().
 * The replacement list is parenthesized so the macro expands to a single
 * operand: without the parentheses "x / CONNECT_TIMEOUT" would parse as
 * "(x / 5) * 60".
 */
#define CONNECT_TIMEOUT (5*60)
#define STARTUP_TIMEOUT 60
/*
 * Members of the output buffer state that the functions below reach through
 * "db->" (struct databuf).  The struct header and the dataout/datalimit
 * members that the code also uses are not visible in this listing.
 * split_size, chunk_size and use are compared against dumpsize, which is
 * advanced in DISK_BLOCK_KB steps -- presumably all in KBytes; TODO confirm.
 */
61 int fd; /* file to flush to */
62 char *filename; /* name of what fd points to */
63 int filename_seq; /* for chunking */
64 long split_size; /* when to chunk */
65 long chunk_size; /* size of each chunk */
66 long use; /* size to use on this disk */
67 char buf[DISK_BLOCK_BYTES]; /* staging buffer, DISK_BLOCK_BYTES long */
68 char *datain; /* data buffer markers */
/* File-scope state shared by main() and the chunking helpers. */
73 static char *handle = NULL; /* identifier echoed as first field of most putresult() replies */
75 static char *errstr = NULL; /* latest status or error text, reported via putresult() and log_add() */
76 static int abort_pending;
77 static long dumpsize, headersize; /* running totals; both grow by DISK_BLOCK_KB per header written */
78 static long dumpbytes; /* bytes written by fullwrite(); folded into dumpsize in units of 1024 */
81 static char *hostname = NULL;
82 static char *diskname = NULL;
83 static char *options = NULL;
84 static char *progname = NULL;
86 static char *dumpdate = NULL;
87 static char *datestamp = NULL; /* taken from argv[2] of the START command */
88 static int command_in_transit; /* -1 when empty; else a DONE/TRYAGAIN/FAILED read early while awaiting CONTINUE */
90 static dumpfile_t file; /* header parsed from the incoming stream, rewritten for every chunk */
/*
 * Forward declarations.  P(()) wraps each parameter list; the macro is
 * defined outside this file -- presumably so the prototypes can be dropped
 * for pre-ANSI compilers (TODO confirm).
 */
93 int main P((int, char **));
94 static int write_tapeheader P((int, dumpfile_t *));
95 static void databuf_init P((struct databuf *, int, char *, long, long));
96 static int databuf_flush P((struct databuf *));
98 static int startup_chunker P((char *, long, long, struct databuf *));
99 static int do_chunk P((int, struct databuf *));
/*
 * Program entry point, setup phase.  Visible steps: set the program name,
 * ignore SIGPIPE, snapshot malloc usage, route errors to the Amanda log,
 * work out config_name/config_dir (one variant uses main_argv[1], the other
 * the current working directory), read the config file, check that the
 * current uid has a passwd entry, ignore SIGPIPE and SIGCHLD, then read the
 * first command with getcmd().
 * NOTE(review): parameter declarations, braces and the condition selecting
 * between the two config variants are missing from this listing.
 */
103 main(main_argc, main_argv)
107 static struct databuf db;
108 struct cmdargs cmdargs;
111 unsigned long malloc_hist_1, malloc_size_1;
112 unsigned long malloc_hist_2, malloc_size_2;
118 am_feature_t *their_features = NULL;
123 set_pname("chunker");
125 /* Don't die when child closes pipe */
126 signal(SIGPIPE, SIG_IGN);
/* baseline for the leak report made at the end of main() */
128 malloc_size_1 = malloc_inuse(&malloc_hist_1);
130 erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
131 set_logerror(logerror);
/* variant 1: configuration named on the command line */
134 config_name = stralloc(main_argv[1]);
135 config_dir = vstralloc(CONFIG_DIR, "/", config_name, "/", NULL);
/* variant 2: cwd is the config directory, its last path component the config name */
137 char my_cwd[STR_SIZE];
139 if (getcwd(my_cwd, sizeof(my_cwd)) == NULL) {
140 error("cannot determine current working directory");
142 config_dir = stralloc2(my_cwd, "/");
143 if ((config_name = strrchr(my_cwd, '/')) != NULL) {
144 config_name = stralloc(config_name + 1);
150 conffile = stralloc2(config_dir, CONFFILE_NAME);
151 if(read_conffile(conffile)) {
152 error("errors processing config file \"%s\"", conffile);
157 "%s: pid %ld executable %s version %s\n",
158 get_pname(), (long) getpid(),
159 main_argv[0], version());
162 /* now, make sure we are a valid user */
164 if (getpwuid(getuid()) == NULL)
165 error("can't get login name for my uid %ld", (long)getuid());
167 signal(SIGPIPE, SIG_IGN);
168 signal(SIGCHLD, SIG_IGN);
/* first command; the code that follows requires it to be START */
170 cmd = getcmd(&cmdargs);
172 if(cmdargs.argc <= 1)
173 error("error [dumper START: not enough args: datestamp]");
174 datestamp = newstralloc(datestamp, cmdargs.argv[2]);
/*
 * After START, read the next command and parse the PORT-WRITE arguments.
 * Order consumed through the running index "a": handle, filename, hostname,
 * features, diskname, level, dumpdate, chunksize, progname, use, options.
 * Each step first checks that an argument is left and calls error() if not;
 * any argument left over at the end is also an error.
 * chunksize and use are rounded down to a multiple of DISK_BLOCK_KB.
 * NOTE(review): chunksize and use are handed to startup_chunker(), whose
 * prototype takes long, yet they are parsed with atoi() -- values beyond the
 * int range would be mangled; consider atol()/strtol().
 * NOTE(review): the dispatch on cmd and the initial value of "a" are not
 * visible in this listing.
 */
177 error("Didn't get START command");
181 cmd = getcmd(&cmdargs);
202 cmdargs.argc++; /* true count of args */
205 if(a >= cmdargs.argc) {
206 error("error [chunker PORT-WRITE: not enough args: handle]");
208 handle = newstralloc(handle, cmdargs.argv[a++]);
210 if(a >= cmdargs.argc) {
211 error("error [chunker PORT-WRITE: not enough args: filename]");
/* filename points into cmdargs storage; it is not copied here */
213 filename = cmdargs.argv[a++];
215 if(a >= cmdargs.argc) {
216 error("error [chunker PORT-WRITE: not enough args: hostname]");
218 hostname = newstralloc(hostname, cmdargs.argv[a++]);
220 if(a >= cmdargs.argc) {
221 error("error [chunker PORT-WRITE: not enough args: features]");
/* drop any earlier feature set before parsing the new one */
223 am_release_feature_set(their_features);
224 their_features = am_string_to_feature(cmdargs.argv[a++]);
226 if(a >= cmdargs.argc) {
227 error("error [chunker PORT-WRITE: not enough args: diskname]");
229 diskname = newstralloc(diskname, cmdargs.argv[a++]);
231 if(a >= cmdargs.argc) {
232 error("error [chunker PORT-WRITE: not enough args: level]");
234 level = atoi(cmdargs.argv[a++]);
236 if(a >= cmdargs.argc) {
237 error("error [chunker PORT-WRITE: not enough args: dumpdate]");
239 dumpdate = newstralloc(dumpdate, cmdargs.argv[a++]);
241 if(a >= cmdargs.argc) {
242 error("error [chunker PORT-WRITE: not enough args: chunksize]");
244 chunksize = atoi(cmdargs.argv[a++]);
245 chunksize = am_floor(chunksize, DISK_BLOCK_KB);
247 if(a >= cmdargs.argc) {
248 error("error [chunker PORT-WRITE: not enough args: progname]");
250 progname = newstralloc(progname, cmdargs.argv[a++]);
252 if(a >= cmdargs.argc) {
253 error("error [chunker PORT-WRITE: not enough args: use]");
255 use = am_floor(atoi(cmdargs.argv[a++]), DISK_BLOCK_KB);
257 if(a >= cmdargs.argc) {
258 error("error [chunker PORT-WRITE: not enough args: options]");
260 options = newstralloc(options, cmdargs.argv[a++]);
262 if(a != cmdargs.argc) {
263 error("error [chunker PORT-WRITE: too many args: %d != %d]",
/*
 * Run the transfer and report the outcome.
 * - startup_chunker() failing (negative return) sends TRYAGAIN and calls error().
 * - When do_chunk() succeeds, a summary string is built from the wall time,
 *   KBytes (dumpsize minus headersize) and rate, and the peer command is
 *   taken from command_in_transit if one was stashed, else read with getcmd().
 * - The replies visible below are DONE, PARTIAL (only when dumpsize exceeds
 *   DISK_BLOCK_KB) and FAILED, each mirrored with log_add().
 * - An unexpected command is quoted back in a BAD_COMMAND reply.
 * Teardown releases the feature set and lists malloc blocks if usage changed
 * since the start of main().
 * NOTE(review): the conditions choosing between DONE, PARTIAL and FAILED are
 * on lines missing from this listing.
 */
267 if((infd = startup_chunker(filename, use, chunksize, &db)) < 0) {
268 q = squotef("[chunker startup failed: %s]", errstr);
269 putresult(TRYAGAIN, "%s %s\n", handle, q);
270 error("startup_chunker failed");
/* nothing stashed yet; databuf_flush() may fill this while waiting for CONTINUE */
272 command_in_transit = -1;
273 if(infd >= 0 && do_chunk(infd, &db)) {
274 char kb_str[NUM_STR_SIZE];
275 char kps_str[NUM_STR_SIZE];
278 runtime = stopclock();
/* elapsed time as a floating point number of seconds */
279 rt = runtime.r.tv_sec+runtime.r.tv_usec/1000000.0;
280 snprintf(kb_str, sizeof(kb_str), "%ld", dumpsize - headersize);
/* guard the division when no time elapsed */
281 snprintf(kps_str, sizeof(kps_str), "%3.1f",
282 rt ? dumpsize / rt : 0.0);
283 errstr = newvstralloc(errstr,
284 "sec ", walltime_str(runtime),
288 q = squotef("[%s]", errstr);
289 if(command_in_transit != -1)
290 cmd = command_in_transit;
292 cmd = getcmd(&cmdargs);
295 putresult(DONE, "%s %ld %s\n",
296 handle, dumpsize - headersize, q);
297 log_add(L_SUCCESS, "%s %s %s %d [%s]",
298 hostname, diskname, datestamp, level, errstr);
/* more than one block on disk means some dump data was written */
304 if(dumpsize > DISK_BLOCK_KB) {
305 putresult(PARTIAL, "%s %ld %s\n",
306 handle, dumpsize - headersize, q);
307 log_add(L_PARTIAL, "%s %s %s %d [%s]",
308 hostname, diskname, datestamp, level, errstr);
311 errstr = newvstralloc(errstr,
316 q = squotef("[%s]",errstr);
317 putresult(FAILED, "%s %s\n", handle, q);
318 log_add(L_FAIL, "%s %s %s %d [%s]",
319 hostname, diskname, datestamp, level, errstr);
/* NOTE(review): -2 is treated as already reported; where it is returned is not visible here */
324 } else if(infd != -2) {
327 q = squotef("[%s]", errstr);
329 putresult(FAILED, "%s %s\n", handle, q);
330 log_add(L_FAIL, "%s %s %s %d [%s]",
331 hostname, diskname, datestamp, level, errstr);
/* quote back whatever part of the unexpected command is available */
338 if(cmdargs.argc >= 1) {
339 q = squote(cmdargs.argv[1]);
340 } else if(cmdargs.argc >= 0) {
341 q = squote(cmdargs.argv[0]);
343 q = stralloc("(no input?)");
345 putresult(BAD_COMMAND, "%s\n", q);
350 /* } while(cmd != QUIT); */
362 am_release_feature_set(their_features);
363 their_features = NULL;
365 malloc_size_2 = malloc_inuse(&malloc_hist_2);
367 if (malloc_size_1 != malloc_size_2)
368 malloc_list(fileno(stderr), malloc_hist_1, malloc_hist_2);
/*
 * startup_chunker: open a stream server, announce its port with a PORT
 * reply, wait up to CONNECT_TIMEOUT for the connection, then create
 * "<filename>.tmp" (mode 0600, truncated) as the first holding file.
 * On failure errstr is set to a description; ENOSPC from open() is handled
 * separately further down.
 * NOTE(review): parameter declarations, the return statements and the
 * failure test after stream_accept() are missing from this listing.
 */
374 * Returns a file descriptor to the incoming port
375 * on success, or -1 on error.
378 startup_chunker(filename, use, chunksize, db)
385 char *tmp_filename, *pc;
386 int data_port, data_socket;
/* stream_server() fills in data_port with the port it chose */
389 data_socket = stream_server(&data_port, -1, STREAM_BUFSIZE);
391 if(data_socket < 0) {
392 errstr = stralloc2("error creating stream server: ", strerror(errno));
/* tell the peer which port to connect to */
396 putresult(PORT, "%d\n", data_port);
398 infd = stream_accept(data_socket, CONNECT_TIMEOUT, -1, NETWORK_BLOCK_BYTES);
400 errstr = stralloc2("error accepting stream: ", strerror(errno));
404 tmp_filename = vstralloc(filename, ".tmp", NULL);
/* locate the last path separator; how pc is used sits on lines not shown */
405 pc = strrchr(tmp_filename, '/');
407 mkholdingdir(tmp_filename);
409 if ((outfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600)) < 0) {
/* keep errno before later calls can overwrite it */
410 int save_errno = errno;
412 errstr = squotef("holding file \"%s\": %s",
415 amfree(tmp_filename);
417 if(save_errno == ENOSPC) {
418 putresult(NO_ROOM, "%s %lu", handle, use);
/* the temporary name is no longer needed; databuf_init() makes its own copy of filename */
424 amfree(tmp_filename);
/* hand the open holding file and the size limits to the buffer state */
425 databuf_init(db, outfd, filename, use, chunksize);
/*
 * This appears to be the body of do_chunk(infd, db); its header line is
 * missing from this listing.
 * Flow: reset the size counters, read exactly one DISK_BLOCK_BYTES header
 * from infd, parse it into the file-scope "file", write it to db->fd, and
 * count one DISK_BLOCK_KB for it.  Then read data into db->buf and call
 * databuf_flush() until dataout catches up with datain, for every read and
 * once more after the last one.  A trailing partial KByte is counted by the
 * final dumpsize++.
 * Errors set errstr; ENOSPC while writing the header also sends NO_ROOM.
 * NOTE(review): the read loop passes db->buf (not db->datain) to fullread();
 * this relies on databuf_flush() resetting datain to buf after a complete
 * flush -- confirm.  The statement advancing datain is not visible here.
 */
436 char header_buf[DISK_BLOCK_BYTES];
440 dumpsize = headersize = dumpbytes = filesize = 0;
443 * The first thing we should receive is the file header, which we
444 * need to save into "file", as well as write out. Later, the
445 * chunk code will rewrite it.
447 nread = fullread(infd, header_buf, sizeof(header_buf));
448 if (nread != DISK_BLOCK_BYTES) {
449 char number1[NUM_STR_SIZE];
450 char number2[NUM_STR_SIZE];
/* two messages: one built from errno, one reporting a short read with both counts */
453 errstr = stralloc2("cannot read header: ", strerror(errno));
455 snprintf(number1, sizeof(number1), "%d", nread);
456 snprintf(number2, sizeof(number2), "%d", DISK_BLOCK_BYTES);
457 errstr = vstralloc("cannot read header: got ",
465 parse_file_header(header_buf, &file, nread);
466 if(write_tapeheader(db->fd, &file)) {
467 int save_errno = errno;
469 errstr = squotef("write_tapeheader file \"%s\": %s",
470 db->filename, strerror(errno));
471 if(save_errno == ENOSPC) {
472 putresult(NO_ROOM, "%s %lu\n", handle,
473 db->use+db->split_size-dumpsize);
/* account for the header block just written */
477 dumpsize += DISK_BLOCK_KB;
478 filesize = DISK_BLOCK_KB;
479 headersize += DISK_BLOCK_KB;
482 * We've written the file header. Now, just write data until the
485 while ((nread = fullread(infd, db->buf, db->datalimit - db->datain)) > 0) {
487 while(db->dataout < db->datain) {
488 if(!databuf_flush(db)) {
493 while(db->dataout < db->datain) {
494 if(!databuf_flush(db)) {
499 dumpsize++; /* count partial final KByte */
/*
 * NOTE(review): the parameter declarations and at least one body line
 * (presumably the one storing fd) are missing from this listing.
 */
506 * Initialize a databuf. Takes a writeable file descriptor.
509 databuf_init(db, fd, filename, use, chunk_size)
/* db owns a private copy of the name */
517 db->filename = stralloc(filename);
518 db->filename_seq = 0;
519 db->chunk_size = chunk_size;
/* first split point: the smaller of chunk_size and use */
520 db->split_size = (db->chunk_size > use) ? use : db->chunk_size;
/* whatever part of use lies beyond that first split, never negative */
521 db->use = (use>db->split_size) ? use - db->split_size : 0;
/* empty buffer: both markers at the start, limit one past the end of buf */
522 db->datain = db->dataout = db->buf;
523 db->datalimit = db->buf + sizeof(db->buf);
/*
 * This appears to be the start of databuf_flush(db); its header line is
 * missing from this listing.  The three filename pointers start as NULL and
 * are released with amfree() at the end of the function.  The first check
 * skips all work when dataout has already reached datain.
 */
528 * Write out the buffer to the backing file
534 struct cmdargs cmdargs;
538 char *arg_filename = NULL;
539 char *new_filename = NULL;
540 char *tmp_filename = NULL;
541 char sequence[NUM_STR_SIZE];
543 filetype_t save_type;
549 * If there's no data, do nothing.
551 if (db->dataout >= db->datain) {
/*
 * Split check inside databuf_flush(): while a split point is set and
 * dumpsize has reached it, either ask for more disk (RQ_MORE_DISK, answered
 * by CONTINUE or ABORT) or move on to a new chunk file.
 * NOTE(review): the condition that selects the RQ_MORE_DISK path, and the
 * initial value of "a", are on lines missing from this listing.
 * NOTE(review): chunk_size and use are long members but are parsed with
 * atoi() -- consider atol()/strtol().
 */
556 * See if we need to split this file.
558 while (db->split_size > 0 && dumpsize >= db->split_size) {
561 * Probably no more space on this disk. Request some more.
565 putresult(RQ_MORE_DISK, "%s\n", handle);
566 cmd = getcmd(&cmdargs);
/* a DONE/TRYAGAIN/FAILED arriving first is stashed for main(), then the real answer is read */
567 if(command_in_transit == -1 &&
568 (cmd == DONE || cmd == TRYAGAIN || cmd == FAILED)) {
569 command_in_transit = cmd;
570 cmd = getcmd(&cmdargs);
/* CONTINUE carries, in order: filename, chunksize, use */
572 if(cmd == CONTINUE) {
580 cmdargs.argc++; /* true count of args */
583 if(a >= cmdargs.argc) {
584 error("error [chunker CONTINUE: not enough args: filename]");
586 arg_filename = newstralloc(arg_filename, cmdargs.argv[a++]);
588 if(a >= cmdargs.argc) {
589 error("error [chunker CONTINUE: not enough args: chunksize]");
591 db->chunk_size = atoi(cmdargs.argv[a++]);
/* round down to a multiple of DISK_BLOCK_KB */
592 db->chunk_size = am_floor(db->chunk_size, DISK_BLOCK_KB);
594 if(a >= cmdargs.argc) {
595 error("error [chunker CONTINUE: not enough args: use]");
597 db->use = atoi(cmdargs.argv[a++]);
599 if(a != cmdargs.argc) {
600 error("error [chunker CONTINUE: too many args: %d != %d]",
/* same name as the current holding file means the same disk was granted again */
604 if(strcmp(db->filename, arg_filename) == 0) {
606 * Same disk, so use what room is left up to the
607 * next chunk boundary or the amount we were given,
610 left_in_chunk = db->chunk_size - filesize;
/* extend the split point by the smaller of the grant and the room left in this chunk */
611 if(left_in_chunk > db->use) {
612 db->split_size += db->use;
615 db->split_size += left_in_chunk;
616 db->use -= left_in_chunk;
618 if(left_in_chunk > 0) {
620 * We still have space in this chunk.
626 * Different disk, so use new file.
628 db->filename = newstralloc(db->filename, arg_filename);
630 } else if(cmd == ABORT) {
632 errstr = newstralloc(errstr, "ERROR");
633 putresult(ABORT_FINISHED, "%s\n", handle);
/* anything else is fatal; quote back what was received */
637 if(cmdargs.argc >= 1) {
638 q = squote(cmdargs.argv[1]);
639 } else if(cmdargs.argc >= 0) {
640 q = squote(cmdargs.argv[0]);
642 q = stralloc("(no input?)");
644 error("error [bad command after RQ-MORE-DISK: \"%s\"]", q);
/*
 * Chunk rollover inside databuf_flush(), first half: build the next chunk
 * name from db->filename_seq, create its temporary file (mode 0600,
 * truncated) and write a continuation header (F_CONT_DUMPFILE, empty
 * cont_filename) into it.  ENOSPC from open() or from the header write sends
 * NO_ROOM and sets use = 0 and split_size = dumpsize so that the enclosing
 * loop requests more disk; other errors set errstr.
 * NOTE(review): the arguments of the two newvstralloc() calls, the test on
 * newfd and the error-path exits are on lines missing from this listing.
 */
649 * Time to use another file.
653 * First, open the new chunk file, and give it a new header
654 * that has no cont_filename pointer.
656 snprintf(sequence, sizeof(sequence), "%d", db->filename_seq);
657 new_filename = newvstralloc(new_filename,
662 tmp_filename = newvstralloc(tmp_filename,
666 pc = strrchr(tmp_filename, '/');
668 mkholdingdir(tmp_filename);
670 newfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
/* keep errno before later calls can overwrite it */
672 int save_errno = errno;
674 if(save_errno == ENOSPC) {
675 putresult(NO_ROOM, "%s %lu\n",
677 db->use+db->split_size-dumpsize);
678 db->use = 0; /* force RQ_MORE_DISK */
679 db->split_size = dumpsize;
682 errstr = squotef("creating chunk holding file \"%s\": %s",
/* remember the real header type; it is put back before the current file is relinked */
689 save_type = file.type;
690 file.type = F_CONT_DUMPFILE;
691 file.cont_filename[0] = '\0';
692 if(write_tapeheader(newfd, &file)) {
693 int save_errno = errno;
696 if(save_errno == ENOSPC) {
697 putresult(NO_ROOM, "%s %lu\n",
699 db->use+db->split_size-dumpsize);
700 db->use = 0; /* force RQ_MORE DISK */
701 db->split_size = dumpsize;
704 errstr = squotef("write_tapeheader file \"%s\": %s",
712 * Now, update the header of the current file to point
713 * to the next chunk, and then close it.
/* rewind so the header at the start of the current file can be overwritten */
715 if (lseek(db->fd, (off_t)0, SEEK_SET) < 0) {
716 errstr = squotef("lseek holding file \"%s\": %s",
724 file.type = save_type;
725 strncpy(file.cont_filename, new_filename, sizeof(file.cont_filename));
726 file.cont_filename[sizeof(file.cont_filename)] = '\0';
/*
 * Remainder of databuf_flush(): rewrite the current file header (now naming
 * the next chunk), switch to the new file, pick the next split point, then
 * write the pending bytes.
 * Next split point: with at most one block of "use" left, add two blocks;
 * otherwise add the smaller of chunk_size and use (chunk_size is then
 * subtracted from use).  The new chunk header is counted as DISK_BLOCK_KB
 * in dumpsize, filesize and headersize.
 * Data write: dataout and dumpbytes advance by what fullwrite() returned,
 * and whole KBytes of dumpbytes are added to dumpsize and filesize.  A write
 * error other than ENOSPC sets errstr; ENOSPC sends NO_ROOM and forces
 * RQ_MORE_DISK through use = 0 and split_size = dumpsize.
 * NOTE(review): the descriptor switch, the test on "written", the reduction
 * of dumpbytes after the KByte accounting and the return statements are on
 * lines missing from this listing.
 */
727 if(write_tapeheader(db->fd, &file)) {
728 errstr = squotef("write_tapeheader file \"%s\": %s",
732 unlink(tmp_filename);
/* later chunks are all continuation files */
736 file.type = F_CONT_DUMPFILE;
739 * Now shift the file descriptor.
746 * Update when we need to chunk again
748 if(db->use <= DISK_BLOCK_KB) {
750 * Cheat and use one more block than allowed so we can make
753 db->split_size += 2 * DISK_BLOCK_KB;
755 } else if(db->chunk_size > db->use) {
756 db->split_size += db->use;
759 db->split_size += db->chunk_size;
760 db->use -= db->chunk_size;
764 amfree(tmp_filename);
765 amfree(new_filename);
766 dumpsize += DISK_BLOCK_KB;
767 filesize = DISK_BLOCK_KB;
768 headersize += DISK_BLOCK_KB;
773 * Write out the buffer
775 written = fullwrite(db->fd, db->dataout, db->datain - db->dataout);
777 db->dataout += written;
778 dumpbytes += written;
780 dumpsize += (dumpbytes / 1024);
781 filesize += (dumpbytes / 1024);
784 if (errno != ENOSPC) {
785 errstr = squotef("data write: %s", strerror(errno));
791 * NO-ROOM is informational only. Later, RQ_MORE_DISK will be
792 * issued to use another holding disk.
794 putresult(NO_ROOM, "%s %lu\n", handle, db->use+db->split_size-dumpsize);
795 db->use = 0; /* force RQ_MORE_DISK */
796 db->split_size = dumpsize;
799 if (db->datain == db->dataout) {
801 * We flushed the whole buffer so reset to use it all.
803 db->datain = db->dataout = db->buf;
/* release the scratch names on the way out */
808 amfree(new_filename);
809 amfree(tmp_filename);
810 amfree(arg_filename);
/*
 * write_tapeheader: build one DISK_BLOCK_BYTES header block from *file and
 * write it to outfd.  Side effect: file->blocksize is set to
 * DISK_BLOCK_BYTES.  Returns 0 when the whole block was written and the
 * negative fullwrite() result on a write error.
 * NOTE(review): the parameter declarations and the handling of a short
 * (non-negative, incomplete) write lie outside this listing.
 */
816 * Send an Amanda dump header to the output file.
819 write_tapeheader(outfd, file)
823 char buffer[DISK_BLOCK_BYTES];
826 file->blocksize = DISK_BLOCK_BYTES;
827 build_header(buffer, file, sizeof(buffer));
829 written = fullwrite(outfd, buffer, sizeof(buffer));
830 if(written == sizeof(buffer)) return 0;
831 if(written < 0) return written;