Imported Upstream version 2.6.1
[debian/amanda] / amandad-src / amandad.c
index 430f2b7f6a5a33990c396891725f6a1f8f171e92..cfb7a47948ec7edeb04786fe85e9058f389a468c 100644 (file)
@@ -47,6 +47,7 @@
 
 #define        REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
 #define        ACK_TIMEOUT     10              /* XXX should be configurable */
+#define STDERR_PIPE (DATA_FD_COUNT + 1)
 
 #define amandad_debug(i, ...) do {     \
        if ((i) <= debug_amandad) {     \
@@ -67,6 +68,37 @@ typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
 struct active_service;
 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
 
+/* string that we scan for in sendbackup's MESG stream */
+static const char info_end_str[] = "sendbackup: info end\n";
+#define INFO_END_LEN (sizeof(info_end_str)-1)
+
+/* 
+ * Here are the services that we allow.
+ * Must be in the same order as services[].
+ */
+typedef enum {
+    SERVICE_NOOP,
+    SERVICE_SENDSIZE,
+    SERVICE_SENDBACKUP,
+    SERVICE_SELFCHECK,
+    SERVICE_AMINDEXD,
+    SERVICE_AMIDXTAPED
+} service_t;
+
+static struct services {
+    char *name;
+    int  active;
+    service_t service;
+} services[] = {
+   { "noop", 1, SERVICE_NOOP },
+   { "sendsize", 1, SERVICE_SENDSIZE },
+   { "sendbackup", 1, SERVICE_SENDBACKUP },
+   { "selfcheck", 1, SERVICE_SELFCHECK },
+   { "amindexd", 0, SERVICE_AMINDEXD },
+   { "amidxtaped", 0, SERVICE_AMIDXTAPED }
+};
+#define        NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
+
 /*
  * This structure describes an active running service.
  *
@@ -76,6 +108,7 @@ typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
  * for communications with the amanda server.
  */
 struct active_service {
+    service_t service;                 /* service name */
     char *cmd;                         /* name of command we ran */
     char *arguments;                   /* arguments we sent it */
     security_handle_t *security_handle;        /* remote server */
@@ -84,13 +117,19 @@ struct active_service {
     int send_partial_reply;            /* send PREP packet */
     int reqfd;                         /* pipe to write requests */
     int repfd;                         /* pipe to read replies */
+    int errfd;                         /* pipe to read stderr */
     event_handle_t *ev_repfd;          /* read event handle for repfd */
     event_handle_t *ev_reptimeout;     /* timeout for rep data */
+    event_handle_t *ev_errfd;          /* read event handle for errfd */
     pkt_t rep_pkt;                     /* rep packet we're sending out */
+    char *errbuf;                      /* buffer to read the err into */
     char *repbuf;                      /* buffer to read the rep into */
     size_t bufsize;                    /* length of repbuf */
     size_t repbufsize;                 /* length of repbuf */
     int repretry;                      /* times we'll retry sending the rep */
+    int seen_info_end;                 /* have we seen "sendbackup info end\n"? */
+    char info_end_buf[INFO_END_LEN];   /* last few bytes read, used for scanning for info end */
+
     /*
      * General user streams to the process, and their equivalent
      * network streams.
@@ -107,22 +146,6 @@ struct active_service {
     TAILQ_ENTRY(active_service) tq;    /* queue handle */
 };
 
-/* 
- * Here are the services that we allow.
- */
-static struct services {
-    char *name;
-    int  active;
-} services[] = {
-    { "noop", 1 },
-    { "sendsize", 1 },
-    { "sendbackup", 1 },
-    { "selfcheck", 1 },
-    { "amindexd", 0 },
-    { "amidxtaped", 0 }
-};
-#define        NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
-
 /*
  * Queue of outstanding requests that we are running.
  */
@@ -152,12 +175,13 @@ static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
 
 static void repfd_recv(void *);
+static void errfd_recv(void *);
 static void timeout_repfd(void *);
 static void protocol_recv(void *, pkt_t *, security_status_t);
 static void process_readnetfd(void *);
 static void process_writenetfd(void *, void *, ssize_t);
 static struct active_service *service_new(security_handle_t *,
-    const char *, const char *);
+    const char *, service_t, const char *);
 static void service_delete(struct active_service *);
 static int writebuf(struct active_service *, const void *, size_t);
 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
@@ -216,6 +240,7 @@ main(
     /* Don't die when child closes pipe */
     signal(SIGPIPE, SIG_IGN);
 
+    /* Parse the configuration; we'll handle errors later */
     config_init(CONFIG_INIT_CLIENT, NULL);
 
     if (geteuid() == 0) {
@@ -412,6 +437,7 @@ main(
 
     if(strcasecmp(auth, "rsh") == 0 ||
        strcasecmp(auth, "ssh") == 0 ||
+       strcasecmp(auth, "local") == 0 ||
        strcasecmp(auth, "bsdtcp") == 0) {
        wait_30s = 0;
        exit_on_qlength = 1;
@@ -419,12 +445,12 @@ main(
 
     if (getuid() == 0) {
        if (strcasecmp(auth, "krb5") != 0) {
-           error(_("Amanda must be run as user '%s' when using '%s' authetication"),
+           error(_("Amanda must be run as user '%s' when using '%s' authentication"),
                  CLIENT_LOGIN, auth);
        }
     } else {
        if (strcasecmp(auth, "krb5") == 0) {
-           error(_("Amanda must be run as user 'root' when using 'krb5' authetication"));
+           error(_("Amanda must be run as user 'root' when using 'krb5' authentication"));
        }
     }
 
@@ -514,6 +540,7 @@ protocol_accept(
     struct active_service *as;
     char *pktbody, *tok, *service, *arguments;
     char *service_path = NULL;
+    GSList *errlist = NULL;
     int i;
 
     pkt_out.body = NULL;
@@ -525,6 +552,32 @@ protocol_accept(
        return;
     }
 
+    /*
+     * If we have errors (not warnings) from the config file, let the server
+     * know immediately.  Unfortunately, we only get one ERROR line, so if there
+     * are multiple errors, we just show the first.
+     */
+    if (config_errors(&errlist) >= CFGERR_ERRORS) {
+       GSList *iter = errlist;
+       char *errmsg;
+       gboolean multiple_errors = FALSE;
+
+       if (iter) {
+           errmsg = (char *)iter->data;
+           if (iter->next)
+               multiple_errors = TRUE;
+       } else {
+           errmsg = "(no error message)";
+       }
+
+       pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
+           multiple_errors? _(" (additional errors not displayed)"):"");
+       do_sendpkt(handle, &pkt_out);
+       amfree(pkt_out.body);
+       security_close(handle);
+       return;
+    }
+
     /*
      * If pkt is NULL, then there was a problem with the new connection.
      */
@@ -612,7 +665,7 @@ protocol_accept(
      * the request pipe.
      */
     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
-    as = service_new(handle, service_path, arguments);
+    as = service_new(handle, service_path, services[i].service, arguments);
     if (writebuf(as, arguments, strlen(arguments)) < 0) {
        const char *errmsg = strerror(errno);
        dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
@@ -759,6 +812,8 @@ s_sendack(
     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
        timeout_repfd, as);
+    as->errbuf = NULL;
+    as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
     security_recvpkt(as->security_handle, protocol_recv, as, -1);
     return (A_PENDING);
 }
@@ -779,7 +834,6 @@ s_repwait(
     char     *what;
     char     *msg;
     int       code = 0;
-    int       t;
     int       pid;
     amwait_t  retstat;
 
@@ -835,14 +889,36 @@ s_repwait(
 
     /* If end of service, wait for process status */
     if (n == 0) {
-       t = 0;
        pid = waitpid(as->pid, &retstat, WNOHANG);
-       while (t<5 && pid == 0) {
-           sleep(1);
-           t++;
-           pid = waitpid(as->pid, &retstat, WNOHANG);
+       if (as->service  == SERVICE_NOOP ||
+           as->service  == SERVICE_SENDSIZE ||
+           as->service  == SERVICE_SELFCHECK) {
+           int t = 0;
+           while (t<5 && pid == 0) {
+               sleep(1);
+               t++;
+               pid = waitpid(as->pid, &retstat, WNOHANG);
+           }
+       }
+
+       /* Process errfd before sending the REP packet */
+       if (as->ev_errfd) {
+           SELECT_ARG_TYPE readset;
+           struct timeval  tv;
+           int             nfound;
+
+           memset(&tv, 0, SIZEOF(tv));
+           FD_ZERO(&readset);
+           FD_SET(as->errfd, &readset);
+           nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
+           if (nfound && FD_ISSET(as->errfd, &readset)) {
+               errfd_recv(as);
+           }
        }
 
+       if (pid == 0)
+           pid = waitpid(as->pid, &retstat, WNOHANG);
+
        if (pid > 0) {
            what = NULL;
            if (! WIFEXITED(retstat)) {
@@ -870,6 +946,7 @@ s_repwait(
                }
                strcpy(as->repbuf + as->repbufsize, msg);
                as->repbufsize += strlen(msg);
+                amfree(msg);
            }
        }
     }
@@ -1077,9 +1154,18 @@ s_ackwait(
            dh->netfd = NULL;
            continue;
        }
-       /* setup an event for reads from it */
-       dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
-                                    process_readnetfd, dh);
+
+       /* setup an event for reads from it.  As a special case, don't start
+        * listening on as->data[0] until we read some data on another fd, if
+        * the service is sendbackup.  This ensures that we send a MESG or 
+        * INDEX token before any DATA tokens, as dumper assumes. This is a
+        * hack, if that wasn't already obvious! */
+       if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
+           dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+                                        process_readnetfd, dh);
+       } else {
+           amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
+       }
 
        security_stream_read(dh->netfd, process_writenetfd, dh);
 
@@ -1132,6 +1218,79 @@ repfd_recv(
     state_machine(as, A_RECVREP, NULL);
 }
 
+/*
+ * Called when a errfd has received data
+ */
+static void
+errfd_recv(
+    void *     cookie)
+{
+    struct active_service *as = cookie;
+    char  buf[32769];
+    int   n;
+    char *r;
+
+    assert(as != NULL);
+    assert(as->ev_errfd != NULL);
+
+    n = read(as->errfd, &buf, 32768);
+    /* merge buffer */
+    if (n > 0) {
+       /* Terminate it with '\0' */
+       buf[n+1] = '\0';
+
+       if (as->errbuf) {
+           as->errbuf = vstrextend(&as->errbuf, buf, NULL);
+       } else {
+           as->errbuf = stralloc(buf);
+       }
+    } else if (n == 0) {
+       event_release(as->ev_errfd);
+       as->ev_errfd = NULL;
+    } else { /* n < 0 */
+       event_release(as->ev_errfd);
+       as->ev_errfd = NULL;
+       g_snprintf(buf, 32768,
+                  "error reading stderr or service: %s\n", strerror(errno));
+    }
+
+    /* for each line terminate by '\n' */
+    while (as->errbuf != NULL  && (r = index(as->errbuf, '\n')) != NULL) {
+       char *s;
+
+       *r = '\0';
+       s = vstrallocf("ERROR service %s: %s\n",
+                      services[as->service].name, as->errbuf);
+
+       /* Add to repbuf, error message will be in the REP packet if it
+        * is not already sent
+        */
+       n = strlen(s);
+       if (as->bufsize == 0) {
+           as->bufsize = NETWORK_BLOCK_BYTES;
+           as->repbuf = alloc(as->bufsize);
+       }
+       while (as->bufsize < as->repbufsize + n) {
+           char *repbuf_temp;
+           as->bufsize *= 2;
+           repbuf_temp = alloc(as->bufsize);
+           memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
+           amfree(as->repbuf);
+           as->repbuf = repbuf_temp;
+       }
+       memcpy(as->repbuf + as->repbufsize, s, n);
+       as->repbufsize += n;
+
+       dbprintf("%s", s);
+
+       /* remove first line from buffer */
+       r++;
+       s = stralloc(r);
+       amfree(as->errbuf);
+       as->errbuf = s;
+    }
+}
+
 /*
  * Called when a repfd has timed out
  */
@@ -1222,6 +1381,34 @@ process_readnetfd(
        service_delete(as);
        return;
     }
+
+    /* Handle the special case of recognizing "sendbackup info end"
+     * from sendbackup's MESG fd */
+    if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
+       /* make a buffer containing the combined data from info_end_buf
+        * and what we've read this time, and search it for info_end_strj
+        * This includes a NULL byte for strstr's sanity. */
+       char *combined_buf = malloc(INFO_END_LEN + n + 1);
+       memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
+       memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
+       combined_buf[INFO_END_LEN+n] = '\0';
+
+       as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
+
+       /* fill info_end_buf from the tail end of combined_buf */
+       memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
+
+       /* if we did see info_end_str, start reading the data fd (fd 0) */
+       if (as->seen_info_end) {
+           struct datafd_handle *dh = &as->data[0];
+           amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
+           dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+                                        process_readnetfd, dh);
+       } else {
+           amandad_debug(1, "sendbackup header info still not complete\n");
+       }
+    }
+
     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
        /* stream has croaked */
        pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
@@ -1255,7 +1442,7 @@ process_writenetfd(
     if (dh->fd_write <= 0) {
        dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
     } else if (size > 0) {
-       fullwrite(dh->fd_write, buf, (size_t)size);
+       full_write(dh->fd_write, buf, (size_t)size);
        security_stream_read(dh->netfd, process_writenetfd, dh);
     }
     else {
@@ -1314,11 +1501,12 @@ static struct active_service *
 service_new(
     security_handle_t *        security_handle,
     const char *       cmd,
+    service_t          service,
     const char *       arguments)
 {
     int i;
-    int data_read[DATA_FD_COUNT + 1][2];
-    int data_write[DATA_FD_COUNT + 1][2];
+    int data_read[DATA_FD_COUNT + 2][2];
+    int data_write[DATA_FD_COUNT + 2][2];
     struct active_service *as;
     pid_t pid;
     int newfd;
@@ -1328,6 +1516,13 @@ service_new(
     assert(arguments != NULL);
 
     /* a plethora of pipes */
+    /* data_read[0]                : stdin
+     * data_write[0]               : stdout
+     * data_read[1], data_write[1] : first  stream
+     * data_read[2], data_write[2] : second stream
+     * data_read[3], data_write[3] : third stream
+     * data_write[4]               : stderr
+     */
     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
        if (pipe(data_read[i]) < 0) {
            error(_("pipe: %s\n"), strerror(errno));
@@ -1338,6 +1533,10 @@ service_new(
            /*NOTREACHED*/
        }
     }
+    if (pipe(data_write[STDERR_PIPE]) < 0) {
+       error(_("pipe: %s\n"), strerror(errno));
+       /*NOTREACHED*/
+    }
 
     switch(pid = fork()) {
     case -1:
@@ -1347,14 +1546,18 @@ service_new(
        /*
         * The parent.  Close the far ends of our pipes and return.
         */
-       as = alloc(SIZEOF(*as));
+       as = g_new0(struct active_service, 1);
        as->cmd = stralloc(cmd);
        as->arguments = stralloc(arguments);
        as->security_handle = security_handle;
        as->state = NULL;
+       as->service = service;
        as->pid = pid;
        as->send_partial_reply = 0;
-       if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
+       as->seen_info_end = FALSE;
+       /* fill in info_end_buf with non-null characters */
+       memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
+       if(service == SERVICE_SENDSIZE) {
            g_option_t *g_options;
            char *option_str, *p;
 
@@ -1386,6 +1589,14 @@ service_new(
        as->repretry = 0;
        as->rep_pkt.body = NULL;
 
+       /*
+        * read from the stderr pipe
+        */
+       as->errfd = data_write[STDERR_PIPE][0];
+       aclose(data_write[STDERR_PIPE][1]);
+       as->ev_errfd = NULL;
+       as->errbuf = NULL;
+
        /*
         * read from the rest of the general-use pipes
         * (netfds are opened as the client requests them)
@@ -1482,7 +1693,9 @@ service_new(
 
        /* close all unneeded fd */
        close(STDERR_FILENO);
-       debug_dup_stderr_to_debug();
+       dup2(data_write[STDERR_PIPE][1], 2);
+        aclose(data_write[STDERR_PIPE][0]);
+        aclose(data_write[STDERR_PIPE][1]);
        safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
 
        execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
@@ -1572,7 +1785,7 @@ writebuf(
     size_t                     size)
 {
     pid_t pid;
-    ssize_t    writesize;
+    size_t    writesize;
 
     switch (pid=fork()) {
     case -1:
@@ -1584,8 +1797,8 @@ writebuf(
 
     case 0:                            /* this is the child */
        close(as->repfd);
-       writesize = fullwrite(as->reqfd, bufp, size);
-       exit(writesize != (ssize_t)size);
+       writesize = full_write(as->reqfd, bufp, size);
+       exit(writesize != size);
        /* NOTREACHED */
     }
     return -1;