#define REP_TIMEOUT (6*60*60) /* secs for service to reply */
#define ACK_TIMEOUT 10 /* XXX should be configurable */
+#define STDERR_PIPE (DATA_FD_COUNT + 1)
#define amandad_debug(i, ...) do { \
if ((i) <= debug_amandad) { \
struct active_service;
typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
+/* string that we scan for in sendbackup's MESG stream */
+static const char info_end_str[] = "sendbackup: info end\n";
+#define INFO_END_LEN (sizeof(info_end_str)-1)
+
+/*
+ * Here are the services that we allow.
+ * Must be in the same order as services[].
+ */
+typedef enum {
+ SERVICE_NOOP,
+ SERVICE_SENDSIZE,
+ SERVICE_SENDBACKUP,
+ SERVICE_SELFCHECK,
+ SERVICE_AMINDEXD,
+ SERVICE_AMIDXTAPED
+} service_t;
+
+static struct services {
+ char *name;
+ int active;
+ service_t service;
+} services[] = {
+ { "noop", 1, SERVICE_NOOP },
+ { "sendsize", 1, SERVICE_SENDSIZE },
+ { "sendbackup", 1, SERVICE_SENDBACKUP },
+ { "selfcheck", 1, SERVICE_SELFCHECK },
+ { "amindexd", 0, SERVICE_AMINDEXD },
+ { "amidxtaped", 0, SERVICE_AMIDXTAPED }
+};
+#define NSERVICES (int)(sizeof(services) / sizeof(services[0]))
+
/*
* This structure describes an active running service.
*
* for communications with the amanda server.
*/
struct active_service {
+ service_t service; /* service name */
char *cmd; /* name of command we ran */
char *arguments; /* arguments we sent it */
security_handle_t *security_handle; /* remote server */
int send_partial_reply; /* send PREP packet */
int reqfd; /* pipe to write requests */
int repfd; /* pipe to read replies */
+ int errfd; /* pipe to read stderr */
event_handle_t *ev_repfd; /* read event handle for repfd */
event_handle_t *ev_reptimeout; /* timeout for rep data */
+ event_handle_t *ev_errfd; /* read event handle for errfd */
pkt_t rep_pkt; /* rep packet we're sending out */
+ char *errbuf; /* buffer to read the err into */
char *repbuf; /* buffer to read the rep into */
size_t bufsize; /* length of repbuf */
size_t repbufsize; /* length of repbuf */
int repretry; /* times we'll retry sending the rep */
+ int seen_info_end; /* have we seen "sendbackup info end\n"? */
+ char info_end_buf[INFO_END_LEN]; /* last few bytes read, used for scanning for info end */
+
/*
* General user streams to the process, and their equivalent
* network streams.
TAILQ_ENTRY(active_service) tq; /* queue handle */
};
-/*
- * Here are the services that we allow.
- */
-static struct services {
- char *name;
- int active;
-} services[] = {
- { "noop", 1 },
- { "sendsize", 1 },
- { "sendbackup", 1 },
- { "selfcheck", 1 },
- { "amindexd", 0 },
- { "amidxtaped", 0 }
-};
-#define NSERVICES (int)(sizeof(services) / sizeof(services[0]))
-
/*
* Queue of outstanding requests that we are running.
*/
static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
static void repfd_recv(void *);
+static void errfd_recv(void *);
static void timeout_repfd(void *);
static void protocol_recv(void *, pkt_t *, security_status_t);
static void process_readnetfd(void *);
static void process_writenetfd(void *, void *, ssize_t);
static struct active_service *service_new(security_handle_t *,
- const char *, const char *);
+ const char *, service_t, const char *);
static void service_delete(struct active_service *);
static int writebuf(struct active_service *, const void *, size_t);
static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
/* Don't die when child closes pipe */
signal(SIGPIPE, SIG_IGN);
+ /* Parse the configuration; we'll handle errors later */
config_init(CONFIG_INIT_CLIENT, NULL);
if (geteuid() == 0) {
if(strcasecmp(auth, "rsh") == 0 ||
strcasecmp(auth, "ssh") == 0 ||
+ strcasecmp(auth, "local") == 0 ||
strcasecmp(auth, "bsdtcp") == 0) {
wait_30s = 0;
exit_on_qlength = 1;
if (getuid() == 0) {
if (strcasecmp(auth, "krb5") != 0) {
- error(_("Amanda must be run as user '%s' when using '%s' authetication"),
+ error(_("Amanda must be run as user '%s' when using '%s' authentication"),
CLIENT_LOGIN, auth);
}
} else {
if (strcasecmp(auth, "krb5") == 0) {
- error(_("Amanda must be run as user 'root' when using 'krb5' authetication"));
+ error(_("Amanda must be run as user 'root' when using 'krb5' authentication"));
}
}
struct active_service *as;
char *pktbody, *tok, *service, *arguments;
char *service_path = NULL;
+ GSList *errlist = NULL;
int i;
pkt_out.body = NULL;
return;
}
+ /*
+ * If we have errors (not warnings) from the config file, let the server
+ * know immediately. Unfortunately, we only get one ERROR line, so if there
+ * are multiple errors, we just show the first.
+ */
+ if (config_errors(&errlist) >= CFGERR_ERRORS) {
+ GSList *iter = errlist;
+ char *errmsg;
+ gboolean multiple_errors = FALSE;
+
+ if (iter) {
+ errmsg = (char *)iter->data;
+ if (iter->next)
+ multiple_errors = TRUE;
+ } else {
+ errmsg = "(no error message)";
+ }
+
+ pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
+ multiple_errors? _(" (additional errors not displayed)"):"");
+ do_sendpkt(handle, &pkt_out);
+ amfree(pkt_out.body);
+ security_close(handle);
+ return;
+ }
+
/*
* If pkt is NULL, then there was a problem with the new connection.
*/
* the request pipe.
*/
dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
- as = service_new(handle, service_path, arguments);
+ as = service_new(handle, service_path, services[i].service, arguments);
if (writebuf(as, arguments, strlen(arguments)) < 0) {
const char *errmsg = strerror(errno);
dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
timeout_repfd, as);
+ as->errbuf = NULL;
+ as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
security_recvpkt(as->security_handle, protocol_recv, as, -1);
return (A_PENDING);
}
char *what;
char *msg;
int code = 0;
- int t;
int pid;
amwait_t retstat;
/* If end of service, wait for process status */
if (n == 0) {
- t = 0;
pid = waitpid(as->pid, &retstat, WNOHANG);
- while (t<5 && pid == 0) {
- sleep(1);
- t++;
- pid = waitpid(as->pid, &retstat, WNOHANG);
+ if (as->service == SERVICE_NOOP ||
+ as->service == SERVICE_SENDSIZE ||
+ as->service == SERVICE_SELFCHECK) {
+ int t = 0;
+ while (t<5 && pid == 0) {
+ sleep(1);
+ t++;
+ pid = waitpid(as->pid, &retstat, WNOHANG);
+ }
+ }
+
+ /* Process errfd before sending the REP packet */
+ if (as->ev_errfd) {
+ SELECT_ARG_TYPE readset;
+ struct timeval tv;
+ int nfound;
+
+ memset(&tv, 0, SIZEOF(tv));
+ FD_ZERO(&readset);
+ FD_SET(as->errfd, &readset);
+ nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
+ if (nfound && FD_ISSET(as->errfd, &readset)) {
+ errfd_recv(as);
+ }
}
+ if (pid == 0)
+ pid = waitpid(as->pid, &retstat, WNOHANG);
+
if (pid > 0) {
what = NULL;
if (! WIFEXITED(retstat)) {
}
strcpy(as->repbuf + as->repbufsize, msg);
as->repbufsize += strlen(msg);
+ amfree(msg);
}
}
}
dh->netfd = NULL;
continue;
}
- /* setup an event for reads from it */
- dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
- process_readnetfd, dh);
+
+ /* setup an event for reads from it. As a special case, don't start
+ * listening on as->data[0] until we read some data on another fd, if
+ * the service is sendbackup. This ensures that we send a MESG or
+ * INDEX token before any DATA tokens, as dumper assumes. This is a
+ * hack, if that wasn't already obvious! */
+ if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
+ dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+ process_readnetfd, dh);
+ } else {
+ amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
+ }
security_stream_read(dh->netfd, process_writenetfd, dh);
state_machine(as, A_RECVREP, NULL);
}
+/*
+ * Called when a errfd has received data
+ */
+static void
+errfd_recv(
+ void * cookie)
+{
+ struct active_service *as = cookie;
+ char buf[32769];
+ int n;
+ char *r;
+
+ assert(as != NULL);
+ assert(as->ev_errfd != NULL);
+
+ n = read(as->errfd, &buf, 32768);
+ /* merge buffer */
+ if (n > 0) {
+ /* Terminate it with '\0' */
+ buf[n+1] = '\0';
+
+ if (as->errbuf) {
+ as->errbuf = vstrextend(&as->errbuf, buf, NULL);
+ } else {
+ as->errbuf = stralloc(buf);
+ }
+ } else if (n == 0) {
+ event_release(as->ev_errfd);
+ as->ev_errfd = NULL;
+ } else { /* n < 0 */
+ event_release(as->ev_errfd);
+ as->ev_errfd = NULL;
+ g_snprintf(buf, 32768,
+ "error reading stderr or service: %s\n", strerror(errno));
+ }
+
+ /* for each line terminate by '\n' */
+ while (as->errbuf != NULL && (r = index(as->errbuf, '\n')) != NULL) {
+ char *s;
+
+ *r = '\0';
+ s = vstrallocf("ERROR service %s: %s\n",
+ services[as->service].name, as->errbuf);
+
+ /* Add to repbuf, error message will be in the REP packet if it
+ * is not already sent
+ */
+ n = strlen(s);
+ if (as->bufsize == 0) {
+ as->bufsize = NETWORK_BLOCK_BYTES;
+ as->repbuf = alloc(as->bufsize);
+ }
+ while (as->bufsize < as->repbufsize + n) {
+ char *repbuf_temp;
+ as->bufsize *= 2;
+ repbuf_temp = alloc(as->bufsize);
+ memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
+ amfree(as->repbuf);
+ as->repbuf = repbuf_temp;
+ }
+ memcpy(as->repbuf + as->repbufsize, s, n);
+ as->repbufsize += n;
+
+ dbprintf("%s", s);
+
+ /* remove first line from buffer */
+ r++;
+ s = stralloc(r);
+ amfree(as->errbuf);
+ as->errbuf = s;
+ }
+}
+
/*
* Called when a repfd has timed out
*/
service_delete(as);
return;
}
+
+ /* Handle the special case of recognizing "sendbackup info end"
+ * from sendbackup's MESG fd */
+ if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
+ /* make a buffer containing the combined data from info_end_buf
+ * and what we've read this time, and search it for info_end_strj
+ * This includes a NULL byte for strstr's sanity. */
+ char *combined_buf = malloc(INFO_END_LEN + n + 1);
+ memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
+ memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
+ combined_buf[INFO_END_LEN+n] = '\0';
+
+ as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
+
+ /* fill info_end_buf from the tail end of combined_buf */
+ memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
+
+ /* if we did see info_end_str, start reading the data fd (fd 0) */
+ if (as->seen_info_end) {
+ struct datafd_handle *dh = &as->data[0];
+ amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
+ dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+ process_readnetfd, dh);
+ } else {
+ amandad_debug(1, "sendbackup header info still not complete\n");
+ }
+ }
+
if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
/* stream has croaked */
pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
if (dh->fd_write <= 0) {
dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
} else if (size > 0) {
- fullwrite(dh->fd_write, buf, (size_t)size);
+ full_write(dh->fd_write, buf, (size_t)size);
security_stream_read(dh->netfd, process_writenetfd, dh);
}
else {
service_new(
security_handle_t * security_handle,
const char * cmd,
+ service_t service,
const char * arguments)
{
int i;
- int data_read[DATA_FD_COUNT + 1][2];
- int data_write[DATA_FD_COUNT + 1][2];
+ int data_read[DATA_FD_COUNT + 2][2];
+ int data_write[DATA_FD_COUNT + 2][2];
struct active_service *as;
pid_t pid;
int newfd;
assert(arguments != NULL);
/* a plethora of pipes */
+ /* data_read[0] : stdin
+ * data_write[0] : stdout
+ * data_read[1], data_write[1] : first stream
+ * data_read[2], data_write[2] : second stream
+ * data_read[3], data_write[3] : third stream
+ * data_write[4] : stderr
+ */
for (i = 0; i < DATA_FD_COUNT + 1; i++) {
if (pipe(data_read[i]) < 0) {
error(_("pipe: %s\n"), strerror(errno));
/*NOTREACHED*/
}
}
+ if (pipe(data_write[STDERR_PIPE]) < 0) {
+ error(_("pipe: %s\n"), strerror(errno));
+ /*NOTREACHED*/
+ }
switch(pid = fork()) {
case -1:
/*
* The parent. Close the far ends of our pipes and return.
*/
- as = alloc(SIZEOF(*as));
+ as = g_new0(struct active_service, 1);
as->cmd = stralloc(cmd);
as->arguments = stralloc(arguments);
as->security_handle = security_handle;
as->state = NULL;
+ as->service = service;
as->pid = pid;
as->send_partial_reply = 0;
- if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
+ as->seen_info_end = FALSE;
+ /* fill in info_end_buf with non-null characters */
+ memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
+ if(service == SERVICE_SENDSIZE) {
g_option_t *g_options;
char *option_str, *p;
as->repretry = 0;
as->rep_pkt.body = NULL;
+ /*
+ * read from the stderr pipe
+ */
+ as->errfd = data_write[STDERR_PIPE][0];
+ aclose(data_write[STDERR_PIPE][1]);
+ as->ev_errfd = NULL;
+ as->errbuf = NULL;
+
/*
* read from the rest of the general-use pipes
* (netfds are opened as the client requests them)
/* close all unneeded fd */
close(STDERR_FILENO);
- debug_dup_stderr_to_debug();
+ dup2(data_write[STDERR_PIPE][1], 2);
+ aclose(data_write[STDERR_PIPE][0]);
+ aclose(data_write[STDERR_PIPE][1]);
safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
size_t size)
{
pid_t pid;
- ssize_t writesize;
+ size_t writesize;
switch (pid=fork()) {
case -1:
case 0: /* this is the child */
close(as->repfd);
- writesize = fullwrite(as->reqfd, bufp, size);
- exit(writesize != (ssize_t)size);
+ writesize = full_write(as->reqfd, bufp, size);
+ exit(writesize != size);
/* NOTREACHED */
}
return -1;