2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
26 /* $Id: chunker.c,v 1.25 2006/03/21 13:23:35 martinea Exp $
28 * receives a dump image on a network port and writes it to holding disk in chunks
42 #include "fileheader.h"
43 #include "amfeatures.h"
44 #include "server_util.h"
56 #define CONNECT_TIMEOUT (5*60) /* timeout handed to stream_accept(); parenthesized so the product stays one value inside any larger expression */
58 #define STARTUP_TIMEOUT 60 /* not referenced in the visible part of this file */
/*
 * Members of struct databuf, the output state handed to databuf_init(),
 * databuf_flush() and do_chunk().  split_size is compared against
 * dumpsize, which this file accumulates in KBytes.
 */
61 int fd; /* file to flush to */
62 char *filename; /* name of what fd points to */
63 int filename_seq; /* for chunking */
64 long split_size; /* when to chunk */
65 long chunk_size; /* size of each chunk */
66 long use; /* size to use on this disk */
67 char buf[DISK_BLOCK_BYTES]; /* staging buffer; datain/dataout/datalimit point into it */
68 char *datain; /* data buffer markers */
/* File-scope state for the dump currently being received. */
73 static char *handle = NULL; /* PORT-WRITE "handle" argument; echoed in results sent with putresult() */
75 static char *errstr = NULL; /* latest status/error text, used in results and log lines */
76 static int abort_pending; /* not assigned in the visible part of this file */
77 static long dumpsize, headersize; /* both grow by DISK_BLOCK_KB per header written; dumpsize also counts data KBytes */
78 static long dumpbytes; /* bytes written by databuf_flush(); whole KBytes are added to dumpsize from it */
81 static char *hostname = NULL; /* PORT-WRITE "hostname" argument */
82 static char *diskname = NULL; /* PORT-WRITE "diskname" argument */
83 static char *options = NULL; /* PORT-WRITE "options" argument */
84 static char *progname = NULL; /* PORT-WRITE "progname" argument */
86 static char *dumpdate = NULL; /* PORT-WRITE "dumpdate" argument */
87 static char *datestamp; /* result of construct_datestamp(NULL); written to the log lines */
88 static int command_in_transit; /* -1, or a DONE/TRYAGAIN/FAILED command read while waiting for a reply to RQ_MORE_DISK */
90 static dumpfile_t file; /* header parsed from the incoming stream; rewritten for each chunk file */
/* Forward declarations of the functions defined in this file. */
93 int main P((int, char **));
94 static int write_tapeheader P((int, dumpfile_t *));
95 static void databuf_init P((struct databuf *, int, char *, long, long));
96 static int databuf_flush P((struct databuf *));
98 static int startup_chunker P((char *, long, long, struct databuf *));
99 static int do_chunk P((int, struct databuf *));
/*
 * Entry point of the chunker process.  Loads the Amanda configuration,
 * reads a command with getcmd(), parses the PORT-WRITE argument list,
 * receives the dump through startup_chunker() and do_chunk(), and reports
 * the outcome with putresult() and log_add().
 */
103 main(main_argc, main_argv)
107 static struct databuf db;
108 struct cmdargs cmdargs;
111 unsigned long malloc_hist_1, malloc_size_1;
112 unsigned long malloc_hist_2, malloc_size_2;
118 am_feature_t *their_features = NULL;
123 set_pname("chunker");
125 /* Don't die when child closes pipe */
126 signal(SIGPIPE, SIG_IGN);
/* Snapshot of allocator usage; compared with a second snapshot before exit. */
128 malloc_size_1 = malloc_inuse(&malloc_hist_1);
130 erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
131 set_logerror(logerror);
/*
 * Locate the configuration: either named by main_argv[1] under CONFIG_DIR,
 * or derived from the current working directory (the condition selecting
 * between the two is on a line not shown here).
 */
134 config_name = stralloc(main_argv[1]);
135 config_dir = vstralloc(CONFIG_DIR, "/", config_name, "/", NULL);
137 char my_cwd[STR_SIZE];
139 if (getcwd(my_cwd, sizeof(my_cwd)) == NULL) {
140 error("cannot determine current working directory");
142 config_dir = stralloc2(my_cwd, "/");
/* Last path component of the working directory becomes the config name. */
143 if ((config_name = strrchr(my_cwd, '/')) != NULL) {
144 config_name = stralloc(config_name + 1);
150 conffile = stralloc2(config_dir, CONFFILE_NAME);
151 if(read_conffile(conffile)) {
152 error("errors processing config file \"%s\"", conffile);
157 "%s: pid %ld executable %s version %s\n",
158 get_pname(), (long) getpid(),
159 main_argv[0], version());
162 /* now, make sure we are a valid user */
164 if (getpwuid(getuid()) == NULL)
165 error("can't get login name for my uid %ld", (long)getuid());
/* SIGPIPE was already ignored above; SIGCHLD is ignored here as well. */
167 signal(SIGPIPE, SIG_IGN);
168 signal(SIGCHLD, SIG_IGN);
170 datestamp = construct_datestamp(NULL);
/* Read the command that tells us what to do. */
173 cmd = getcmd(&cmdargs);
194 cmdargs.argc++; /* true count of args */
/*
 * PORT-WRITE arguments, consumed in order: handle, filename, hostname,
 * features, diskname, level, dumpdate, chunksize, progname, use, options.
 * A missing argument is reported through error().
 */
197 if(a >= cmdargs.argc) {
198 error("error [chunker PORT-WRITE: not enough args: handle]");
200 handle = newstralloc(handle, cmdargs.argv[a++]);
202 if(a >= cmdargs.argc) {
203 error("error [chunker PORT-WRITE: not enough args: filename]");
/* filename points into cmdargs.argv; it is not copied here. */
205 filename = cmdargs.argv[a++];
207 if(a >= cmdargs.argc) {
208 error("error [chunker PORT-WRITE: not enough args: hostname]");
210 hostname = newstralloc(hostname, cmdargs.argv[a++]);
212 if(a >= cmdargs.argc) {
213 error("error [chunker PORT-WRITE: not enough args: features]");
/* Drop any previous feature set before parsing the new one. */
215 am_release_feature_set(their_features);
216 their_features = am_string_to_feature(cmdargs.argv[a++]);
218 if(a >= cmdargs.argc) {
219 error("error [chunker PORT-WRITE: not enough args: diskname]");
221 diskname = newstralloc(diskname, cmdargs.argv[a++]);
223 if(a >= cmdargs.argc) {
224 error("error [chunker PORT-WRITE: not enough args: level]");
/* NOTE(review): atoi() gives no error indication for non-numeric input. */
226 level = atoi(cmdargs.argv[a++]);
228 if(a >= cmdargs.argc) {
229 error("error [chunker PORT-WRITE: not enough args: dumpdate]");
231 dumpdate = newstralloc(dumpdate, cmdargs.argv[a++]);
233 if(a >= cmdargs.argc) {
234 error("error [chunker PORT-WRITE: not enough args: chunksize]");
/* am_floor() presumably rounds down to a multiple of DISK_BLOCK_KB - confirm. */
236 chunksize = atoi(cmdargs.argv[a++]);
237 chunksize = am_floor(chunksize, DISK_BLOCK_KB);
239 if(a >= cmdargs.argc) {
240 error("error [chunker PORT-WRITE: not enough args: progname]");
242 progname = newstralloc(progname, cmdargs.argv[a++]);
244 if(a >= cmdargs.argc) {
245 error("error [chunker PORT-WRITE: not enough args: use]");
247 use = am_floor(atoi(cmdargs.argv[a++]), DISK_BLOCK_KB);
249 if(a >= cmdargs.argc) {
250 error("error [chunker PORT-WRITE: not enough args: options]");
252 options = newstralloc(options, cmdargs.argv[a++]);
/* Every argument must have been consumed. */
254 if(a != cmdargs.argc) {
255 error("error [chunker PORT-WRITE: too many args: %d != %d]",
/* Open the data port and the first holding file; failure is reported as TRYAGAIN. */
259 if((infd = startup_chunker(filename, use, chunksize, &db)) < 0) {
260 q = squotef("[chunker startup failed: %s]", errstr);
261 putresult(TRYAGAIN, "%s %s\n", handle, q);
262 error("startup_chunker failed");
/* -1 means databuf_flush() has not buffered a command for us. */
264 command_in_transit = -1;
/* A nonzero return from do_chunk() leads to the status report below. */
265 if(infd >= 0 && do_chunk(infd, &db)) {
266 char kb_str[NUM_STR_SIZE];
267 char kps_str[NUM_STR_SIZE];
270 runtime = stopclock();
271 rt = runtime.r.tv_sec+runtime.r.tv_usec/1000000.0;
/* The reported size excludes the header blocks counted in headersize. */
272 snprintf(kb_str, sizeof(kb_str), "%ld", dumpsize - headersize);
273 snprintf(kps_str, sizeof(kps_str), "%3.1f",
274 rt ? dumpsize / rt : 0.0);
275 errstr = newvstralloc(errstr,
276 "sec ", walltime_str(runtime),
280 q = squotef("[%s]", errstr);
/* Use the command databuf_flush() already read, if any; getcmd() below reads one. */
281 if(command_in_transit != -1)
282 cmd = command_in_transit;
284 cmd = getcmd(&cmdargs);
287 putresult(DONE, "%s %ld %s\n",
288 handle, dumpsize - headersize, q);
289 log_add(L_SUCCESS, "%s %s %s %d [%s]",
290 hostname, diskname, datestamp, level, errstr);
/* PARTIAL is reported only when dumpsize exceeds DISK_BLOCK_KB. */
296 if(dumpsize > DISK_BLOCK_KB) {
297 putresult(PARTIAL, "%s %ld %s\n",
298 handle, dumpsize - headersize, q);
299 log_add(L_PARTIAL, "%s %s %s %d [%s]",
300 hostname, diskname, datestamp, level, errstr);
303 errstr = newvstralloc(errstr,
308 q = squotef("[%s]",errstr);
309 putresult(FAILED, "%s %s\n", handle, q);
310 log_add(L_FAIL, "%s %s %s %d [%s]",
311 hostname, diskname, datestamp, level, errstr);
/* An infd of -2 skips this FAILED report. */
316 } else if(infd != -2) {
319 q = squotef("[%s]", errstr);
321 putresult(FAILED, "%s %s\n", handle, q);
322 log_add(L_FAIL, "%s %s %s %d [%s]",
323 hostname, diskname, datestamp, level, errstr);
/* Quote part of the received input and report it as BAD_COMMAND. */
330 if(cmdargs.argc >= 1) {
331 q = squote(cmdargs.argv[1]);
332 } else if(cmdargs.argc >= 0) {
333 q = squote(cmdargs.argv[0]);
335 q = stralloc("(no input?)");
337 putresult(BAD_COMMAND, "%s\n", q);
342 /* } while(cmd != QUIT); */
354 am_release_feature_set(their_features);
355 their_features = NULL;
/* Second allocator snapshot; list allocations if usage changed since startup. */
357 malloc_size_2 = malloc_inuse(&malloc_hist_2);
359 if (malloc_size_1 != malloc_size_2)
360 malloc_list(fileno(stderr), malloc_hist_1, malloc_hist_2);
366 * Returns a file descriptor to the incoming port
367 * on success, or -1 on error.
/*
 * Open a listening stream, announce its port with a PORT result, accept
 * the incoming connection, then create "<filename>.tmp" as the first
 * holding file and initialize db around it.  errstr is set on failure.
 */
370 startup_chunker(filename, use, chunksize, db)
377 char *tmp_filename, *pc;
378 int data_port, data_socket;
381 data_socket = stream_server(&data_port, -1, STREAM_BUFSIZE);
383 if(data_socket < 0) {
384 errstr = stralloc2("error creating stream server: ", strerror(errno));
/* Tell the other side which port to connect to, then wait for it. */
388 putresult(PORT, "%d\n", data_port);
390 infd = stream_accept(data_socket, CONNECT_TIMEOUT, -1, NETWORK_BLOCK_BYTES);
392 errstr = stralloc2("error accepting stream: ", strerror(errno));
/* Data is written to a ".tmp" name inside the holding directory. */
396 tmp_filename = vstralloc(filename, ".tmp", NULL);
397 pc = strrchr(tmp_filename, '/');
399 mkholdingdir(tmp_filename);
401 if ((outfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600)) < 0) {
/* Keep errno from open(); the calls below may change it. */
402 int save_errno = errno;
404 errstr = squotef("holding file \"%s\": %s",
407 amfree(tmp_filename);
409 if(save_errno == ENOSPC) {
/* Results are newline-terminated everywhere else in this file; terminate this one too. */
410 putresult(NO_ROOM, "%s %lu\n", handle, use);
416 amfree(tmp_filename);
417 databuf_init(db, outfd, filename, use, chunksize);
/*
 * Body of the chunking loop: read one DISK_BLOCK_BYTES header from infd,
 * parse it into "file", write it to the holding file, then copy data from
 * infd through db until fullread() stops returning data.
 */
428 char header_buf[DISK_BLOCK_BYTES];
432 dumpsize = headersize = dumpbytes = filesize = 0;
435 * The first thing we should receive is the file header, which we
436 * need to save into "file", as well as write out. Later, the
437 * chunk code will rewrite it.
439 nread = fullread(infd, header_buf, sizeof(header_buf));
440 if (nread != DISK_BLOCK_BYTES) {
441 char number1[NUM_STR_SIZE];
442 char number2[NUM_STR_SIZE];
445 errstr = stralloc2("cannot read header: ", strerror(errno));
447 snprintf(number1, sizeof(number1), "%d", nread);
448 snprintf(number2, sizeof(number2), "%d", DISK_BLOCK_BYTES);
449 errstr = vstralloc("cannot read header: got ",
457 parse_file_header(header_buf, &file, nread);
458 if(write_tapeheader(db->fd, &file)) {
459 int save_errno = errno;
/* Build the message from the saved value so it matches the ENOSPC test below. */
461 errstr = squotef("write_tapeheader file \"%s\": %s",
462 db->filename, strerror(save_errno));
463 if(save_errno == ENOSPC) {
464 putresult(NO_ROOM, "%s %lu\n", handle,
465 db->use+db->split_size-dumpsize);
/* The header block counts toward the dump, this file, and the header total. */
469 dumpsize += DISK_BLOCK_KB;
470 filesize = DISK_BLOCK_KB;
471 headersize += DISK_BLOCK_KB;
474 * We've written the file header. Now, just write data until the
477 while ((nread = fullread(infd, db->buf, db->datalimit - db->datain)) > 0) {
479 while(db->dataout < db->datain) {
480 if(!databuf_flush(db)) {
485 while(db->dataout < db->datain) {
486 if(!databuf_flush(db)) {
491 dumpsize++; /* count partial final KByte */
498 * Initialize a databuf. Takes a writeable file descriptor.
/*
 * Set up db for a fresh dump: remember a copy of the holding file name,
 * reset the chunk sequence, compute the first split point, and mark the
 * buffer as empty.
 */
501 databuf_init(db, fd, filename, use, chunk_size)
509 db->filename = stralloc(filename);
510 db->filename_seq = 0;
511 db->chunk_size = chunk_size;
/* First split point is the smaller of the chunk size and the space granted. */
512 db->split_size = (db->chunk_size > use) ? use : db->chunk_size;
/* Whatever part of "use" the first split does not cover stays available. */
513 db->use = (use>db->split_size) ? use - db->split_size : 0;
/* Empty buffer: both markers at the start, limit one past the end of buf. */
514 db->datain = db->dataout = db->buf;
515 db->datalimit = db->buf + sizeof(db->buf);
520 * Write out the buffer to the backing file
/*
 * Body of the flush routine: do nothing when the buffer is empty.  While
 * dumpsize has reached split_size, either ask for more disk with
 * RQ_MORE_DISK or start a new chunk file and link the current file's
 * header to it.  Then write the buffered data with fullwrite() and update
 * the size counters.  Callers in this file treat a zero return as failure.
 */
526 struct cmdargs cmdargs;
530 char *arg_filename = NULL;
531 char *new_filename = NULL;
532 char *tmp_filename = NULL;
533 char sequence[NUM_STR_SIZE];
535 filetype_t save_type;
541 * If there's no data, do nothing.
543 if (db->dataout >= db->datain) {
548 * See if we need to split this file.
550 while (db->split_size > 0 && dumpsize >= db->split_size) {
553 * Probably no more space on this disk. Request some more.
557 putresult(RQ_MORE_DISK, "%s\n", handle);
558 cmd = getcmd(&cmdargs);
/* A DONE/TRYAGAIN/FAILED arriving first is kept in command_in_transit; read the next command. */
559 if(command_in_transit == -1 &&
560 (cmd == DONE || cmd == TRYAGAIN || cmd == FAILED)) {
561 command_in_transit = cmd;
562 cmd = getcmd(&cmdargs);
564 if(cmd == CONTINUE) {
572 cmdargs.argc++; /* true count of args */
/* CONTINUE arguments, consumed in order: filename, chunksize, use. */
575 if(a >= cmdargs.argc) {
576 error("error [chunker CONTINUE: not enough args: filename]");
578 arg_filename = newstralloc(arg_filename, cmdargs.argv[a++]);
580 if(a >= cmdargs.argc) {
581 error("error [chunker CONTINUE: not enough args: chunksize]");
583 db->chunk_size = atoi(cmdargs.argv[a++]);
584 db->chunk_size = am_floor(db->chunk_size, DISK_BLOCK_KB);
586 if(a >= cmdargs.argc) {
587 error("error [chunker CONTINUE: not enough args: use]");
589 db->use = atoi(cmdargs.argv[a++]);
591 if(a != cmdargs.argc) {
592 error("error [chunker CONTINUE: too many args: %d != %d]",
596 if(strcmp(db->filename, arg_filename) == 0) {
598 * Same disk, so use what room is left up to the
599 * next chunk boundary or the amount we were given,
602 left_in_chunk = db->chunk_size - filesize;
603 if(left_in_chunk > db->use) {
604 db->split_size += db->use;
607 db->split_size += left_in_chunk;
608 db->use -= left_in_chunk;
610 if(left_in_chunk > 0) {
612 * We still have space in this chunk.
618 * Different disk, so use new file.
620 db->filename = newstralloc(db->filename, arg_filename);
622 } else if(cmd == ABORT) {
624 errstr = newstralloc(errstr, "ERROR");
625 putresult(ABORT_FINISHED, "%s\n", handle);
629 if(cmdargs.argc >= 1) {
630 q = squote(cmdargs.argv[1]);
631 } else if(cmdargs.argc >= 0) {
632 q = squote(cmdargs.argv[0]);
634 q = stralloc("(no input?)");
636 error("error [bad command after RQ-MORE-DISK: \"%s\"]", q);
641 * Time to use another file.
645 * First, open the new chunk file, and give it a new header
646 * that has no cont_filename pointer.
648 snprintf(sequence, sizeof(sequence), "%d", db->filename_seq);
649 new_filename = newvstralloc(new_filename,
654 tmp_filename = newvstralloc(tmp_filename,
658 pc = strrchr(tmp_filename, '/');
660 mkholdingdir(tmp_filename);
662 newfd = open(tmp_filename, O_RDWR|O_CREAT|O_TRUNC, 0600);
664 int save_errno = errno;
666 if(save_errno == ENOSPC) {
667 putresult(NO_ROOM, "%s %lu\n",
669 db->use+db->split_size-dumpsize);
670 db->use = 0; /* force RQ_MORE_DISK */
671 db->split_size = dumpsize;
674 errstr = squotef("creating chunk holding file \"%s\": %s",
/* Temporarily mark the header as a continuation file with no successor. */
681 save_type = file.type;
682 file.type = F_CONT_DUMPFILE;
683 file.cont_filename[0] = '\0';
684 if(write_tapeheader(newfd, &file)) {
685 int save_errno = errno;
688 if(save_errno == ENOSPC) {
689 putresult(NO_ROOM, "%s %lu\n",
691 db->use+db->split_size-dumpsize);
692 db->use = 0; /* force RQ_MORE DISK */
693 db->split_size = dumpsize;
696 errstr = squotef("write_tapeheader file \"%s\": %s",
704 * Now, update the header of the current file to point
705 * to the next chunk, and then close it.
707 if (lseek(db->fd, (off_t)0, SEEK_SET) < 0) {
708 errstr = squotef("lseek holding file \"%s\": %s",
716 file.type = save_type;
/*
 * strncpy() leaves the destination unterminated when the source fills it,
 * so terminate explicitly at the last valid index.  Index sizeof(...) is
 * one past the end of the array.
 */
717 strncpy(file.cont_filename, new_filename, sizeof(file.cont_filename));
718 file.cont_filename[sizeof(file.cont_filename) - 1] = '\0';
719 if(write_tapeheader(db->fd, &file)) {
720 errstr = squotef("write_tapeheader file \"%s\": %s",
724 unlink(tmp_filename);
728 file.type = F_CONT_DUMPFILE;
731 * Now shift the file descriptor.
738 * Update when we need to chunk again
740 if(db->use <= DISK_BLOCK_KB) {
742 * Cheat and use one more block than allowed so we can make
745 db->split_size += 2 * DISK_BLOCK_KB;
747 } else if(db->chunk_size > db->use) {
748 db->split_size += db->use;
751 db->split_size += db->chunk_size;
752 db->use -= db->chunk_size;
756 amfree(tmp_filename);
757 amfree(new_filename);
/* The new chunk's header block is counted like the first one in do_chunk. */
758 dumpsize += DISK_BLOCK_KB;
759 filesize = DISK_BLOCK_KB;
760 headersize += DISK_BLOCK_KB;
765 * Write out the buffer
767 written = fullwrite(db->fd, db->dataout, db->datain - db->dataout);
769 db->dataout += written;
770 dumpbytes += written;
/* Fold whole KBytes of the byte count into the KByte counters. */
772 dumpsize += (dumpbytes / 1024);
773 filesize += (dumpbytes / 1024);
776 if (errno != ENOSPC) {
777 errstr = squotef("data write: %s", strerror(errno));
783 * NO-ROOM is informational only. Later, RQ_MORE_DISK will be
784 * issued to use another holding disk.
786 putresult(NO_ROOM, "%s %lu\n", handle, db->use+db->split_size-dumpsize);
787 db->use = 0; /* force RQ_MORE_DISK */
788 db->split_size = dumpsize;
791 if (db->datain == db->dataout) {
793 * We flushed the whole buffer so reset to use it all.
795 db->datain = db->dataout = db->buf;
800 amfree(new_filename);
801 amfree(tmp_filename);
802 amfree(arg_filename);
808 * Send an Amanda dump header to the output file.
/*
 * Build a DISK_BLOCK_BYTES header block from *file and write it to outfd.
 * Returns 0 when the whole block was written; a negative fullwrite()
 * result is returned unchanged.  Handling of a short write is on lines
 * not shown here.
 */
811 write_tapeheader(outfd, file)
815 char buffer[DISK_BLOCK_BYTES];
/* The header records the block size it was written with. */
818 file->blocksize = DISK_BLOCK_BYTES;
819 build_header(buffer, file, sizeof(buffer));
821 written = fullwrite(outfd, buffer, sizeof(buffer));
822 if(written == sizeof(buffer)) return 0;
823 if(written < 0) return written;