#include "protocol.h"
#include "security.h"
#include "stream.h"
-#include "version.h"
#include "fileheader.h"
#include "amfeatures.h"
#include "server_util.h"
#include "util.h"
#include "holding.h"
#include "timestamp.h"
-
-#define chunker_debug(i, ...) do { \
- if ((i) <= debug_chunker) { \
- dbprintf(__VA_ARGS__); \
- } \
-} while (0)
+#include "sockaddr-util.h"
#ifndef SEEK_SET
#define SEEK_SET 0
static void databuf_init(struct databuf *, int, char *, off_t, off_t);
static int databuf_flush(struct databuf *);
-static int startup_chunker(char *, off_t, off_t, struct databuf *);
-static int do_chunk(int, struct databuf *);
+static int startup_chunker(char *, off_t, off_t, struct databuf *, int *, int *);
+static int do_chunk(int, struct databuf *, int, int);
+/* we use a function pointer for full_write, so that we can "shim" in
+ * full_write_with_fake_enospc for testing */
+static size_t (*db_full_write)(int fd, const void *buf, size_t count);
+static size_t full_write_with_fake_enospc(int fd, const void *buf, size_t count);
+static off_t fake_enospc_at_byte = -1;
int
main(
{
static struct databuf db;
struct cmdargs *cmdargs;
- int infd;
+ int header_fd;
char *q = NULL;
char *filename = NULL;
off_t chunksize, use;
times_t runtime;
am_feature_t *their_features = NULL;
int a;
- config_overwrites_t *cfg_ovr = NULL;
+ config_overrides_t *cfg_ovr = NULL;
char *cfg_opt = NULL;
char *m;
+ int header_socket;
+ int data_socket;
/*
* Configure program for internationalization:
/* Don't die when child closes pipe */
signal(SIGPIPE, SIG_IGN);
- erroutput_type = (ERR_AMANDALOG|ERR_INTERACTIVE);
- set_logerror(logerror);
+ add_amanda_log_handler(amanda_log_stderr);
+ add_amanda_log_handler(amanda_log_trace_log);
- cfg_ovr = extract_commandline_config_overwrites(&argc, &argv);
+ cfg_ovr = extract_commandline_config_overrides(&argc, &argv);
if (argc > 1)
cfg_opt = argv[1];
+ set_config_overrides(cfg_ovr);
config_init(CONFIG_INIT_EXPLICIT_NAME | CONFIG_INIT_USE_CWD, cfg_opt);
- apply_config_overwrites(cfg_ovr);
if (config_errors(NULL) >= CFGERR_WARNINGS) {
config_print_errors();
g_fprintf(stderr,
_("%s: pid %ld executable %s version %s\n"),
get_pname(), (long) getpid(),
- argv[0], version());
+ argv[0], VERSION);
fflush(stderr);
/* now, make sure we are a valid user */
log_add(L_INFO, "%s pid %ld", get_pname(), (long)getpid());
error(_("Didn't get START command"));
}
+ free_cmdargs(cmdargs);
+
+ /* set up a fake ENOSPC for testing purposes. Note that this counts
+ * headers as well as data written to disk. */
+ if (getenv("CHUNKER_FAKE_ENOSPC_AT")) {
+ char *env = getenv("CHUNKER_FAKE_ENOSPC_AT");
+ fake_enospc_at_byte = (off_t)atoi(env); /* these values are never > MAXINT */
+ db_full_write = full_write_with_fake_enospc;
+ g_debug("will trigger fake ENOSPC at byte %d", (int)fake_enospc_at_byte);
+ } else {
+ db_full_write = full_write;
+ }
/* do {*/
cmdargs = getcmd();
/*NOTREACHED*/
}
- if((infd = startup_chunker(filename, use, chunksize, &db)) < 0) {
+ if ((header_fd = startup_chunker(filename, use, chunksize, &db,
+ &header_socket, &data_socket)) < 0) {
q = quote_string(vstrallocf(_("[chunker startup failed: %s]"), errstr));
putresult(TRYAGAIN, "%s %s\n", handle, q);
error("startup_chunker failed: %s", errstr);
}
command_in_transit = NULL;
- if(infd >= 0 && do_chunk(infd, &db)) {
+ if (header_fd >= 0 && do_chunk(header_fd, &db, header_socket, data_socket)) {
char kb_str[NUM_STR_SIZE];
char kps_str[NUM_STR_SIZE];
double rt;
m = vstrallocf("[%s]", errstr);
q = quote_string(m);
amfree(m);
+ free_cmdargs(cmdargs);
if(command_in_transit != NULL) {
cmdargs = command_in_transit;
command_in_transit = NULL;
default: break;
}
amfree(q);
- } else if(infd != -2) {
+ } else if (header_fd != -2) {
if(q == NULL) {
m = vstrallocf("[%s]", errstr);
q = quote_string(m);
char * filename,
off_t use,
off_t chunksize,
- struct databuf * db)
+ struct databuf * db,
+ int *headersocket,
+ int *datasocket)
{
- int infd, outfd;
+ int header_fd, outfd;
char *tmp_filename, *pc;
- in_port_t data_port;
- int data_socket;
+ in_port_t header_port, data_port;
+ int header_socket, data_socket;
int result;
struct addrinfo *res;
+ struct addrinfo *res_addr;
+ sockaddr_union *addr = NULL;
+ sockaddr_union data_addr;
+ header_port = 0;
data_port = 0;
if ((result = resolve_hostname("localhost", 0, &res, NULL) != 0)) {
errstr = newvstrallocf(errstr, _("could not resolve localhost: %s"),
gai_strerror(result));
return -1;
}
- data_socket = stream_server(res->ai_family, &data_port, 0,
+ for (res_addr = res; res_addr != NULL; res_addr = res_addr->ai_next) {
+ g_debug("ra: %s\n", str_sockaddr((sockaddr_union*)res_addr->ai_addr));
+ if (res_addr->ai_family == AF_INET) {
+ addr = (sockaddr_union *)res_addr->ai_addr;
+ break;
+ }
+ }
+ if (!addr) {
+ addr = (sockaddr_union *)res->ai_addr;
+ g_debug("addr: %s\n", str_sockaddr(addr));
+ }
+
+ header_socket = stream_server(SU_GET_FAMILY(addr), &header_port, 0,
+ STREAM_BUFSIZE, 0);
+ data_socket = stream_server(SU_GET_FAMILY(addr), &data_port, 0,
STREAM_BUFSIZE, 0);
+ copy_sockaddr(&data_addr, addr);
+
+ SU_SET_PORT(&data_addr, data_port);
+
if (res) freeaddrinfo(res);
- if(data_socket < 0) {
- errstr = vstrallocf(_("error creating stream server: %s"), strerror(errno));
+ if (header_socket < 0) {
+ errstr = vstrallocf(_("error creating header stream server: %s"), strerror(errno));
+ aclose(data_socket);
return -1;
}
- putresult(PORT, "%d\n", data_port);
+ if (data_socket < 0) {
+ errstr = vstrallocf(_("error creating data stream server: %s"), strerror(errno));
+ aclose(header_socket);
+ return -1;
+ }
- infd = stream_accept(data_socket, CONNECT_TIMEOUT, 0, STREAM_BUFSIZE);
- aclose(data_socket);
- if(infd == -1) {
- errstr = vstrallocf(_("error accepting stream: %s"), strerror(errno));
+ putresult(PORT, "%d %s\n", header_port, str_sockaddr(&data_addr));
+
+ header_fd = stream_accept(header_socket, CONNECT_TIMEOUT, 0,
+ STREAM_BUFSIZE);
+ if (header_fd == -1) {
+ errstr = vstrallocf(_("error accepting header stream: %s"),
+ strerror(errno));
+ aclose(header_socket);
+ aclose(data_socket);
return -1;
}
errstr = quote_string(m);
amfree(m);
amfree(tmp_filename);
- aclose(infd);
+ aclose(header_fd);
+ aclose(data_socket);
if(save_errno == ENOSPC) {
putresult(NO_ROOM, "%s %lld\n",
handle, (long long)use);
amfree(tmp_filename);
databuf_init(db, outfd, filename, use, chunksize);
db->filename_seq++;
- return infd;
+ *headersocket = header_socket;
+ *datasocket = data_socket;
+ return header_fd;
}
static int
do_chunk(
- int infd,
- struct databuf * db)
+ int header_fd,
+ struct databuf * db,
+ int header_socket,
+ int data_socket)
{
size_t nread;
+ int data_fd;
char header_buf[DISK_BLOCK_BYTES];
startclock();
* need to save into "file", as well as write out. Later, the
* chunk code will rewrite it.
*/
- nread = full_read(infd, header_buf, SIZEOF(header_buf));
+ nread = full_read(header_fd, header_buf, SIZEOF(header_buf));
+ aclose(header_fd);
+ aclose(header_socket);
if (nread != sizeof(header_buf)) {
if(errno != 0) {
errstr = vstrallocf(_("cannot read header: %s"), strerror(errno));
errstr = vstrallocf(_("cannot read header: got %zd bytes instead of %zd"),
nread, sizeof(header_buf));
}
+ aclose(data_socket);
return 0;
}
parse_file_header(header_buf, &file, (size_t)nread);
putresult(NO_ROOM, "%s %lld\n", handle,
(long long)(db->use+db->split_size-dumpsize));
}
+ aclose(data_socket);
return 0;
}
dumpsize += (off_t)DISK_BLOCK_KB;
filesize = (off_t)DISK_BLOCK_KB;
headersize += DISK_BLOCK_KB;
+ /* open the data socket */
+ data_fd = stream_accept(data_socket, CONNECT_TIMEOUT, 0, STREAM_BUFSIZE);
+
+ if (data_fd == -1) {
+ errstr = vstrallocf(_("error accepting data stream: %s"),
+ strerror(errno));
+ aclose(data_socket);
+ return 0;
+ }
+
/*
* We've written the file header. Now, just write data until the
* end.
*/
- while ((nread = full_read(infd, db->buf,
+ while ((nread = full_read(data_fd, db->buf,
(size_t)(db->datalimit - db->datain))) > 0) {
db->datain += nread;
while(db->dataout < db->datain) {
if(!databuf_flush(db)) {
+ aclose(data_fd);
+ aclose(data_socket);
return 0;
}
}
}
while(db->dataout < db->datain) {
if(!databuf_flush(db)) {
+ aclose(data_fd);
+ aclose(data_socket);
return 0;
}
}
dumpsize += (off_t)1; /* count partial final KByte */
filesize += (off_t)1;
}
+ aclose(data_fd);
+ aclose(data_socket);
return 1;
}
{
struct cmdargs *cmdargs = NULL;
int rc = 1;
+ size_t size_to_write;
size_t written;
off_t left_in_chunk;
char *arg_filename = NULL;
aclose(newfd);
if(save_errno == ENOSPC) {
+ if (unlink(tmp_filename) < 0) {
+ g_debug("could not delete '%s'; ignoring", tmp_filename);
+ }
putresult(NO_ROOM, "%s %lld\n", handle,
(long long)(db->use+db->split_size-dumpsize));
db->use = (off_t)0; /* force RQ_MORE DISK */
db->split_size = dumpsize;
+ file.type = save_type;
continue;
}
m = vstrallocf(_("write_tapeheader file %s: %s"),
/*
* Write out the buffer
*/
- written = full_write(db->fd, db->dataout,
- (size_t)(db->datain - db->dataout));
+ size_to_write = (size_t)(db->datain - db->dataout);
+ written = db_full_write(db->fd, db->dataout, size_to_write);
if (written > 0) {
db->dataout += written;
dumpbytes += (off_t)written;
dumpsize += (dumpbytes / (off_t)1024);
filesize += (dumpbytes / (off_t)1024);
dumpbytes %= 1024;
- if (written == 0) {
+ if (written < size_to_write) {
if (errno != ENOSPC) {
char *m = vstrallocf(_("data write: %s"), strerror(errno));
errstr = quote_string(m);
size_t written;
file->blocksize = DISK_BLOCK_BYTES;
- buffer = build_header(file, DISK_BLOCK_BYTES);
+ if (debug_chunker > 1)
+ dump_dumpfile_t(file);
+ buffer = build_header(file, NULL, DISK_BLOCK_BYTES);
+ if (!buffer) /* this shouldn't happen */
+ error(_("header does not fit in %zd bytes"), (size_t)DISK_BLOCK_BYTES);
- written = full_write(outfd, buffer, DISK_BLOCK_BYTES);
+ written = db_full_write(outfd, buffer, DISK_BLOCK_BYTES);
amfree(buffer);
if(written == DISK_BLOCK_BYTES) return 0;
return (ssize_t)-1;
}
+
+static size_t
+full_write_with_fake_enospc(
+ int fd,
+ const void *buf,
+ size_t count)
+{
+ size_t rc;
+
+ //g_debug("HERE %zd %zd", count, (size_t)fake_enospc_at_byte);
+
+ if (count <= (size_t)fake_enospc_at_byte) {
+ fake_enospc_at_byte -= count;
+ return full_write(fd, buf, count);
+ }
+
+ /* if we get here, the caller has requested a size that is less
+ * than fake_enospc_at_byte. */
+ count = fake_enospc_at_byte;
+ g_debug("returning fake ENOSPC");
+
+ if (fake_enospc_at_byte) {
+ rc = full_write(fd, buf, fake_enospc_at_byte);
+ if (rc == (size_t)fake_enospc_at_byte) {
+ /* full_write succeeded, so fake a failure */
+ errno = ENOSPC;
+ }
+ } else {
+ /* no bytes to write; just fake an error */
+ errno = ENOSPC;
+ rc = 0;
+ }
+
+ /* switch back to calling full_write directly */
+ fake_enospc_at_byte = -1;
+ db_full_write = full_write;
+ return rc;
+}