lintian doesn't like orphan packages with uploaders...
[debian/amanda] / amandad-src / amandad.c
index ca3bde92305d14e36f4ecbaa8e4fff7a606a733e..d864c3fa1bc7ca511ac93374fbccf9004a8d87f6 100644 (file)
@@ -1,6 +1,7 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
  * Copyright (c) 1991-1999 University of Maryland at College Park
+ * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
  * All Rights Reserved.
  *
  * Permission to use, copy, modify, distribute, and sell this software and its
@@ -39,7 +40,6 @@
 #include "amfeatures.h"
 #include "packet.h"
 #include "version.h"
-#include "queue.h"
 #include "security.h"
 #include "stream.h"
 #include "util.h"
@@ -47,6 +47,7 @@
 
 #define        REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
 #define        ACK_TIMEOUT     10              /* XXX should be configurable */
+#define STDERR_PIPE (DATA_FD_COUNT + 1)
 
 #define amandad_debug(i, ...) do {     \
        if ((i) <= debug_amandad) {     \
@@ -67,6 +68,39 @@ typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
 struct active_service;
 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
 
+/* string that we scan for in sendbackup's MESG stream */
+static const char info_end_str[] = "sendbackup: info end\n";
+#define INFO_END_LEN (sizeof(info_end_str)-1)
+
+/* 
+ * Here are the services that we allow.
+ * Must be in the same order as services[].
+ */
+typedef enum {
+    SERVICE_NOOP,
+    SERVICE_SENDSIZE,
+    SERVICE_SENDBACKUP,
+    SERVICE_SELFCHECK,
+    SERVICE_AMINDEXD,
+    SERVICE_AMIDXTAPED,
+    SERVICE_AMDUMPD
+} service_t;
+
+static struct services {
+    char *name;
+    int  active;
+    service_t service;
+} services[] = {
+   { "noop", 1, SERVICE_NOOP },
+   { "sendsize", 1, SERVICE_SENDSIZE },
+   { "sendbackup", 1, SERVICE_SENDBACKUP },
+   { "selfcheck", 1, SERVICE_SELFCHECK },
+   { "amindexd", 0, SERVICE_AMINDEXD },
+   { "amidxtaped", 0, SERVICE_AMIDXTAPED },
+   { "amdumpd", 0, SERVICE_AMDUMPD }
+};
+#define        NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
+
 /*
  * This structure describes an active running service.
  *
@@ -76,6 +110,7 @@ typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
  * for communications with the amanda server.
  */
 struct active_service {
+    service_t service;                 /* service name */
     char *cmd;                         /* name of command we ran */
     char *arguments;                   /* arguments we sent it */
     security_handle_t *security_handle;        /* remote server */
@@ -84,13 +119,19 @@ struct active_service {
     int send_partial_reply;            /* send PREP packet */
     int reqfd;                         /* pipe to write requests */
     int repfd;                         /* pipe to read replies */
+    int errfd;                         /* pipe to read stderr */
     event_handle_t *ev_repfd;          /* read event handle for repfd */
     event_handle_t *ev_reptimeout;     /* timeout for rep data */
+    event_handle_t *ev_errfd;          /* read event handle for errfd */
     pkt_t rep_pkt;                     /* rep packet we're sending out */
+    char *errbuf;                      /* buffer to read the err into */
     char *repbuf;                      /* buffer to read the rep into */
     size_t bufsize;                    /* length of repbuf */
     size_t repbufsize;                 /* length of repbuf */
     int repretry;                      /* times we'll retry sending the rep */
+    int seen_info_end;                 /* have we seen "sendbackup info end\n"? */
+    char info_end_buf[INFO_END_LEN];   /* last few bytes read, used for scanning for info end */
+
     /*
      * General user streams to the process, and their equivalent
      * network streams.
@@ -104,36 +145,14 @@ struct active_service {
        struct active_service *as;      /* pointer back to our enclosure */
     } data[DATA_FD_COUNT];
     char databuf[NETWORK_BLOCK_BYTES]; /* buffer to relay netfd data in */
-    TAILQ_ENTRY(active_service) tq;    /* queue handle */
 };
 
-/* 
- * Here are the services that we allow.
- */
-static struct services {
-    char *name;
-    int  active;
-} services[] = {
-    { "noop", 1 },
-    { "sendsize", 1 },
-    { "sendbackup", 1 },
-    { "selfcheck", 1 },
-    { "amindexd", 0 },
-    { "amidxtaped", 0 }
-};
-#define        NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
-
 /*
  * Queue of outstanding requests that we are running.
  */
-static struct {
-    TAILQ_HEAD(, active_service) tailq;
-    int qlength;
-} serviceq = {
-    TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
-};
+GSList *serviceq = NULL;
 
-static int wait_30s = 1;
+static event_handle_t *exit_event;
 static int exit_on_qlength = 1;
 static char *auth = NULL;
 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
@@ -152,12 +171,14 @@ static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
 
 static void repfd_recv(void *);
+static void process_errfd(void *cookie);
+static void errfd_recv(void *);
 static void timeout_repfd(void *);
 static void protocol_recv(void *, pkt_t *, security_status_t);
 static void process_readnetfd(void *);
 static void process_writenetfd(void *, void *, ssize_t);
 static struct active_service *service_new(security_handle_t *,
-    const char *, const char *);
+    const char *, service_t, const char *);
 static void service_delete(struct active_service *);
 static int writebuf(struct active_service *, const void *, size_t);
 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
@@ -194,6 +215,17 @@ main(
     safe_fd(-1, 0);
     safe_cd();
 
+    /*
+     * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
+     * the Sun version of tar in /usr/sun/sbin/tar is called instead.
+     *
+     * On other operating systems this will have no effect.
+     */
+
+#ifdef HAVE_UNSETENV
+    unsetenv("SUN_PERSONALITY");
+#endif
+
     /*
      * When called via inetd, it is not uncommon to forget to put the
      * argv[0] value on the config line.  On some systems (e.g. Solaris)
@@ -213,14 +245,29 @@ main(
        /*NOTREACHED*/
     }
 
+    if (argc > 1 && argv && argv[1] && g_str_equal(argv[1], "--version")) {
+       printf("amandad-%s\n", VERSION);
+       return (0);
+    }
+
     /* Don't die when child closes pipe */
     signal(SIGPIPE, SIG_IGN);
 
+    /* Parse the configuration; we'll handle errors later */
     config_init(CONFIG_INIT_CLIENT, NULL);
 
-    check_running_as(RUNNING_AS_CLIENT_LOGIN);
+    if (geteuid() == 0) {
+       check_running_as(RUNNING_AS_ROOT);
+       initgroups(CLIENT_LOGIN, get_client_gid());
+       setgid(get_client_gid());
+       setegid(get_client_gid());
+       seteuid(get_client_uid());
+    } else {
+       check_running_as(RUNNING_AS_CLIENT_LOGIN);
+    }
 
-    erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
+    add_amanda_log_handler(amanda_log_stderr);
+    add_amanda_log_handler(amanda_log_syslog);
 
     /*
      * ad-hoc argument parsing
@@ -235,19 +282,10 @@ main(
     in = 0; out = 1;           /* default to stdin/stdout */
     have_services = 0;
     for (i = 1; i < argc; i++) {
-       /*
-        * accept -krb4 as an alias for -auth=krb4 (for compatibility)
-        */
-       if (strcmp(argv[i], "-krb4") == 0) {
-           argv[i] = "-auth=krb4";
-           /* FALLTHROUGH */
-           auth = "krb4";
-       }
-
        /*
         * Get a driver for a security type specified after -auth=
         */
-       else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
+       if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
            argv[i] += strlen("-auth=");
            secdrv = security_getdriver(argv[i]);
            auth = argv[i];
@@ -255,6 +293,14 @@ main(
                error(_("no driver for security type '%s'\n"), argv[i]);
                 /*NOTREACHED*/
            }
+           if (strcmp(auth, "local") == 0 ||
+               strcmp(auth, "rsh") == 0 ||
+               strcmp(auth, "ssh") == 0) {
+               int i;
+               for (i=0; i < NSERVICES; i++) {
+                   services[i].active = 1;
+               }
+           }
            continue;
        }
 
@@ -290,7 +336,7 @@ main(
            }
 #ifdef USE_REUSEADDR
            r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
-               (void *)&on, (socklen_t)sizeof(on));
+               (void *)&on, (socklen_t_equiv)sizeof(on));
            if (r < 0) {
                dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
                          strerror(errno));
@@ -306,7 +352,7 @@ main(
            sin.sin_addr.s_addr = INADDR_ANY;
            sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
 #endif
-           if (bind(in, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+           if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
                error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
                    strerror(errno));
                /*NOTREACHED*/
@@ -322,7 +368,7 @@ main(
            struct sockaddr_in sin;
 #endif
            int sock;
-           socklen_t n;
+           socklen_t_equiv n;
 
            argv[i] += strlen("-tcp=");
 #ifdef WORKING_IPV6
@@ -336,7 +382,7 @@ main(
            }
 #ifdef USE_REUSEADDR
            r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
-               (void *)&on, (socklen_t)sizeof(on));
+               (void *)&on, (socklen_t_equiv)sizeof(on));
            if (r < 0) {
                dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
                          strerror(errno));
@@ -351,13 +397,13 @@ main(
            sin.sin_addr.s_addr = INADDR_ANY;
            sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
 #endif
-           if (bind(sock, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
+           if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
                error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
                    strerror(errno));
                /*NOTREACHED*/
            }
            listen(sock, 10);
-           n = (socklen_t)sizeof(sin);
+           n = (socklen_t_equiv)sizeof(sin);
            in = out = accept(sock, (struct sockaddr *)&sin, &n);
        }
        /*
@@ -391,29 +437,50 @@ main(
     }
 
     /*
-     * If no security type specified, use BSD
+     * If no security type specified, use BSDTCP
      */
     if (secdrv == NULL) {
-       secdrv = security_getdriver("BSD");
-       auth = "bsd";
+       secdrv = security_getdriver("BSDTCP");
+       auth = "bsdtcp";
        if (secdrv == NULL) {
-           error(_("no driver for default security type 'BSD'\n"));
+           error(_("no driver for default security type 'BSDTCP'\n"));
            /*NOTREACHED*/
        }
     }
 
     if(strcasecmp(auth, "rsh") == 0 ||
        strcasecmp(auth, "ssh") == 0 ||
+       strcasecmp(auth, "local") == 0 ||
        strcasecmp(auth, "bsdtcp") == 0) {
-       wait_30s = 0;
        exit_on_qlength = 1;
     }
 
+#ifndef SINGLE_USERID
+    if (geteuid() == 0) {
+       if (strcasecmp(auth, "krb5") != 0) {
+           struct passwd *pwd;
+           /* lookup our local user name */
+           if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
+               error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
+           }
+
+           if (pwd->pw_uid != 0) {
+               error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
+                     CLIENT_LOGIN, auth);
+           }
+       }
+    } else {
+       if (strcasecmp(auth, "krb5") == 0) {
+           error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
+       }
+    }
+#endif
+
     /* initialize */
 
     startclock();
 
-    dbprintf(_("version %s\n"), version());
+    dbprintf(_("version %s\n"), VERSION);
     for (i = 0; version_info[i] != NULL; i++) {
        dbprintf("    %s", version_info[i]);
     }
@@ -422,6 +489,11 @@ main(
        dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
     }
 
+    /* krb5 require the euid to be 0 */
+    if (strcasecmp(auth, "krb5") == 0) {
+       seteuid((uid_t)0);
+    }
+
     /*
      * Schedule to call protocol_accept() when new security handles
      * are created on stdin.
@@ -432,8 +504,7 @@ main(
      * Schedule an event that will try to exit every 30 seconds if there
      * are no requests outstanding.
      */
-    if(wait_30s)
-       (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
+    exit_event = event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
 
     /*
      * Call event_loop() with an arg of 0, telling it to block until all
@@ -463,7 +534,7 @@ exit_check(
     /*
      * If things are still running, then don't exit.
      */
-    if (serviceq.qlength > 0)
+    if (g_slist_length(serviceq) > 0)
        return;
 
     /*
@@ -472,6 +543,7 @@ exit_check(
     if (no_exit)
        return;
 
+    g_debug("timeout exit");
     dbclose();
     exit(0);
 }
@@ -486,20 +558,58 @@ protocol_accept(
     pkt_t *            pkt)
 {
     pkt_t pkt_out;
+    GSList *iter;
     struct active_service *as;
     char *pktbody, *tok, *service, *arguments;
     char *service_path = NULL;
+    GSList *errlist = NULL;
     int i;
+    char *peer_name;
 
     pkt_out.body = NULL;
 
     /*
      * If handle is NULL, then the connection is closed.
      */
-    if(handle == NULL) {
+    if (handle == NULL) {
+       if (exit_on_qlength && exit_event) {
+           /* remove the timeout, we will exit once the service terminate */
+           event_release(exit_event);
+           exit_event = NULL;
+       }
        return;
     }
 
+    /*
+     * If we have errors (not warnings) from the config file, let the remote system
+     * know immediately.  Unfortunately, we only get one ERROR line, so if there
+     * are multiple errors, we just show the first.
+     */
+    if (config_errors(&errlist) >= CFGERR_ERRORS) {
+       GSList *iter = errlist;
+       char *errmsg;
+       gboolean multiple_errors = FALSE;
+
+       if (iter) {
+           errmsg = (char *)iter->data;
+           if (iter->next)
+               multiple_errors = TRUE;
+       } else {
+           errmsg = "(no error message)";
+       }
+
+       pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
+           multiple_errors? _(" (additional errors not displayed)"):"");
+       do_sendpkt(handle, &pkt_out);
+       amfree(pkt_out.body);
+       security_close(handle);
+       return;
+    }
+
+    peer_name = security_get_authenticated_peer_name(handle);
+    g_debug("authenticated peer name is '%s'", peer_name);
+    amfree(peer_name);
+
     /*
      * If pkt is NULL, then there was a problem with the new connection.
      */
@@ -561,7 +671,7 @@ protocol_accept(
        goto send_pkt_out;
     }
 
-    service_path = vstralloc(amlibexecdir, "/", service, versionsuffix(), NULL);
+    service_path = vstralloc(amlibexecdir, "/", service, NULL);
     if (access(service_path, X_OK) < 0) {
        dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
            pkt_init(&pkt_out, P_NAK,
@@ -571,8 +681,8 @@ protocol_accept(
     }
 
     /* see if its already running */
-    for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
-       as = TAILQ_NEXT(as, tq)) {
+    for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
+       as = (struct active_service *)iter->data;
            if (strcmp(as->cmd, service_path) == 0 &&
                strcmp(as->arguments, arguments) == 0) {
                    dbprintf(_("%s %s: already running, acking req\n"),
@@ -587,7 +697,7 @@ protocol_accept(
      * the request pipe.
      */
     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
-    as = service_new(handle, service_path, arguments);
+    as = service_new(handle, service_path, services[i].service, arguments);
     if (writebuf(as, arguments, strlen(arguments)) < 0) {
        const char *errmsg = strerror(errno);
        dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
@@ -734,6 +844,8 @@ s_sendack(
     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
        timeout_repfd, as);
+    as->errbuf = NULL;
+    as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
     security_recvpkt(as->security_handle, protocol_recv, as, -1);
     return (A_PENDING);
 }
@@ -754,7 +866,6 @@ s_repwait(
     char     *what;
     char     *msg;
     int       code = 0;
-    int       t;
     int       pid;
     amwait_t  retstat;
 
@@ -810,14 +921,23 @@ s_repwait(
 
     /* If end of service, wait for process status */
     if (n == 0) {
-       t = 0;
        pid = waitpid(as->pid, &retstat, WNOHANG);
-       while (t<5 && pid == 0) {
-           sleep(1);
-           t++;
-           pid = waitpid(as->pid, &retstat, WNOHANG);
+       if (as->service  == SERVICE_NOOP ||
+           as->service  == SERVICE_SENDSIZE ||
+           as->service  == SERVICE_SELFCHECK) {
+           int t = 0;
+           while (t<5 && pid == 0) {
+               sleep(1);
+               t++;
+               pid = waitpid(as->pid, &retstat, WNOHANG);
+           }
        }
 
+       process_errfd(as);
+
+       if (pid == 0)
+           pid = waitpid(as->pid, &retstat, WNOHANG);
+
        if (pid > 0) {
            what = NULL;
            if (! WIFEXITED(retstat)) {
@@ -845,6 +965,7 @@ s_repwait(
                }
                strcpy(as->repbuf + as->repbufsize, msg);
                as->repbufsize += strlen(msg);
+                amfree(msg);
            }
        }
     }
@@ -1045,6 +1166,7 @@ s_ackwait(
     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
        if (dh->netfd == NULL)
            continue;
+       dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
        if (security_stream_accept(dh->netfd) < 0) {
            dbprintf(_("stream %td accept failed: %s\n"),
                dh - &as->data[0], security_geterror(as->security_handle));
@@ -1052,9 +1174,18 @@ s_ackwait(
            dh->netfd = NULL;
            continue;
        }
-       /* setup an event for reads from it */
-       dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
-                                    process_readnetfd, dh);
+
+       /* setup an event for reads from it.  As a special case, don't start
+        * listening on as->data[0] until we read some data on another fd, if
+        * the service is sendbackup.  This ensures that we send a MESG or 
+        * INDEX token before any DATA tokens, as dumper assumes. This is a
+        * hack, if that wasn't already obvious! */
+       if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
+           dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+                                        process_readnetfd, dh);
+       } else {
+           amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
+       }
 
        security_stream_read(dh->netfd, process_writenetfd, dh);
 
@@ -1107,6 +1238,101 @@ repfd_recv(
     state_machine(as, A_RECVREP, NULL);
 }
 
+static void
+process_errfd(
+    void *cookie)
+{
+    struct active_service *as = cookie;
+
+    /* Process errfd before sending the REP packet */
+    if (as->ev_errfd) {
+       SELECT_ARG_TYPE readset;
+       struct timeval  tv;
+       int             nfound;
+
+       memset(&tv, 0, SIZEOF(tv));
+       FD_ZERO(&readset);
+       FD_SET(as->errfd, &readset);
+       nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
+       if (nfound && FD_ISSET(as->errfd, &readset)) {
+           errfd_recv(as);
+       }
+    }
+}
+
+/*
+ * Called when a errfd has received data
+ */
+static void
+errfd_recv(
+    void *     cookie)
+{
+    struct active_service *as = cookie;
+    char  buf[32769];
+    int   n;
+    char *r;
+
+    assert(as != NULL);
+    assert(as->ev_errfd != NULL);
+
+    n = read(as->errfd, &buf, 32768);
+    /* merge buffer */
+    if (n > 0) {
+       /* Terminate it with '\0' */
+       buf[n+1] = '\0';
+
+       if (as->errbuf) {
+           as->errbuf = vstrextend(&as->errbuf, buf, NULL);
+       } else {
+           as->errbuf = stralloc(buf);
+       }
+    } else if (n == 0) {
+       event_release(as->ev_errfd);
+       as->ev_errfd = NULL;
+    } else { /* n < 0 */
+       event_release(as->ev_errfd);
+       as->ev_errfd = NULL;
+       g_snprintf(buf, 32768,
+                  "error reading stderr or service: %s\n", strerror(errno));
+    }
+
+    /* for each line terminate by '\n' */
+    while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
+       char *s;
+
+       *r = '\0';
+       s = vstrallocf("ERROR service %s: %s\n",
+                      services[as->service].name, as->errbuf);
+
+       /* Add to repbuf, error message will be in the REP packet if it
+        * is not already sent
+        */
+       n = strlen(s);
+       if (as->bufsize == 0) {
+           as->bufsize = NETWORK_BLOCK_BYTES;
+           as->repbuf = alloc(as->bufsize);
+       }
+       while (as->bufsize < as->repbufsize + n) {
+           char *repbuf_temp;
+           as->bufsize *= 2;
+           repbuf_temp = alloc(as->bufsize);
+           memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
+           amfree(as->repbuf);
+           as->repbuf = repbuf_temp;
+       }
+       memcpy(as->repbuf + as->repbufsize, s, n);
+       as->repbufsize += n;
+
+       dbprintf("%s", s);
+
+       /* remove first line from buffer */
+       r++;
+       s = stralloc(r);
+       amfree(as->errbuf);
+       as->errbuf = s;
+    }
+}
+
 /*
  * Called when a repfd has timed out
  */
@@ -1197,6 +1423,35 @@ process_readnetfd(
        service_delete(as);
        return;
     }
+
+    /* Handle the special case of recognizing "sendbackup info end"
+     * from sendbackup's MESG fd */
+    if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
+       /* make a buffer containing the combined data from info_end_buf
+        * and what we've read this time, and search it for info_end_strj
+        * This includes a NULL byte for strstr's sanity. */
+       char *combined_buf = malloc(INFO_END_LEN + n + 1);
+       memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
+       memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
+       combined_buf[INFO_END_LEN+n] = '\0';
+
+       as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
+
+       /* fill info_end_buf from the tail end of combined_buf */
+       memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
+       amfree(combined_buf);
+
+       /* if we did see info_end_str, start reading the data fd (fd 0) */
+       if (as->seen_info_end) {
+           struct datafd_handle *dh = &as->data[0];
+           amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
+           dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
+                                        process_readnetfd, dh);
+       } else {
+           amandad_debug(1, "sendbackup header info still not complete\n");
+       }
+    }
+
     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
        /* stream has croaked */
        pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
@@ -1230,10 +1485,8 @@ process_writenetfd(
     if (dh->fd_write <= 0) {
        dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
     } else if (size > 0) {
-       fullwrite(dh->fd_write, buf, (size_t)size);
-       security_stream_read(dh->netfd, process_writenetfd, dh);
-    }
-    else {
+       full_write(dh->fd_write, buf, (size_t)size);
+    } else {
        aclose(dh->fd_write);
     }
 }
@@ -1252,6 +1505,9 @@ allocstream(
 {
     struct datafd_handle *dh;
 
+    /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
+     * it is NOT a file descriptor! */
+
     /* if the handle is -1, then we don't bother */
     if (handle < 0)
        return (-1);
@@ -1289,20 +1545,30 @@ static struct active_service *
 service_new(
     security_handle_t *        security_handle,
     const char *       cmd,
+    service_t          service,
     const char *       arguments)
 {
     int i;
-    int data_read[DATA_FD_COUNT + 1][2];
-    int data_write[DATA_FD_COUNT + 1][2];
+    int data_read[DATA_FD_COUNT + 2][2];
+    int data_write[DATA_FD_COUNT + 2][2];
     struct active_service *as;
     pid_t pid;
     int newfd;
+    char *peer_name;
+    char *amanda_remote_host_env[2];
 
     assert(security_handle != NULL);
     assert(cmd != NULL);
     assert(arguments != NULL);
 
     /* a plethora of pipes */
+    /* data_read[0]                : stdin
+     * data_write[0]               : stdout
+     * data_read[1], data_write[1] : first  stream
+     * data_read[2], data_write[2] : second stream
+     * data_read[3], data_write[3] : third stream
+     * data_write[4]               : stderr
+     */
     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
        if (pipe(data_read[i]) < 0) {
            error(_("pipe: %s\n"), strerror(errno));
@@ -1313,6 +1579,10 @@ service_new(
            /*NOTREACHED*/
        }
     }
+    if (pipe(data_write[STDERR_PIPE]) < 0) {
+       error(_("pipe: %s\n"), strerror(errno));
+       /*NOTREACHED*/
+    }
 
     switch(pid = fork()) {
     case -1:
@@ -1322,14 +1592,18 @@ service_new(
        /*
         * The parent.  Close the far ends of our pipes and return.
         */
-       as = alloc(SIZEOF(*as));
+       as = g_new0(struct active_service, 1);
        as->cmd = stralloc(cmd);
        as->arguments = stralloc(arguments);
        as->security_handle = security_handle;
        as->state = NULL;
+       as->service = service;
        as->pid = pid;
        as->send_partial_reply = 0;
-       if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
+       as->seen_info_end = FALSE;
+       /* fill in info_end_buf with non-null characters */
+       memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
+       if(service == SERVICE_SENDSIZE) {
            g_option_t *g_options;
            char *option_str, *p;
 
@@ -1361,6 +1635,14 @@ service_new(
        as->repretry = 0;
        as->rep_pkt.body = NULL;
 
+       /*
+        * read from the stderr pipe
+        */
+       as->errfd = data_write[STDERR_PIPE][0];
+       aclose(data_write[STDERR_PIPE][1]);
+       as->ev_errfd = NULL;
+       as->errbuf = NULL;
+
        /*
         * read from the rest of the general-use pipes
         * (netfds are opened as the client requests them)
@@ -1378,8 +1660,7 @@ service_new(
 
        /* add it to the service queue */
        /* increment the active service count */
-       TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
-       serviceq.qlength++;
+       serviceq = g_slist_append(serviceq, (gpointer)as);
 
        return (as);
     case 0:
@@ -1388,6 +1669,16 @@ service_new(
         * and start up.
         */
 
+       /* set up the AMANDA_AUTHENTICATED_PEER env var so child services
+        * can use it to authenticate */
+       peer_name = security_get_authenticated_peer_name(security_handle);
+       amanda_remote_host_env[0] = NULL;
+       amanda_remote_host_env[1] = NULL;
+       if (*peer_name) {
+           amanda_remote_host_env[0] =
+               g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
+       }
+
        /*
         * The data stream is stdin in the new process
         */
@@ -1434,6 +1725,21 @@ service_new(
                data_write[i + 1][0] = newfd;
            }
        }
+       while(data_write[4][0] >= DATA_FD_OFFSET &&
+             data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
+           newfd = dup(data_write[4][0]);
+           if (newfd == -1)
+               error(_("Can't dup out off DATA_FD range"));
+           data_write[4][0] = newfd;
+       }
+       while(data_write[4][1] >= DATA_FD_OFFSET &&
+             data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
+           newfd = dup(data_write[4][1]);
+           if (newfd == -1)
+               error(_("Can't dup out off DATA_FD range"));
+           data_write[4][1] = newfd;
+       }
+
        for (i = 0; i < DATA_FD_COUNT*2; i++)
            close(DATA_FD_OFFSET + i);
 
@@ -1457,10 +1763,12 @@ service_new(
 
        /* close all unneeded fd */
        close(STDERR_FILENO);
-       debug_dup_stderr_to_debug();
+       dup2(data_write[STDERR_PIPE][1], 2);
+        aclose(data_write[STDERR_PIPE][0]);
+        aclose(data_write[STDERR_PIPE][1]);
        safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
 
-       execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
+       execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env_full(amanda_remote_host_env));
        error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
        /*NOTREACHED*/
     }
@@ -1475,6 +1783,8 @@ service_delete(
     struct active_service *    as)
 {
     int i;
+    int   count;
+    pid_t pid;
     struct datafd_handle *dh;
 
     amandad_debug(1, _("closing service: %s\n"),
@@ -1492,11 +1802,17 @@ service_delete(
        aclose(as->reqfd);
     if (as->repfd != -1)
        aclose(as->repfd);
+    if (as->errfd != -1) {
+       process_errfd(as);
+       aclose(as->errfd);
+    }
 
     if (as->ev_repfd != NULL)
        event_release(as->ev_repfd);
     if (as->ev_reptimeout != NULL)
        event_release(as->ev_reptimeout);
+    if (as->ev_errfd != NULL)
+       event_release(as->ev_errfd);
 
     for (i = 0; i < DATA_FD_COUNT; i++) {
        dh = &as->data[i];
@@ -1516,13 +1832,25 @@ service_delete(
     if (as->security_handle != NULL)
        security_close(as->security_handle);
 
+    /* try to kill the process; if this fails, then it's already dead and
+     * likely some of the other zombie cleanup ate its brains, so we don't
+     * bother to waitpid for it */
     assert(as->pid > 0);
-    kill(as->pid, SIGTERM);
-    waitpid(as->pid, NULL, WNOHANG);
+    pid = waitpid(as->pid, NULL, WNOHANG);
+    if (pid != as->pid && kill(as->pid, SIGTERM) == 0) {
+       pid = waitpid(as->pid, NULL, WNOHANG);
+       count = 5;
+       while (pid != as->pid && count > 0) {
+           count--;
+           sleep(1);
+           pid = waitpid(as->pid, NULL, WNOHANG);
+       }
+       if (pid != as->pid) {
+           g_debug("Process %d failed to exit", (int)as->pid);
+       }
+    }
 
-    TAILQ_REMOVE(&serviceq.tailq, as, tq);
-    assert(serviceq.qlength > 0);
-    serviceq.qlength--;
+    serviceq = g_slist_remove(serviceq, (gpointer)as);
 
     amfree(as->cmd);
     amfree(as->arguments);
@@ -1530,7 +1858,7 @@ service_delete(
     amfree(as->rep_pkt.body);
     amfree(as);
 
-    if(exit_on_qlength == 0 && serviceq.qlength == 0) {
+    if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
        dbclose();
        exit(0);
     }
@@ -1547,7 +1875,7 @@ writebuf(
     size_t                     size)
 {
     pid_t pid;
-    ssize_t    writesize;
+    size_t    writesize;
 
     switch (pid=fork()) {
     case -1:
@@ -1559,8 +1887,8 @@ writebuf(
 
     case 0:                            /* this is the child */
        close(as->repfd);
-       writesize = fullwrite(as->reqfd, bufp, size);
-       exit(writesize != (ssize_t)size);
+       writesize = full_write(as->reqfd, bufp, size);
+       exit(writesize != size);
        /* NOTREACHED */
     }
     return -1;