2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
26 /* $Id: chunker.c,v 1.36 2006/08/24 11:23:32 martinea Exp $
28 * requests remote amandad processes to dump filesystems
42 #include "fileheader.h"
43 #include "amfeatures.h"
44 #include "server_util.h"
56 #define CONNECT_TIMEOUT 5*60
58 #define STARTUP_TIMEOUT 60
61 int fd; /* file to flush to */
62 char *filename; /* name of what fd points to */
63 int filename_seq; /* for chunking */
64 off_t split_size; /* when to chunk */
65 off_t chunk_size; /* size of each chunk */
66 off_t use; /* size to use on this disk */
67 char buf[DISK_BLOCK_BYTES];
68 char *datain; /* data buffer markers */
73 static char *handle = NULL;
75 static char *errstr = NULL;
76 static int abort_pending;
77 static off_t dumpsize;
78 static unsigned long headersize;
79 static off_t dumpbytes;
80 static off_t filesize;
82 static char *hostname = NULL;
83 static char *diskname = NULL;
84 static char *qdiskname = NULL;
85 static char *options = NULL;
86 static char *progname = NULL;
88 static char *dumpdate = NULL;
89 static int command_in_transit;
90 static char *chunker_timestamp = NULL;
92 static dumpfile_t file;
95 int main(int, char **);
96 static ssize_t write_tapeheader(int, dumpfile_t *);
97 static void databuf_init(struct databuf *, int, char *, off_t, off_t);
98 static int databuf_flush(struct databuf *);
100 static int startup_chunker(char *, off_t, off_t, struct databuf *);
101 static int do_chunk(int, struct databuf *);
109 static struct databuf db;
110 struct cmdargs cmdargs;
113 unsigned long malloc_hist_1, malloc_size_1;
114 unsigned long malloc_hist_2, malloc_size_2;
117 char *filename = NULL;
118 char *qfilename = NULL;
119 off_t chunksize, use;
121 am_feature_t *their_features = NULL;
123 int new_argc, my_argc;
124 char **new_argv, **my_argv;
128 set_pname("chunker");
130 dbopen(DBG_SUBDIR_SERVER);
132 /* Don't die when child closes pipe */
133 signal(SIGPIPE, SIG_IGN);
135 malloc_size_1 = malloc_inuse(&malloc_hist_1);
137 erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
138 set_logerror(logerror);
140 parse_server_conf(main_argc, main_argv, &new_argc, &new_argv);
145 config_name = stralloc(my_argv[1]);
146 config_dir = vstralloc(CONFIG_DIR, "/", config_name, "/", NULL);
148 char my_cwd[STR_SIZE];
150 if (getcwd(my_cwd, SIZEOF(my_cwd)) == NULL) {
151 error("cannot determine current working directory");
154 config_dir = stralloc2(my_cwd, "/");
155 if ((config_name = strrchr(my_cwd, '/')) != NULL) {
156 config_name = stralloc(config_name + 1);
162 conffile = stralloc2(config_dir, CONFFILE_NAME);
163 if(read_conffile(conffile)) {
164 error("errors processing config file \"%s\"", conffile);
169 dbrename(config_name, DBG_SUBDIR_SERVER);
171 report_bad_conf_arg();
174 "%s: pid %ld executable %s version %s\n",
175 get_pname(), (long) getpid(),
176 my_argv[0], version());
179 /* now, make sure we are a valid user */
181 if (getpwuid(getuid()) == NULL) {
182 error("can't get login name for my uid %ld", (long)getuid());
186 signal(SIGPIPE, SIG_IGN);
187 signal(SIGCHLD, SIG_IGN);
189 cmd = getcmd(&cmdargs);
191 if(cmdargs.argc <= 1)
192 error("error [dumper START: not enough args: timestamp]");
193 chunker_timestamp = newstralloc(chunker_timestamp, cmdargs.argv[2]);
196 error("Didn't get START command");
200 cmd = getcmd(&cmdargs);
221 cmdargs.argc++; /* true count of args */
224 if(a >= cmdargs.argc) {
225 error("error [chunker PORT-WRITE: not enough args: handle]");
228 handle = newstralloc(handle, cmdargs.argv[a++]);
230 if(a >= cmdargs.argc) {
231 error("error [chunker PORT-WRITE: not enough args: filename]");
234 qfilename = newstralloc(qfilename, cmdargs.argv[a++]);
235 if (filename != NULL)
237 filename = unquote_string(qfilename);
240 if(a >= cmdargs.argc) {
241 error("error [chunker PORT-WRITE: not enough args: hostname]");
244 hostname = newstralloc(hostname, cmdargs.argv[a++]);
246 if(a >= cmdargs.argc) {
247 error("error [chunker PORT-WRITE: not enough args: features]");
250 am_release_feature_set(their_features);
251 their_features = am_string_to_feature(cmdargs.argv[a++]);
253 if(a >= cmdargs.argc) {
254 error("error [chunker PORT-WRITE: not enough args: diskname]");
257 qdiskname = newstralloc(qdiskname, cmdargs.argv[a++]);
258 if (diskname != NULL)
260 diskname = unquote_string(qdiskname);
262 if(a >= cmdargs.argc) {
263 error("error [chunker PORT-WRITE: not enough args: level]");
266 level = atoi(cmdargs.argv[a++]);
268 if(a >= cmdargs.argc) {
269 error("error [chunker PORT-WRITE: not enough args: dumpdate]");
272 dumpdate = newstralloc(dumpdate, cmdargs.argv[a++]);
274 if(a >= cmdargs.argc) {
275 error("error [chunker PORT-WRITE: not enough args: chunksize]");
278 chunksize = OFF_T_ATOI(cmdargs.argv[a++]);
279 chunksize = am_floor(chunksize, (off_t)DISK_BLOCK_KB);
281 if(a >= cmdargs.argc) {
282 error("error [chunker PORT-WRITE: not enough args: progname]");
285 progname = newstralloc(progname, cmdargs.argv[a++]);
287 if(a >= cmdargs.argc) {
288 error("error [chunker PORT-WRITE: not enough args: use]");
291 use = am_floor(OFF_T_ATOI(cmdargs.argv[a++]), DISK_BLOCK_KB);
293 if(a >= cmdargs.argc) {
294 error("error [chunker PORT-WRITE: not enough args: options]");
297 options = newstralloc(options, cmdargs.argv[a++]);
299 if(a != cmdargs.argc) {
300 error("error [chunker PORT-WRITE: too many args: %d != %d]",
305 if((infd = startup_chunker(filename, use, chunksize, &db)) < 0) {
306 q = squotef("[chunker startup failed: %s]", errstr);
307 putresult(TRYAGAIN, "%s %s\n", handle, q);
308 error("startup_chunker failed");
310 command_in_transit = -1;
311 if(infd >= 0 && do_chunk(infd, &db)) {
312 char kb_str[NUM_STR_SIZE];
313 char kps_str[NUM_STR_SIZE];
316 runtime = stopclock();
317 rt = (double)(runtime.r.tv_sec) +
318 ((double)(runtime.r.tv_usec) / 1000000.0);
319 snprintf(kb_str, SIZEOF(kb_str), OFF_T_FMT,
320 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize));
321 snprintf(kps_str, SIZEOF(kps_str), "%3.1lf",
322 isnormal(rt) ? (double)dumpsize / rt : 0.0);
323 errstr = newvstralloc(errstr,
324 "sec ", walltime_str(runtime),
328 q = squotef("[%s]", errstr);
329 if(command_in_transit != -1)
330 cmd = command_in_transit;
332 cmd = getcmd(&cmdargs);
335 putresult(DONE, "%s " OFF_T_FMT " %s\n", handle,
336 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize), q);
337 log_add(L_SUCCESS, "%s %s %s %d [%s]",
338 hostname, qdiskname, chunker_timestamp, level, errstr);
344 if(dumpsize > (off_t)DISK_BLOCK_KB) {
345 putresult(PARTIAL, "%s " OFF_T_FMT " %s\n", handle,
346 (OFF_T_FMT_TYPE)(dumpsize - (off_t)headersize),
348 log_add(L_PARTIAL, "%s %s %s %d [%s]",
349 hostname, qdiskname, chunker_timestamp, level, errstr);
352 errstr = newvstralloc(errstr,
357 q = squotef("[%s]",errstr);
358 putresult(FAILED, "%s %s\n", handle, q);
359 log_add(L_FAIL, "%s %s %s %d [%s]",
360 hostname, qdiskname, chunker_timestamp, level, errstr);
365 } else if(infd != -2) {
368 q = squotef("[%s]", errstr);
370 putresult(FAILED, "%s %s\n", handle, q);
371 log_add(L_FAIL, "%s %s %s %d [%s]",
372 hostname, qdiskname, chunker_timestamp, level, errstr);
381 if(cmdargs.argc >= 1) {
382 q = squote(cmdargs.argv[1]);
383 } else if(cmdargs.argc >= 0) {
384 q = squote(cmdargs.argv[0]);
386 q = stralloc("(no input?)");
388 putresult(BAD_COMMAND, "%s\n", q);
393 /* } while(cmd != QUIT); */
395 free_new_argv(new_argc, new_argv);
396 free_server_config();
398 amfree(chunker_timestamp);
408 am_release_feature_set(their_features);
409 their_features = NULL;
411 malloc_size_2 = malloc_inuse(&malloc_hist_2);
413 if (malloc_size_1 != malloc_size_2)
414 malloc_list(fileno(stderr), malloc_hist_1, malloc_hist_2);
418 return (0); /* exit */
422 * Returns a file descriptor to the incoming port
423 * on success, or -1 on error.
433 char *tmp_filename, *pc;
438 data_socket = stream_server(&data_port, 0, STREAM_BUFSIZE, 0);
440 if(data_socket < 0) {
441 errstr = stralloc2("error creating stream server: ", strerror(errno));
445 putresult(PORT, "%d\n", data_port);
447 infd = stream_accept(data_socket, CONNECT_TIMEOUT, 0, STREAM_BUFSIZE);
449 errstr = stralloc2("error accepting stream: ", strerror(errno));
453 tmp_filename = vstralloc(filename, ".tmp", NULL);
454 pc = strrchr(tmp_filename, '/');
456 mkholdingdir(tmp_filename);
458 if ((outfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600)) < 0) {
459 int save_errno = errno;
461 errstr = squotef("holding file \"%s\": %s",
464 amfree(tmp_filename);
466 if(save_errno == ENOSPC) {
467 putresult(NO_ROOM, "%s " OFF_T_FMT "\n",
468 handle, (OFF_T_FMT_TYPE)use);
474 amfree(tmp_filename);
475 databuf_init(db, outfd, filename, use, chunksize);
486 char header_buf[DISK_BLOCK_BYTES];
490 dumpsize = dumpbytes = filesize = (off_t)0;
492 memset(header_buf, 0, sizeof(header_buf));
495 * The first thing we should receive is the file header, which we
496 * need to save into "file", as well as write out. Later, the
497 * chunk code will rewrite it.
499 nread = fullread(infd, header_buf, SIZEOF(header_buf));
500 if (nread != DISK_BLOCK_BYTES) {
501 char number1[NUM_STR_SIZE];
502 char number2[NUM_STR_SIZE];
505 errstr = stralloc2("cannot read header: ", strerror(errno));
507 snprintf(number1, SIZEOF(number1), SSIZE_T_FMT,
508 (SSIZE_T_FMT_TYPE)nread);
509 snprintf(number2, SIZEOF(number2), "%d", DISK_BLOCK_BYTES);
510 errstr = vstralloc("cannot read header: got ",
518 parse_file_header(header_buf, &file, (size_t)nread);
519 if(write_tapeheader(db->fd, &file)) {
520 int save_errno = errno;
522 errstr = squotef("write_tapeheader file %s: %s",
523 db->filename, strerror(errno));
524 if(save_errno == ENOSPC) {
525 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
526 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
530 dumpsize += (off_t)DISK_BLOCK_KB;
531 filesize = (off_t)DISK_BLOCK_KB;
532 headersize += DISK_BLOCK_KB;
535 * We've written the file header. Now, just write data until the
538 while ((nread = fullread(infd, db->buf,
539 (size_t)(db->datalimit - db->datain))) > 0) {
541 while(db->dataout < db->datain) {
542 if(!databuf_flush(db)) {
547 while(db->dataout < db->datain) {
548 if(!databuf_flush(db)) {
552 if(dumpbytes > (off_t)0) {
553 dumpsize += (off_t)1; /* count partial final KByte */
554 filesize += (off_t)1;
560 * Initialize a databuf. Takes a writeable file descriptor.
571 db->filename = stralloc(filename);
572 db->filename_seq = (off_t)0;
573 db->chunk_size = chunk_size;
574 db->split_size = (db->chunk_size > use) ? use : db->chunk_size;
575 db->use = (use > db->split_size) ? use - db->split_size : (off_t)0;
576 db->datain = db->dataout = db->buf;
577 db->datalimit = db->buf + SIZEOF(db->buf);
582 * Write out the buffer to the backing file
588 struct cmdargs cmdargs;
592 char *arg_filename = NULL;
593 char *qarg_filename = NULL;
594 char *new_filename = NULL;
595 char *tmp_filename = NULL;
596 char sequence[NUM_STR_SIZE];
598 filetype_t save_type;
604 * If there's no data, do nothing.
606 if (db->dataout >= db->datain) {
611 * See if we need to split this file.
613 while (db->split_size > (off_t)0 && dumpsize >= db->split_size) {
614 if( db->use == (off_t)0 ) {
616 * Probably no more space on this disk. Request some more.
620 putresult(RQ_MORE_DISK, "%s\n", handle);
621 cmd = getcmd(&cmdargs);
622 if(command_in_transit == -1 &&
623 (cmd == DONE || cmd == TRYAGAIN || cmd == FAILED)) {
624 command_in_transit = cmd;
625 cmd = getcmd(&cmdargs);
627 if(cmd == CONTINUE) {
635 cmdargs.argc++; /* true count of args */
638 if(a >= cmdargs.argc) {
639 error("error [chunker CONTINUE: not enough args: filename]");
642 qarg_filename = newstralloc(qarg_filename, cmdargs.argv[a++]);
643 if (arg_filename != NULL)
644 amfree(arg_filename);
645 arg_filename = unquote_string(qarg_filename);
647 if(a >= cmdargs.argc) {
648 error("error [chunker CONTINUE: not enough args: chunksize]");
651 db->chunk_size = OFF_T_ATOI(cmdargs.argv[a++]);
652 db->chunk_size = am_floor(db->chunk_size, (off_t)DISK_BLOCK_KB);
654 if(a >= cmdargs.argc) {
655 error("error [chunker CONTINUE: not enough args: use]");
658 db->use = OFF_T_ATOI(cmdargs.argv[a++]);
660 if(a != cmdargs.argc) {
661 error("error [chunker CONTINUE: too many args: %d != %d]",
666 if(strcmp(db->filename, arg_filename) == 0) {
668 * Same disk, so use what room is left up to the
669 * next chunk boundary or the amount we were given,
672 left_in_chunk = db->chunk_size - filesize;
673 if(left_in_chunk > db->use) {
674 db->split_size += db->use;
677 db->split_size += left_in_chunk;
678 db->use -= left_in_chunk;
680 if(left_in_chunk > (off_t)0) {
682 * We still have space in this chunk.
688 * Different disk, so use new file.
690 db->filename = newstralloc(db->filename, arg_filename);
692 } else if(cmd == ABORT) {
694 errstr = newstralloc(errstr, "ERROR");
695 putresult(ABORT_FINISHED, "%s\n", handle);
699 if(cmdargs.argc >= 1) {
700 q = squote(cmdargs.argv[1]);
701 } else if(cmdargs.argc >= 0) {
702 q = squote(cmdargs.argv[0]);
704 q = stralloc("(no input?)");
706 error("error [bad command after RQ-MORE-DISK: \"%s\"]", q);
712 * Time to use another file.
716 * First, open the new chunk file, and give it a new header
717 * that has no cont_filename pointer.
719 snprintf(sequence, SIZEOF(sequence), "%d", db->filename_seq);
720 new_filename = newvstralloc(new_filename,
725 tmp_filename = newvstralloc(tmp_filename,
729 pc = strrchr(tmp_filename, '/');
731 mkholdingdir(tmp_filename);
733 newfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
735 int save_errno = errno;
737 if(save_errno == ENOSPC) {
738 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
739 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
740 db->use = (off_t)0; /* force RQ_MORE_DISK */
741 db->split_size = dumpsize;
744 errstr = squotef("creating chunk holding file \"%s\": %s",
751 save_type = file.type;
752 file.type = F_CONT_DUMPFILE;
753 file.cont_filename[0] = '\0';
754 if(write_tapeheader(newfd, &file)) {
755 int save_errno = errno;
758 if(save_errno == ENOSPC) {
759 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
760 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
761 db->use = (off_t)0; /* force RQ_MORE DISK */
762 db->split_size = dumpsize;
765 errstr = squotef("write_tapeheader file %s: %s",
773 * Now, update the header of the current file to point
774 * to the next chunk, and then close it.
776 if (lseek(db->fd, (off_t)0, SEEK_SET) < (off_t)0) {
777 errstr = squotef("lseek holding file %s: %s",
785 file.type = save_type;
786 strncpy(file.cont_filename, new_filename, SIZEOF(file.cont_filename));
787 file.cont_filename[SIZEOF(file.cont_filename)] = '\0';
788 if(write_tapeheader(db->fd, &file)) {
789 errstr = squotef("write_tapeheader file \"%s\": %s",
793 unlink(tmp_filename);
797 file.type = F_CONT_DUMPFILE;
800 * Now shift the file descriptor.
807 * Update when we need to chunk again
809 if(db->use <= (off_t)DISK_BLOCK_KB) {
811 * Cheat and use one more block than allowed so we can make
814 db->split_size += (off_t)(2 * DISK_BLOCK_KB);
816 } else if(db->chunk_size > db->use) {
817 db->split_size += db->use;
820 db->split_size += db->chunk_size;
821 db->use -= db->chunk_size;
825 amfree(tmp_filename);
826 amfree(new_filename);
827 dumpsize += (off_t)DISK_BLOCK_KB;
828 filesize = (off_t)DISK_BLOCK_KB;
829 headersize += DISK_BLOCK_KB;
834 * Write out the buffer
836 written = fullwrite(db->fd, db->dataout,
837 (size_t)(db->datain - db->dataout));
839 db->dataout += written;
840 dumpbytes += (off_t)written;
842 dumpsize += (dumpbytes / (off_t)1024);
843 filesize += (dumpbytes / (off_t)1024);
846 if (errno != ENOSPC) {
847 errstr = squotef("data write: %s", strerror(errno));
853 * NO-ROOM is informational only. Later, RQ_MORE_DISK will be
854 * issued to use another holding disk.
856 putresult(NO_ROOM, "%s " OFF_T_FMT "\n", handle,
857 (OFF_T_FMT_TYPE)(db->use+db->split_size-dumpsize));
858 db->use = (off_t)0; /* force RQ_MORE_DISK */
859 db->split_size = dumpsize;
862 if (db->datain == db->dataout) {
864 * We flushed the whole buffer so reset to use it all.
866 db->datain = db->dataout = db->buf;
871 amfree(new_filename);
872 /*@i@*/ amfree(tmp_filename);
873 amfree(arg_filename);
874 amfree(qarg_filename);
880 * Send an Amanda dump header to the output file.
887 char buffer[DISK_BLOCK_BYTES];
890 file->blocksize = DISK_BLOCK_BYTES;
891 build_header(buffer, file, SIZEOF(buffer));
893 written = fullwrite(outfd, buffer, SIZEOF(buffer));
894 if(written == (ssize_t)sizeof(buffer))