2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
26 /* $Id: chunker.c,v 1.36 2006/08/24 11:23:32 martinea Exp $
28 * requests remote amandad processes to dump filesystems
42 #include "fileheader.h"
43 #include "amfeatures.h"
44 #include "server_util.h"
48 #define chunker_debug(i,x) do { \
49 if ((i) <= debug_chunker) { \
62 #define CONNECT_TIMEOUT 5*60
64 #define STARTUP_TIMEOUT 60
67 int fd; /* file to flush to */
68 char *filename; /* name of what fd points to */
69 int filename_seq; /* for chunking */
70 off_t split_size; /* when to chunk */
71 off_t chunk_size; /* size of each chunk */
72 off_t use; /* size to use on this disk */
73 char buf[DISK_BLOCK_BYTES];
74 char *datain; /* data buffer markers */
79 static char *handle = NULL;
81 static char *errstr = NULL;
82 static int abort_pending;
83 static off_t dumpsize;
84 static unsigned long headersize;
85 static off_t dumpbytes;
86 static off_t filesize;
88 static char *hostname = NULL;
89 static char *diskname = NULL;
90 static char *qdiskname = NULL;
91 static char *options = NULL;
92 static char *progname = NULL;
94 static char *dumpdate = NULL;
95 static int command_in_transit;
96 static char *chunker_timestamp = NULL;
98 static dumpfile_t file;
100 /* local functions */
101 int main(int, char **);
102 static ssize_t write_tapeheader(int, dumpfile_t *);
103 static void databuf_init(struct databuf *, int, char *, off_t, off_t);
104 static int databuf_flush(struct databuf *);
106 static int startup_chunker(char *, off_t, off_t, struct databuf *);
107 static int do_chunk(int, struct databuf *);
115 static struct databuf db;
116 struct cmdargs cmdargs;
119 unsigned long malloc_hist_1, malloc_size_1;
120 unsigned long malloc_hist_2, malloc_size_2;
123 char *filename = NULL;
124 char *qfilename = NULL;
125 off_t chunksize, use;
127 am_feature_t *their_features = NULL;
129 int new_argc, my_argc;
130 char **new_argv, **my_argv;
134 set_pname("chunker");
136 dbopen(DBG_SUBDIR_SERVER);
138 /* Don't die when child closes pipe */
139 signal(SIGPIPE, SIG_IGN);
141 malloc_size_1 = malloc_inuse(&malloc_hist_1);
143 erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
144 set_logerror(logerror);
146 parse_conf(main_argc, main_argv, &new_argc, &new_argv);
151 config_name = stralloc(my_argv[1]);
152 config_dir = vstralloc(CONFIG_DIR, "/", config_name, "/", NULL);
154 char my_cwd[STR_SIZE];
156 if (getcwd(my_cwd, SIZEOF(my_cwd)) == NULL) {
157 error("cannot determine current working directory");
160 config_dir = stralloc2(my_cwd, "/");
161 if ((config_name = strrchr(my_cwd, '/')) != NULL) {
162 config_name = stralloc(config_name + 1);
168 conffile = stralloc2(config_dir, CONFFILE_NAME);
169 if(read_conffile(conffile)) {
170 error("errors processing config file \"%s\"", conffile);
175 dbrename(config_name, DBG_SUBDIR_SERVER);
177 report_bad_conf_arg();
180 "%s: pid %ld executable %s version %s\n",
181 get_pname(), (long) getpid(),
182 my_argv[0], version());
185 /* now, make sure we are a valid user */
187 if (getpwuid(getuid()) == NULL) {
188 error("can't get login name for my uid %ld", (long)getuid());
192 signal(SIGPIPE, SIG_IGN);
193 signal(SIGCHLD, SIG_IGN);
195 cmd = getcmd(&cmdargs);
197 if(cmdargs.argc <= 1)
198 error("error [dumper START: not enough args: timestamp]");
199 chunker_timestamp = newstralloc(chunker_timestamp, cmdargs.argv[2]);
202 error("Didn't get START command");
206 cmd = getcmd(&cmdargs);
227 cmdargs.argc++; /* true count of args */
230 if(a >= cmdargs.argc) {
231 error("error [chunker PORT-WRITE: not enough args: handle]");
234 handle = newstralloc(handle, cmdargs.argv[a++]);
236 if(a >= cmdargs.argc) {
237 error("error [chunker PORT-WRITE: not enough args: filename]");
240 qfilename = newstralloc(qfilename, cmdargs.argv[a++]);
241 if (filename != NULL)
243 filename = unquote_string(qfilename);
246 if(a >= cmdargs.argc) {
247 error("error [chunker PORT-WRITE: not enough args: hostname]");
250 hostname = newstralloc(hostname, cmdargs.argv[a++]);
252 if(a >= cmdargs.argc) {
253 error("error [chunker PORT-WRITE: not enough args: features]");
256 am_release_feature_set(their_features);
257 their_features = am_string_to_feature(cmdargs.argv[a++]);
259 if(a >= cmdargs.argc) {
260 error("error [chunker PORT-WRITE: not enough args: diskname]");
263 qdiskname = newstralloc(qdiskname, cmdargs.argv[a++]);
264 if (diskname != NULL)
266 diskname = unquote_string(qdiskname);
268 if(a >= cmdargs.argc) {
269 error("error [chunker PORT-WRITE: not enough args: level]");
272 level = atoi(cmdargs.argv[a++]);
274 if(a >= cmdargs.argc) {
275 error("error [chunker PORT-WRITE: not enough args: dumpdate]");
278 dumpdate = newstralloc(dumpdate, cmdargs.argv[a++]);
280 if(a >= cmdargs.argc) {
281 error("error [chunker PORT-WRITE: not enough args: chunksize]");
284 chunksize = OFF_T_ATOI(cmdargs.argv[a++]);
285 chunksize = am_floor(chunksize, (off_t)DISK_BLOCK_KB);
287 if(a >= cmdargs.argc) {
288 error("error [chunker PORT-WRITE: not enough args: progname]");
291 progname = newstralloc(progname, cmdargs.argv[a++]);
293 if(a >= cmdargs.argc) {
294 error("error [chunker PORT-WRITE: not enough args: use]");
297 use = am_floor(OFF_T_ATOI(cmdargs.argv[a++]), DISK_BLOCK_KB);
299 if(a >= cmdargs.argc) {
300 error("error [chunker PORT-WRITE: not enough args: options]");
303 options = newstralloc(options, cmdargs.argv[a++]);
305 if(a != cmdargs.argc) {
306 error("error [chunker PORT-WRITE: too many args: %d != %d]",
311 if((infd = startup_chunker(filename, use, chunksize, &db)) < 0) {
312 q = squotef("[chunker startup failed: %s]", errstr);
313 putresult(TRYAGAIN, "%s %s\n", handle, q);
314 error("startup_chunker failed");
316 command_in_transit = -1;
317 if(infd >= 0 && do_chunk(infd, &db)) {
318 char kb_str[NUM_STR_SIZE];
319 char kps_str[NUM_STR_SIZE];
322 runtime = stopclock();
323 rt = (double)(runtime.r.tv_sec) +
324 ((double)(runtime.r.tv_usec) / 1000000.0);
325 snprintf(kb_str, SIZEOF(kb_str), OFF_T_FMT,
326 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize));
327 snprintf(kps_str, SIZEOF(kps_str), "%3.1lf",
328 isnormal(rt) ? (double)dumpsize / rt : 0.0);
329 errstr = newvstralloc(errstr,
330 "sec ", walltime_str(runtime),
334 q = squotef("[%s]", errstr);
335 if(command_in_transit != -1)
336 cmd = command_in_transit;
338 cmd = getcmd(&cmdargs);
341 putresult(DONE, "%s " OFF_T_FMT " %s\n", handle,
342 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize), q);
343 log_add(L_SUCCESS, "%s %s %s %d [%s]",
344 hostname, qdiskname, chunker_timestamp, level, errstr);
350 if(dumpsize > (off_t)DISK_BLOCK_KB) {
351 putresult(PARTIAL, "%s " OFF_T_FMT " %s\n", handle,
352 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize),
354 log_add(L_PARTIAL, "%s %s %s %d [%s]",
355 hostname, qdiskname, chunker_timestamp, level, errstr);
358 errstr = newvstralloc(errstr,
363 q = squotef("[%s]",errstr);
364 putresult(FAILED, "%s %s\n", handle, q);
365 log_add(L_FAIL, "%s %s %s %d [%s]",
366 hostname, qdiskname, chunker_timestamp, level, errstr);
371 } else if(infd != -2) {
374 q = squotef("[%s]", errstr);
376 putresult(FAILED, "%s %s\n", handle, q);
377 log_add(L_FAIL, "%s %s %s %d [%s]",
378 hostname, qdiskname, chunker_timestamp, level, errstr);
387 if(cmdargs.argc >= 1) {
388 q = squote(cmdargs.argv[1]);
389 } else if(cmdargs.argc >= 0) {
390 q = squote(cmdargs.argv[0]);
392 q = stralloc("(no input?)");
394 putresult(BAD_COMMAND, "%s\n", q);
399 /* } while(cmd != QUIT); */
401 free_new_argv(new_argc, new_argv);
402 free_server_config();
404 amfree(chunker_timestamp);
414 am_release_feature_set(their_features);
415 their_features = NULL;
417 malloc_size_2 = malloc_inuse(&malloc_hist_2);
419 if (malloc_size_1 != malloc_size_2)
420 malloc_list(fileno(stderr), malloc_hist_1, malloc_hist_2);
424 return (0); /* exit */
428 * Returns a file descriptor to the incoming port
429 * on success, or -1 on error.
439 char *tmp_filename, *pc;
444 data_socket = stream_server(&data_port, 0, STREAM_BUFSIZE, 0);
446 if(data_socket < 0) {
447 errstr = stralloc2("error creating stream server: ", strerror(errno));
451 putresult(PORT, "%d\n", data_port);
453 infd = stream_accept(data_socket, CONNECT_TIMEOUT, 0, STREAM_BUFSIZE);
455 errstr = stralloc2("error accepting stream: ", strerror(errno));
459 tmp_filename = vstralloc(filename, ".tmp", NULL);
460 pc = strrchr(tmp_filename, '/');
462 mkholdingdir(tmp_filename);
464 if ((outfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600)) < 0) {
465 int save_errno = errno;
467 errstr = squotef("holding file \"%s\": %s",
470 amfree(tmp_filename);
472 if(save_errno == ENOSPC) {
473 putresult(NO_ROOM, "%s " OFF_T_FMT "\n",
474 handle, (OFF_T_FMT_TYPE)use);
480 amfree(tmp_filename);
481 databuf_init(db, outfd, filename, use, chunksize);
492 char header_buf[DISK_BLOCK_BYTES];
496 dumpsize = dumpbytes = filesize = (off_t)0;
498 memset(header_buf, 0, sizeof(header_buf));
501 * The first thing we should receive is the file header, which we
502 * need to save into "file", as well as write out. Later, the
503 * chunk code will rewrite it.
505 nread = fullread(infd, header_buf, SIZEOF(header_buf));
506 if (nread != DISK_BLOCK_BYTES) {
507 char number1[NUM_STR_SIZE];
508 char number2[NUM_STR_SIZE];
511 errstr = stralloc2("cannot read header: ", strerror(errno));
513 snprintf(number1, SIZEOF(number1), SSIZE_T_FMT,
514 (SSIZE_T_FMT_TYPE)nread);
515 snprintf(number2, SIZEOF(number2), "%d", DISK_BLOCK_BYTES);
516 errstr = vstralloc("cannot read header: got ",
524 parse_file_header(header_buf, &file, (size_t)nread);
525 if(write_tapeheader(db->fd, &file)) {
526 int save_errno = errno;
528 errstr = squotef("write_tapeheader file %s: %s",
529 db->filename, strerror(errno));
530 if(save_errno == ENOSPC) {
531 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
532 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
536 dumpsize += (off_t)DISK_BLOCK_KB;
537 filesize = (off_t)DISK_BLOCK_KB;
538 headersize += DISK_BLOCK_KB;
541 * We've written the file header. Now, just write data until the
544 while ((nread = fullread(infd, db->buf,
545 (size_t)(db->datalimit - db->datain))) > 0) {
547 while(db->dataout < db->datain) {
548 if(!databuf_flush(db)) {
553 while(db->dataout < db->datain) {
554 if(!databuf_flush(db)) {
558 if(dumpbytes > (off_t)0) {
559 dumpsize += (off_t)1; /* count partial final KByte */
560 filesize += (off_t)1;
566 * Initialize a databuf. Takes a writeable file descriptor.
577 db->filename = stralloc(filename);
578 db->filename_seq = (off_t)0;
579 db->chunk_size = chunk_size;
580 db->split_size = (db->chunk_size > use) ? use : db->chunk_size;
581 db->use = (use > db->split_size) ? use - db->split_size : (off_t)0;
582 db->datain = db->dataout = db->buf;
583 db->datalimit = db->buf + SIZEOF(db->buf);
588 * Write out the buffer to the backing file
594 struct cmdargs cmdargs;
598 char *arg_filename = NULL;
599 char *qarg_filename = NULL;
600 char *new_filename = NULL;
601 char *tmp_filename = NULL;
602 char sequence[NUM_STR_SIZE];
604 filetype_t save_type;
610 * If there's no data, do nothing.
612 if (db->dataout >= db->datain) {
617 * See if we need to split this file.
619 while (db->split_size > (off_t)0 && dumpsize >= db->split_size) {
620 if( db->use == (off_t)0 ) {
622 * Probably no more space on this disk. Request some more.
626 putresult(RQ_MORE_DISK, "%s\n", handle);
627 cmd = getcmd(&cmdargs);
628 if(command_in_transit == -1 &&
629 (cmd == DONE || cmd == TRYAGAIN || cmd == FAILED)) {
630 command_in_transit = cmd;
631 cmd = getcmd(&cmdargs);
633 if(cmd == CONTINUE) {
641 cmdargs.argc++; /* true count of args */
644 if(a >= cmdargs.argc) {
645 error("error [chunker CONTINUE: not enough args: filename]");
648 qarg_filename = newstralloc(qarg_filename, cmdargs.argv[a++]);
649 if (arg_filename != NULL)
650 amfree(arg_filename);
651 arg_filename = unquote_string(qarg_filename);
653 if(a >= cmdargs.argc) {
654 error("error [chunker CONTINUE: not enough args: chunksize]");
657 db->chunk_size = OFF_T_ATOI(cmdargs.argv[a++]);
658 db->chunk_size = am_floor(db->chunk_size, (off_t)DISK_BLOCK_KB);
660 if(a >= cmdargs.argc) {
661 error("error [chunker CONTINUE: not enough args: use]");
664 db->use = OFF_T_ATOI(cmdargs.argv[a++]);
666 if(a != cmdargs.argc) {
667 error("error [chunker CONTINUE: too many args: %d != %d]",
672 if(strcmp(db->filename, arg_filename) == 0) {
674 * Same disk, so use what room is left up to the
675 * next chunk boundary or the amount we were given,
678 left_in_chunk = db->chunk_size - filesize;
679 if(left_in_chunk > db->use) {
680 db->split_size += db->use;
683 db->split_size += left_in_chunk;
684 db->use -= left_in_chunk;
686 if(left_in_chunk > (off_t)0) {
688 * We still have space in this chunk.
694 * Different disk, so use new file.
696 db->filename = newstralloc(db->filename, arg_filename);
698 } else if(cmd == ABORT) {
700 errstr = newstralloc(errstr, "ERROR");
701 putresult(ABORT_FINISHED, "%s\n", handle);
705 if(cmdargs.argc >= 1) {
706 q = squote(cmdargs.argv[1]);
707 } else if(cmdargs.argc >= 0) {
708 q = squote(cmdargs.argv[0]);
710 q = stralloc("(no input?)");
712 error("error [bad command after RQ-MORE-DISK: \"%s\"]", q);
718 * Time to use another file.
722 * First, open the new chunk file, and give it a new header
723 * that has no cont_filename pointer.
725 snprintf(sequence, SIZEOF(sequence), "%d", db->filename_seq);
726 new_filename = newvstralloc(new_filename,
731 tmp_filename = newvstralloc(tmp_filename,
735 pc = strrchr(tmp_filename, '/');
737 mkholdingdir(tmp_filename);
739 newfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
741 int save_errno = errno;
743 if(save_errno == ENOSPC) {
744 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
745 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
746 db->use = (off_t)0; /* force RQ_MORE_DISK */
747 db->split_size = dumpsize;
750 errstr = squotef("creating chunk holding file \"%s\": %s",
757 save_type = file.type;
758 file.type = F_CONT_DUMPFILE;
759 file.cont_filename[0] = '\0';
760 if(write_tapeheader(newfd, &file)) {
761 int save_errno = errno;
764 if(save_errno == ENOSPC) {
765 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
766 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
767 db->use = (off_t)0; /* force RQ_MORE DISK */
768 db->split_size = dumpsize;
771 errstr = squotef("write_tapeheader file %s: %s",
779 * Now, update the header of the current file to point
780 * to the next chunk, and then close it.
782 if (lseek(db->fd, (off_t)0, SEEK_SET) < (off_t)0) {
783 errstr = squotef("lseek holding file %s: %s",
791 file.type = save_type;
792 strncpy(file.cont_filename, new_filename, SIZEOF(file.cont_filename));
793 file.cont_filename[SIZEOF(file.cont_filename)] = '\0';
794 if(write_tapeheader(db->fd, &file)) {
795 errstr = squotef("write_tapeheader file \"%s\": %s",
799 unlink(tmp_filename);
803 file.type = F_CONT_DUMPFILE;
806 * Now shift the file descriptor.
813 * Update when we need to chunk again
815 if(db->use <= (off_t)DISK_BLOCK_KB) {
817 * Cheat and use one more block than allowed so we can make
820 db->split_size += (off_t)(2 * DISK_BLOCK_KB);
822 } else if(db->chunk_size > db->use) {
823 db->split_size += db->use;
826 db->split_size += db->chunk_size;
827 db->use -= db->chunk_size;
831 amfree(tmp_filename);
832 amfree(new_filename);
833 dumpsize += (off_t)DISK_BLOCK_KB;
834 filesize = (off_t)DISK_BLOCK_KB;
835 headersize += DISK_BLOCK_KB;
840 * Write out the buffer
842 written = fullwrite(db->fd, db->dataout,
843 (size_t)(db->datain - db->dataout));
845 db->dataout += written;
846 dumpbytes += (off_t)written;
848 dumpsize += (dumpbytes / (off_t)1024);
849 filesize += (dumpbytes / (off_t)1024);
852 if (errno != ENOSPC) {
853 errstr = squotef("data write: %s", strerror(errno));
859 * NO-ROOM is informational only. Later, RQ_MORE_DISK will be
860 * issued to use another holding disk.
862 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
863 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
864 db->use = (off_t)0; /* force RQ_MORE_DISK */
865 db->split_size = dumpsize;
868 if (db->datain == db->dataout) {
870 * We flushed the whole buffer so reset to use it all.
872 db->datain = db->dataout = db->buf;
877 amfree(new_filename);
878 /*@i@*/ amfree(tmp_filename);
879 amfree(arg_filename);
880 amfree(qarg_filename);
886 * Send an Amanda dump header to the output file and set file->blocksize
893 char buffer[DISK_BLOCK_BYTES];
896 file->blocksize = DISK_BLOCK_BYTES;
897 build_header(buffer, file, SIZEOF(buffer));
899 written = fullwrite(outfd, buffer, SIZEOF(buffer));
900 if(written == (ssize_t)sizeof(buffer))