Imported Upstream version 2.5.0
[debian/amanda] / client-src / amandad.c
index 2e1e052c18442f602c854396346cf946bdd324e1..7ba39a4e208c8f354ff4a2cdec1c361a14364170 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 1991-1998 University of Maryland at College Park
+ * Copyright (c) 1991-1999 University of Maryland at College Park
  * All Rights Reserved.
  *
  * Permission to use, copy, modify, distribute, and sell this software and its
  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
  *
- * Author: James da Silva, Systems Design and Analysis Group
- *                        Computer Science Department
- *                        University of Maryland at College Park
+ * Authors: the Amanda Development Team.  Its members are listed in a
+ * file named AUTHORS, in the root directory of this distribution.
  */
+
 /*
- * $Id: amandad.c,v 1.32.2.4.4.1.2.6.2.2 2005/09/20 21:31:52 jrjackson Exp $
+ * $Id: amandad.c,v 1.62 2006/03/09 16:51:41 martinea Exp $
  *
  * handle client-host side of Amanda network communications, including
  * security checks, execution of the proper service, and acking the
  * master side
  */
 
+/*#define      AMANDAD_DEBUG*/
+
 #include "amanda.h"
+#include "amandad.h"
 #include "clock.h"
-#include "dgram.h"
+#include "event.h"
 #include "amfeatures.h"
+#include "packet.h"
 #include "version.h"
-#include "protocol.h"
+#include "queue.h"
+#include "security.h"
+#include "stream.h"
 #include "util.h"
 #include "client_util.h"
 
-#define RECV_TIMEOUT 30
-#define ACK_TIMEOUT  10                /* XXX should be configurable */
-#define MAX_RETRIES   5
+#define        REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
+#define        ACK_TIMEOUT     10              /* XXX should be configurable */
+#define        MAX_REP_RETRIES 5
+
+/*
+ * These are the actions for entering the state machine
+ */
+typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
+    A_SENDNAK, A_TIMEOUT } action_t;
+
+/*
+ * This is a state in the state machine.  It is a function pointer to
+ * the function that actually implements the state.
+ */
+struct active_service;
+typedef action_t (*state_t) P((struct active_service *, action_t, pkt_t *));
+
+/*
+ * This structure describes an active running service.
+ *
+ * An active service is something running that we have received
+ * a request for.  This structure holds info on that service, including
+ * file descriptors for data, etc, as well as the security handle
+ * for communications with the amanda server.
+ */
+struct active_service {
+    char *cmd;                         /* name of command we ran */
+    char *arguments;                   /* arguments we sent it */
+    security_handle_t *security_handle;        /* remote server */
+    state_t state;                     /* how far this has progressed */
+    pid_t pid;                         /* pid of subprocess */
+    int send_partial_reply;            /* send PREP packet */
+    int reqfd;                         /* pipe to write requests */
+    int repfd;                         /* pipe to read replies */
+    event_handle_t *ev_repfd;          /* read event handle for repfd */
+    event_handle_t *ev_reptimeout;     /* timeout for rep data */
+    pkt_t rep_pkt;                     /* rep packet we're sending out */
+    char repbuf[MAX_PACKET];           /* buffer to read the rep into */
+    int repbufsize;                    /* length of repbuf */
+    int repretry;                      /* times we'll retry sending the rep */
+    /*
+     * General user streams to the process, and their equivalent
+     * network streams.
+     */
+    struct datafd_handle {
+       int fd;                         /* pipe to child process */
+       event_handle_t *ev_handle;      /* it's read event handle */
+       security_stream_t *netfd;       /* stream to amanda server */
+       struct active_service *as;      /* pointer back to our enclosure */
+    } data[DATA_FD_COUNT];
+    char databuf[NETWORK_BLOCK_BYTES]; /* buffer to relay netfd data in */
+    TAILQ_ENTRY(active_service) tq;    /* queue handle */
+};
 
 /* 
- * Here are the services that we understand.
+ * Here are the services that we allow.
  */
-struct service_s {
-    char *name;
-    int flags;
-#      define NONE             0
-#      define IS_INTERNAL      1       /* service is internal */
-#      define NEED_KEYPIPE     2       /* pass kerberos key in pipe */
-#      define NO_AUTH          4       /* doesn't need authentication */
-} service_table[] = {
-    { "noop",          IS_INTERNAL },
-    { "sendsize",      NONE },
-    { "sendbackup",    NEED_KEYPIPE },
-    { "sendfsinfo",    NONE },
-    { "selfcheck",     NONE },
-    { NULL, NONE }
+static const char *services[] = {
+    "noop",
+    "sendsize",
+    "sendbackup",
+    "selfcheck",
 };
+#define        NSERVICES       (sizeof(services) / sizeof(services[0]))
 
+/*
+ * Queue of outstanding requests that we are running.
+ */
+static struct {
+    TAILQ_HEAD(, active_service) tailq;
+    int qlength;
+} serviceq = {
+    TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
+};
+
+/*
+ * Data for dbmalloc to check for memory leaks
+ */
+#ifdef USE_DBMALLOC
+static struct {
+    struct {
+       unsigned long size, hist;
+    } start, end;
+} dbmalloc_info;
+#endif
 
-int max_retry_count = MAX_RETRIES;
 int ack_timeout     = ACK_TIMEOUT;
 
-#ifdef KRB4_SECURITY
-#  include "amandad-krb4.c"
+int main P((int argc, char **argv));
+
+static int allocstream P((struct active_service *, int));
+static void exit_check P((void *));
+static void protocol_accept P((security_handle_t *, pkt_t *));
+static void state_machine P((struct active_service *, action_t, pkt_t *));
+
+static action_t s_sendack P((struct active_service *, action_t, pkt_t *));
+static action_t s_repwait P((struct active_service *, action_t, pkt_t *));
+static action_t s_processrep P((struct active_service *, action_t, pkt_t *));
+static action_t s_sendrep P((struct active_service *, action_t, pkt_t *));
+static action_t s_ackwait P((struct active_service *, action_t, pkt_t *));
+
+static void repfd_recv P((void *));
+static void timeout_repfd P((void *));
+static void protocol_recv P((void *, pkt_t *, security_status_t));
+static void process_netfd P((void *));
+static struct active_service *service_new P((security_handle_t *,
+    const char *, const char *));
+static void service_delete P((struct active_service *));
+static int writebuf P((struct active_service *, const void *, size_t));
+static int do_sendpkt P((security_handle_t *handle, pkt_t *pkt));
+
+#ifdef AMANDAD_DEBUG
+static const char *state2str P((state_t));
+static const char *action2str P((action_t));
 #endif
 
-/* local functions */
-int main P((int argc, char **argv));
-void sendack P((pkt_t *hdr, pkt_t *msg));
-void sendnak P((pkt_t *hdr, pkt_t *msg, char *str));
-void setup_rep P((pkt_t *hdr, pkt_t *msg, int partial_rep));
-char *strlower P((char *str));
-
-int main(argc, argv)
-int argc;
-char **argv;
+int
+main(argc, argv)
+    int argc;
+    char **argv;
 {
-    int n;
-    char *errstr = NULL;
-    unsigned long malloc_hist_1, malloc_size_1;
-    unsigned long malloc_hist_2, malloc_size_2;
+    int i, in, out;
+    const security_driver_t *secdrv;
+    int no_exit = 0;
     char *pgm = "amandad";             /* in case argv[0] is not set */
 
-    /* in_msg: The first incoming request.
-       dup_msg: Any other incoming message.
-       out_msg: Standard, i.e. non-repeated, ACK and REP.
-       rej_msg: Any other outgoing message.
-     */
-    pkt_t in_msg, out_msg, out_pmsg, rej_msg, dup_msg;
-    char *cmd = NULL, *base = NULL;
-    char *noop_file = NULL;
-    char **vp;
-    char *s;
-    ssize_t s_len;
-    int retry_count, rc, reqlen;
-    int req_pipe[2], rep_pipe[2];
-    int dglen = 0;
-    char number[NUM_STR_SIZE];
-    am_feature_t *our_features = NULL;
-    char *our_feature_string = NULL;
-    int send_partial_reply = 0;
-
-    struct service_s *servp;
-    fd_set insock;
-
     safe_fd(-1, 0);
     safe_cd();
 
+    if(argv == NULL) {
+       error("argv == NULL\n");
+    }
+
     /*
      * When called via inetd, it is not uncommon to forget to put the
      * argv[0] value on the config line.  On some systems (e.g. Solaris)
@@ -130,41 +196,130 @@ char **argv;
 
     set_pname(pgm);
 
-    malloc_size_1 = malloc_inuse(&malloc_hist_1);
+    /* Don't die when child closes pipe */
+    signal(SIGPIPE, SIG_IGN);
+
+#ifdef USE_DBMALLOC
+    dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
+#endif
 
     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
 
 #ifdef FORCE_USERID
-
     /* we'd rather not run as root */
-    if(geteuid() == 0) {
-#ifdef KRB4_SECURITY
-        if(client_uid == (uid_t) -1) {
+    if (geteuid() == 0) {
+       if(client_uid == (uid_t) -1) {
            error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
        }
-
-        /*
-        * if we're using kerberos security, we'll need to be root in
-        * order to get at the machine's srvtab entry, so we hang on to
-        * some root privledges for now.  We give them up entirely later.
-        */
-       setegid(client_gid);
-       seteuid(client_uid);
-#else
        initgroups(CLIENT_LOGIN, client_gid);
        setgid(client_gid);
-       setuid(client_uid);
-#endif  /* KRB4_SECURITY */
+       setegid(client_gid);
+       seteuid(client_uid);
     }
 #endif /* FORCE_USERID */
 
+    /*
+     * ad-hoc argument parsing
+     *
+     * We accept       -auth=[authentication type]
+     *                 -no-exit
+#ifdef AMANDAD_DEBUG
+     *                 -tcp=[port]
+     *                 -udp=[port]
+#endif
+     */
+    secdrv = NULL;
+    in = 0; out = 1;           /* default to stdin/stdout */
+    for (i = 1; i < argc; i++) {
+       /*
+        * accept -krb4 as an alias for -auth=krb4 (for compatibility)
+        */
+       if (strcmp(argv[i], "-krb4") == 0) {
+           argv[i] = "-auth=krb4";
+           /* FALLTHROUGH */
+       }
+
+       /*
+        * Get a driver for a security type specified after -auth=
+        */
+       if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
+           argv[i] += strlen("-auth=");
+           secdrv = security_getdriver(argv[i]);
+           if (secdrv == NULL)
+               error("no driver for security type '%s'", argv[i]);
+           continue;
+       }
+
+       /*
+        * If -no-exit is specified, always run even after requests have
+        * been satisfied.
+        */
+       if (strcmp(argv[i], "-no-exit") == 0) {
+           no_exit = 1;
+           continue;
+       }
+
+#ifdef AMANDAD_DEBUG
+       /*
+        * Allow us to directly bind to a udp port for debugging.
+        * This may only apply to some security types.
+        */
+       if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
+           struct sockaddr_in sin;
+
+           argv[i] += strlen("-udp=");
+           in = out = socket(AF_INET, SOCK_DGRAM, 0);
+           if (in < 0)
+               error("can't create dgram socket: %s\n", strerror(errno));
+           sin.sin_family = AF_INET;
+           sin.sin_addr.s_addr = INADDR_ANY;
+           sin.sin_port = htons(atoi(argv[i]));
+           if (bind(in, (struct sockaddr *)&sin, sizeof(sin)) < 0)
+               error("can't bind to port %d: %s\n", atoi(argv[i]),
+                   strerror(errno));
+       }
+       /*
+        * Ditto for tcp ports.
+        */
+       if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
+           struct sockaddr_in sin;
+           int sock, n;
+
+           argv[i] += strlen("-tcp=");
+           sock = socket(AF_INET, SOCK_STREAM, 0);
+           if (sock < 0)
+               error("can't create tcp socket: %s\n", strerror(errno));
+           n = 1;
+           setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&n, sizeof(n));
+           sin.sin_family = AF_INET;
+           sin.sin_addr.s_addr = INADDR_ANY;
+           sin.sin_port = htons(atoi(argv[i]));
+           if (bind(sock, (struct sockaddr *)&sin, sizeof(sin)) < 0)
+               error("can't bind to port %d: %s\n", atoi(argv[i]),
+                   strerror(errno));
+           listen(sock, 10);
+           n = sizeof(sin);
+           in = out = accept(sock, (struct sockaddr *)&sin, &n);
+       }
+#endif
+    }
+
+    /*
+     * If no security type specified, use BSD
+     */
+    if (secdrv == NULL) {
+       secdrv = security_getdriver("BSD");
+       if (secdrv == NULL)
+           error("no driver for default security type 'BSD'");
+    }
+
     /* initialize */
 
     dbopen();
     {
+       /* this lameness is for error() */
        int db_fd = dbfd();
        if(db_fd != -1) {
-           dup2(db_fd, 1);
            dup2(db_fd, 2);
        }
     }
@@ -172,521 +327,1063 @@ char **argv;
     startclock();
 
     dbprintf(("%s: version %s\n", get_pname(), version()));
-    for(vp = version_info; *vp != NULL; vp++)
-       dbprintf(("%s: %s", debug_prefix(NULL), *vp));
+    for (i = 0; version_info[i] != NULL; i++) {
+       dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
+    }
 
     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
        dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
                  debug_prefix(NULL)));
     }
 
-    our_features = am_init_feature_set();
-    our_feature_string = am_feature_to_string(our_features);
+    /*
+     * Schedule to call protocol_accept() when new security handles
+     * are created on stdin.
+     */
+    security_accept(secdrv, in, out, protocol_accept);
+
+    /*
+     * Schedule an event that will try to exit every 30 seconds if there
+     * are no requests outstanding.
+     */
+    (void)event_register(30, EV_TIME, exit_check, &no_exit);
 
-    dgram_zero(&in_msg.dgram); 
-    dgram_socket(&in_msg.dgram, 0);
+    /*
+     * Call event_loop() with an arg of 0, telling it to block until all
+     * events are completed.
+     */
+    event_loop(0);
 
-    dgram_zero(&dup_msg.dgram);
-    dgram_socket(&dup_msg.dgram, 0);
+    /* NOTREACHED */
+    exit(1);   /* appease gcc/lint */
+}
 
-    dgram_zero(&out_msg.dgram);
-    dgram_socket(&out_msg.dgram, 0);
+/*
+ * This runs periodically and checks to see if we have any active services
+ * still running.  If we don't, then we quit.
+ */
+static void
+exit_check(cookie)
+    void *cookie;
+{
+    int no_exit;
 
-    dgram_zero(&out_pmsg.dgram);
-    dgram_socket(&out_pmsg.dgram, 0);
+    assert(cookie != NULL);
+    no_exit = *(int *)cookie;
 
-    dgram_zero(&rej_msg.dgram);
-    dgram_socket(&rej_msg.dgram, 0);
+    /*
+     * If things are still running, then don't exit.
+     */
+    if (serviceq.qlength > 0)
+       return;
 
-    dgram_zero(&rej_msg.dgram);
-    dgram_socket(&rej_msg.dgram, 0);
+    /*
+     * If the caller asked us to never exit, then we're done
+     */
+    if (no_exit)
+       return;
 
-    /* set up input and response pipes */
+#ifdef USE_DBMALLOC
+    dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
 
-#ifdef KRB4_SECURITY
-    if(argc >= 2 && strcmp(argv[1], "-krb4") == 0) {
-       krb4_auth = 1;
-       dbprintf(("%s: using krb4 security\n", debug_prefix(NULL)));
-    }
-    else {
-       dbprintf(("%s: using bsd security\n", debug_prefix(NULL)));
-       krb4_auth = 0;
+    if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
+       malloc_list(dbfd(), dbmalloc_info.start.hist,
+           dbmalloc_info.end.hist);
     }
 #endif
 
-    /* get request packet and attempt to parse it */
+    dbclose();
+    exit(0);
+}
 
-    if((n = dgram_recv(&in_msg.dgram, RECV_TIMEOUT, &in_msg.peer)) <= 0) {
-       char *s;
+/*
+ * Handles new incoming protocol handles.  This is a callback for
+ * security_accept(), which gets called when new handles are detected.
+ */
+static void
+protocol_accept(handle, pkt)
+    security_handle_t *handle;
+    pkt_t *pkt;
+{
+    pkt_t pkt_out;
+    struct active_service *as;
+    char *pktbody, *tok, *service, *arguments;
+    int i;
 
-       if (n == 0) {
-           s = "timeout";
-       } else {
-           s = strerror(errno);
-       }
-       error("error receiving message: %s", s);
-    }
-
-    dbprintf(("%s: got packet:\n--------\n%s--------\n\n",
-             debug_prefix_time(NULL), in_msg.dgram.cur));
-
-    parse_pkt_header(&in_msg);
-    if(in_msg.type != P_REQ && in_msg.type != P_NAK && in_msg.type != P_ACK) {
-       /* XXX */
-       dbprintf(("%s: this is a %s packet, nak'ing it\n", 
-                 debug_prefix_time(NULL),
-                 in_msg.type == P_BOGUS? "bogus" : "unexpected"));
-       if(in_msg.type != P_BOGUS) {
-           parse_errmsg = newvstralloc(parse_errmsg,"unexpected ",
-               in_msg.type == P_ACK? "ack ":
-               in_msg.type == P_REP? "rep ": "",
-               "packet", NULL);
-       }
-       sendnak(&in_msg, &rej_msg, parse_errmsg);
-       dbclose();
-       return 1;
+    /*
+     * If pkt is NULL, then there was a problem with the new connection.
+     */
+    if (pkt == NULL) {
+       dbprintf(("%s: accept error: %s\n",
+           debug_prefix_time(NULL), security_geterror(handle)));
+       pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
+       do_sendpkt(handle, &pkt_out);
+       security_close(handle);
+       return;
     }
-    if(in_msg.type != P_REQ) {
-       dbprintf(("%s: strange, this is not a request packet\n",
-                 debug_prefix_time(NULL)));
-       dbclose();
-       return 1;
+
+    dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
+       debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
+
+    /*
+     * If this is not a REQ packet, just forget about it.
+     */
+    if (pkt->type != P_REQ) {
+       dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
+           debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
+       security_close(handle);
+       return;
     }
 
-    /* lookup service */
+    pktbody = service = arguments = NULL;
+    as = NULL;
+
+    /*
+     * Parse out the service and arguments
+     */
+
+    pktbody = stralloc(pkt->body);
 
-    for(servp = service_table; servp->name != NULL; servp++)
-       if(strcmp(servp->name, in_msg.service) == 0) break;
+    tok = strtok(pktbody, " ");
+    if (tok == NULL)
+       goto badreq;
+    if (strcmp(tok, "SERVICE") != 0)
+       goto badreq;
 
-    if(servp->name == NULL) {
-       errstr = newstralloc2(errstr, "unknown service: ", in_msg.service);
-       sendnak(&in_msg, &rej_msg, errstr);
-       dbclose();
-       return 1;
-    }
+    tok = strtok(NULL, " \n");
+    if (tok == NULL)
+       goto badreq;
+    service = stralloc(tok);
 
-    if((servp->flags & IS_INTERNAL) != 0) {
-       cmd = stralloc(servp->name);
-    } else {
-       base = newstralloc(base, servp->name);
-       cmd = newvstralloc(cmd, libexecdir, "/", base, versionsuffix(), NULL);
-
-       if(access(cmd, X_OK) == -1) {
-           dbprintf(("%s: execute access to \"%s\" denied\n",
-                     debug_prefix_time(NULL), cmd));
-           errstr = newvstralloc(errstr,
-                                 "service ", base, " unavailable",
-                                 NULL);
-           amfree(base);
-           sendnak(&in_msg, &rej_msg, errstr);
-           dbclose();
-           return 1;
-       }
-       amfree(base);
+    /* we call everything else 'arguments' */
+    tok = strtok(NULL, "");
+    if (tok == NULL)
+       goto badreq;
+    arguments = stralloc(tok);
+
+    /* see if it's one we allow */
+    for (i = 0; i < NSERVICES; i++)
+       if (strcmp(services[i], service) == 0)
+           break;
+    if (i == NSERVICES) {
+       dbprintf(("%s: %s: invalid service\n",
+           debug_prefix_time(NULL), service));
+       pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
+       goto send_pkt_out;
     }
 
-    /* everything looks ok initially, send ACK */
+    service = newvstralloc(service,
+                      libexecdir, "/", service, versionsuffix(),
+                      NULL);
+    if (access(service, X_OK) < 0) {
+       dbprintf(("%s: can't execute %s: %s\n",
+           debug_prefix_time(NULL), service, strerror(errno)));
+           pkt_init(&pkt_out, P_NAK, "ERROR execute access to \"%s\" denied\n",
+           service);
+       goto send_pkt_out;
+    }
 
-    sendack(&in_msg, &out_msg);
+    /* see if its already running */
+    for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
+       as = TAILQ_NEXT(as, tq)) {
+           if (strcmp(as->cmd, service) == 0 &&
+               strcmp(as->arguments, arguments) == 0) {
+                   dbprintf(("%s: %s %s: already running, acking req\n",
+                       debug_prefix_time(NULL), service, arguments));
+                   pkt_init(&pkt_out, P_ACK, "");
+                   goto send_pkt_out;
+           }
+    }
 
-    /* 
-     * handle security check: this could take a long time, so it is 
-     * done after the initial ack.
+    /*
+     * create a new service instance, and send the arguments down
+     * the request pipe.
      */
+    dbprintf(("%s: creating new service: %s\n%s\n",
+       debug_prefix_time(NULL), service, arguments));
+    as = service_new(handle, service, arguments);
+    if (writebuf(as, arguments, strlen(arguments)) < 0) {
+       const char *errmsg = strerror(errno);
+       dbprintf(("%s: error sending arguments to %s: %s\n",
+           debug_prefix_time(NULL), service, errmsg));
+       pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
+           service, errmsg);
+       goto send_pkt_out;
+    }
+    aclose(as->reqfd);
+
+    amfree(pktbody);
+    amfree(service);
+    amfree(arguments);
 
-#if defined(KRB4_SECURITY)
     /*
-     * we need to be root to access the srvtab file, but only if we started
-     * out that way.
+     * Move to the sendack state, and start up the state
+     * machine.
      */
-    setegid(getgid());
-    seteuid(getuid());
-#endif /* KRB4_SECURITY */
+    as->state = s_sendack;
+    state_machine(as, A_START, NULL);
+    return;
+
+badreq:
+    pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
+    dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
+       debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
+
+send_pkt_out:
+    amfree(pktbody);
+    amfree(service);
+    amfree(arguments);
+    if(as) service_delete(as);
+    do_sendpkt(handle, &pkt_out);
+    security_close(handle);
+}
+
+/*
+ * Handles incoming protocol packets.  Routes responses to the proper
+ * running service.
+ */
+static void
+state_machine(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
+    action_t retaction;
+    state_t curstate;
+    pkt_t nak;
+
+#ifdef AMANDAD_DEBUG
+    dbprintf(("%s: state_machine: %X entering\n",
+       debug_prefix_time(NULL), (unsigned int)as));
+#endif
+    for (;;) {
+       curstate = as->state;
+#ifdef AMANDAD_DEBUG
+       dbprintf(("%s: state_machine: %X curstate=%s action=%s\n",
+           debug_prefix_time(NULL), (unsigned int)as,
+           state2str(curstate), action2str(action)));
+#endif
+       retaction = (*curstate)(as, action, pkt);
+#ifdef AMANDAD_DEBUG
+       dbprintf(("%s: state_machine: %X curstate=%s returned %s (nextstate=%s)\n",
+           debug_prefix_time(NULL),
+           (unsigned int)as, state2str(curstate), action2str(retaction),
+           state2str(as->state)));
+#endif
+
+       switch (retaction) {
+       /*
+        * State has queued up and is now blocking on input.
+        */
+       case A_PENDING:
+#ifdef AMANDAD_DEBUG
+           dbprintf(("%s: state_machine: %X leaving (A_PENDING)\n",
+               debug_prefix_time(NULL), (unsigned int)as));
+#endif
+           return;
+
+       /*
+        * service has switched states.  Loop.
+        */
+       case A_CONTINUE:
+           break;
+
+       /*
+        * state has determined that the packet it received was bogus.
+        * Send a nak, and return.
+        */
+       case A_SENDNAK:
+           dbprintf(("%s: received unexpected %s packet\n",
+               debug_prefix_time(NULL), pkt_type2str(pkt->type)));
+           dbprintf(("<<<<<\n%s----\n\n", pkt->body));
+           pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
+               pkt_type2str(pkt->type));
+           do_sendpkt(as->security_handle, &nak);
+#ifdef AMANDAD_DEBUG
+           dbprintf(("%s: state_machine: %X leaving (A_SENDNAK)\n",
+               debug_prefix_time(NULL), (unsigned int)as));
+#endif
+           return;
+
+       /*
+        * Service is done.  Remove it and finish.
+        */
+       case A_FINISH:
+           service_delete(as);
+#ifdef AMANDAD_DEBUG
+           dbprintf(("%s: state_machine: %X leaving (A_FINISH)\n",
+               debug_prefix_time(NULL), (unsigned int)as));
+#endif
+           return;
 
-    amfree(errstr);
-    if(!(servp->flags & NO_AUTH)
-       && !security_ok(&in_msg.peer, in_msg.security, in_msg.cksum, &errstr)) {
-       /* XXX log on authlog? */
-       setup_rep(&in_msg, &out_msg, 0);
-       ap_snprintf(out_msg.dgram.cur,
-                   sizeof(out_msg.dgram.data)-out_msg.dgram.len,
-                   "ERROR %s\n", errstr);
-       out_msg.dgram.len = strlen(out_msg.dgram.data);
-       goto send_response;
+       default:
+           assert(0);
+           break;
+       }
     }
+    /* NOTREACHED */
+}
+
+/*
+ * This state just sends an ack.  After that, we move to the repwait
+ * state to wait for REP data to arrive from the subprocess.
+ */
+static action_t
+s_sendack(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
+    pkt_t ack;
 
-#if defined(KRB4_SECURITY) && defined(FORCE_USERID)
+    pkt_init(&ack, P_ACK, "");
+    if (do_sendpkt(as->security_handle, &ack) < 0) {
+       dbprintf(("%s: error sending ACK: %s\n",
+           debug_prefix_time(NULL), security_geterror(as->security_handle)));
+       return (A_FINISH);
+    }
 
     /*
-     * we held on to a root uid earlier for accessing files; since we're
-     * done doing anything requiring root, we can completely give it up.
+     * move to the repwait state
+     * Setup a listener for data on the reply fd, but also
+     * listen for packets over the wire, as the server may
+     * poll us if we take a long time.
+     * Setup a timeout that will fire if it takes too long to
+     * receive rep data.
      */
+    as->state = s_repwait;
+    as->ev_repfd = event_register(as->repfd, EV_READFD, repfd_recv, as);
+    as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
+       timeout_repfd, as);
+    security_recvpkt(as->security_handle, protocol_recv, as, -1);
+    return (A_PENDING);
+}
 
-    if(geteuid() == 0) {
-       if(client_uid == (uid_t) -1) {
-           error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
+/*
+ * This is the repwait state.  We have responded to the initial REQ with
+ * an ACK, and we are now waiting for the process we spawned to pass us 
+ * data to send in a REP.
+ */
+static action_t
+s_repwait(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
+    int n;
+
+    /*
+     * We normally shouldn't receive any packets while waiting
+     * for our REP data, but in some cases we do.
+     */
+    if (action == A_RECVPKT) {
+       assert(pkt != NULL);
+       /*
+        * Another req for something that's running.  Just send an ACK
+        * and go back and wait for more data.
+        */
+       if (pkt->type == P_REQ) {
+           dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
+               debug_prefix_time(NULL)));
+           pkt_init(&as->rep_pkt, P_ACK, "");
+           do_sendpkt(as->security_handle, &as->rep_pkt);
+           return (A_PENDING);
        }
-       initgroups(CLIENT_LOGIN, client_gid);
-       setgid(client_gid);
-       setuid(client_uid);
+       /* something unexpected.  Nak it */
+       return (A_SENDNAK);
     }
 
-#endif  /* KRB4_SECURITY && FORCE_USERID */
+    if (action == A_TIMEOUT) {
+       pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
+       dbprintf(("%s: %s timed out waiting for REP data\n",
+           debug_prefix_time(NULL), as->cmd));
+       do_sendpkt(as->security_handle, &as->rep_pkt);
+       return (A_FINISH);
+    }
 
-    dbprintf(("%s: running service \"%s\"\n", debug_prefix_time(NULL), cmd));
+    assert(action == A_RECVREP);
 
-    if(strcmp(servp->name, "noop") == 0) {
-       ap_snprintf(number, sizeof(number), "%ld", (long)getpid());
-       noop_file = vstralloc(AMANDA_TMPDIR,
-                             "/",
-                             get_pname(),
-                             ".noop.",
-                             number,
-                             NULL);
-       rep_pipe[0] = open(noop_file, O_RDWR|O_EXCL|O_CREAT);
-       if(rep_pipe[0] < 0) {
-           error("cannot open \"%s\": %s", noop_file, strerror(errno));
+    /*
+     * If the read fails, consider the process dead, and remove it.
+     * Always save room for nul termination.
+     */
+    if (as->repbufsize + 1 >= sizeof(as->repbuf)) {
+       dbprintf(("%s: more than %d bytes in reply\n",
+           debug_prefix_time(NULL), sizeof(as->repbuf)));
+       dbprintf(("%s: reply so far:\n%s\n", debug_prefix(NULL), as->repbuf));
+       pkt_init(&as->rep_pkt, P_NAK, "ERROR more than %d bytes in reply\n",
+           sizeof(as->repbuf));
+       do_sendpkt(as->security_handle, &as->rep_pkt);
+       return (A_FINISH);
+    }
+    do {
+       n = read(as->repfd, as->repbuf + as->repbufsize,
+                sizeof(as->repbuf) - as->repbufsize - 1);
+    } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
+    if (n < 0) {
+       const char *errstr = strerror(errno);
+       dbprintf(("%s: read error on reply pipe: %s\n",
+           debug_prefix_time(NULL), errstr));
+       pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
+           errstr);
+       do_sendpkt(as->security_handle, &as->rep_pkt);
+       return (A_FINISH);
+    }
+    /*
+     * If we got some data, go back and wait for more, or EOF.  Nul terminate
+     * the buffer first.
+     */
+    as->repbuf[n + as->repbufsize] = '\0';
+    if (n > 0) {
+       as->repbufsize += n;
+       if(as->send_partial_reply) {
+           pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
+           do_sendpkt(as->security_handle, &as->rep_pkt);
+           pkt_init(&as->rep_pkt, P_REP, "");
        }
-       (void)unlink(noop_file);
-       s = vstralloc("OPTIONS features=", our_feature_string, ";\n", NULL);
-       s_len = strlen(s);
-       if(fullwrite(rep_pipe[0], s, s_len) != s_len) {
-           error("cannot write %d bytes to %s", s_len, noop_file);
+       return (A_PENDING);
+    }
+
+    /*
+     * If we got 0, then we hit EOF.  Process the data and release
+     * the timeout.
+     */
+    assert(n == 0);
+
+    assert(as->ev_repfd != NULL);
+    event_release(as->ev_repfd);
+    as->ev_repfd = NULL;
+
+    assert(as->ev_reptimeout != NULL);
+    event_release(as->ev_reptimeout);
+    as->ev_reptimeout = NULL;
+
+    as->state = s_processrep;
+    aclose(as->repfd);
+    return (A_CONTINUE);
+}
+
+/*
+ * After we have read in all of the rep data, we process it and send
+ * it out as a REP packet.
+ */
+static action_t
+s_processrep(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
+    char *tok, *repbuf;
+
+    /*
+     * Copy the rep lines into the outgoing packet.
+     *
+     * If this line is a CONNECT, translate it
+     * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
+     * Example:
+     *
+     *  CONNECT DATA 4 MESG 5 INDEX 6
+     *
+     * The tags are arbitrary.  The handles are in the DATA_FD pool.
+     * We need to map these to security streams and pass them back
+     * to the amanda server.  If the handle is -1, then we don't map.
+     */
+    repbuf = stralloc(as->repbuf);
+    pkt_init(&as->rep_pkt, P_REP, "");
+    tok = strtok(repbuf, " ");
+    if (tok == NULL)
+       goto error;
+    if (strcmp(tok, "CONNECT") == 0) {
+       char *line, *nextbuf;
+
+       /* Save the entire line */
+       line = strtok(NULL, "\n");
+       /* Save the buf following the line */
+       nextbuf = strtok(NULL, "");
+
+       if (line == NULL || nextbuf == NULL)
+           goto error;
+
+       pkt_cat(&as->rep_pkt, "CONNECT");
+
+       /* loop over the id/handle pairs */
+       for (;;) {
+           /* id */
+           tok = strtok(line, " ");
+           line = NULL;        /* keep working from line */
+           if (tok == NULL)
+               break;
+           pkt_cat(&as->rep_pkt, " %s", tok);
+
+           /* handle */
+           tok = strtok(NULL, " \n");
+           if (tok == NULL)
+               goto error;
+           /* convert the handle into something the server can process */
+           pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
        }
-       amfree(noop_file);
-       amfree(s);
-       (void)lseek(rep_pipe[0], (off_t)0, SEEK_SET);
+       pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
     } else {
-       if(strcmp(servp->name, "sendsize") == 0) {
-           if(strncmp(in_msg.dgram.cur,"OPTIONS ",8) == 0) {
-               g_option_t *g_options;
-               char *option_str, *p;
-
-               option_str = stralloc(in_msg.dgram.cur+8);
-               p = strchr(option_str,'\n');
-               if(p) *p = '\0';
-
-               g_options = parse_g_options(option_str, 0);
-               if(am_has_feature(g_options->features, fe_partial_estimate)) {
-                   send_partial_reply = 1;
-               }
-               amfree(option_str);
-           }
-       }
-       if(pipe(req_pipe) == -1 || pipe(rep_pipe) == -1)
-           error("pipe: %s", strerror(errno));
+error:
+       pkt_cat(&as->rep_pkt, "%s", as->repbuf);
+    }
 
-       /* spawn first child to handle the request */
+    /*
+     * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
+     * state.
+     */
+    as->state = s_sendrep;
+    as->repretry = MAX_REP_RETRIES;
+    amfree(repbuf);
+    return (A_CONTINUE);
+}
 
-       switch(fork()) {
-       case -1: error("could not fork for %s: %s", cmd, strerror(errno));
+/*
+ * This is the state where we send the REP we just collected from our child.
+ */
+static action_t
+s_sendrep(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
 
-       default:                /* parent */
+    /*
+     * Transmit it and move to the ack state.
+     */
+    do_sendpkt(as->security_handle, &as->rep_pkt);
+    security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
+    as->state = s_ackwait;
+    return (A_PENDING);
+}
 
-           break; 
+/*
+ * This is the state in which we wait for the server to ACK the REP
+ * we just sent it.
+ */
+static action_t
+s_ackwait(as, action, pkt)
+    struct active_service *as;
+    action_t action;
+    pkt_t *pkt;
+{
+    struct datafd_handle *dh;
+    int npipes;
 
-       case 0:         /* child */
+    /*
+     * If we got a timeout, try again, but eventually give up.
+     */
+    if (action == A_TIMEOUT) {
+       if (--as->repretry > 0) {
+           as->state = s_sendrep;
+           return (A_CONTINUE);
+       }
+       dbprintf(("%s: timeout waiting for ACK for our REP\n",
+           debug_prefix_time(NULL)));
+       return (A_FINISH);
+    }
+#ifdef AMANDAD_DEBUG
+    dbprintf(("%s: received ACK, now opening streams\n",
+       debug_prefix_time(NULL)));
+#endif
 
-            aclose(req_pipe[1]); 
-            aclose(rep_pipe[0]);
+    assert(action == A_RECVPKT);
+    if (pkt->type != P_ACK)
+       return (A_SENDNAK);
 
-            dup2(req_pipe[0], 0);
-            dup2(rep_pipe[1], 1);
+    /*
+     * Got the ack, now open the pipes
+     */
+    for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
+       if (dh->netfd == NULL)
+           continue;
+       if (security_stream_accept(dh->netfd) < 0) {
+           dbprintf(("%s: stream %d accept failed: %s\n",
+               debug_prefix_time(NULL),
+               dh - &as->data[0], security_geterror(as->security_handle)));
+           security_stream_close(dh->netfd);
+           dh->netfd = NULL;
+       }
+       /* setup an event for reads from it */
+       dh->ev_handle = event_register(dh->fd, EV_READFD, process_netfd, dh);
+    }
 
-           /* modification by BIS@BBN 4/25/2003:
-            * close these file descriptors BEFORE doing pipe magic
-            * for transferring session key; inside transfer_session_key
-            * is a dup2 to move a pipe to KEY_PIPE, which collided
-            * with req_pipe[0]; when req_pipe[0] was closed after the
-            * call to transfer_session_key, then KEY_PIPE ended up
-            * being closed. */
-            aclose(req_pipe[0]);
-            aclose(rep_pipe[1]);
+    /*
+     * Pipes are open, so auth them.  Count them at the same time.
+     */
+    for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
+       if (dh->netfd == NULL)
+           continue;
+       if (security_stream_auth(dh->netfd) < 0) {
+           security_stream_close(dh->netfd);
+           dh->netfd = NULL;
+           event_release(dh->ev_handle);
+           dh->ev_handle = NULL;
+       } else {
+           npipes++;
+       }
+    }
 
-#ifdef  KRB4_SECURITY
-           transfer_session_key();
+    /*
+     * If no pipes are open, then we're done.  Otherwise, just start running.
+     * The event handlers on all of the pipes will take it from here.
+     */
+#ifdef AMANDAD_DEBUG
+    dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
+       debug_prefix_time(NULL), npipes));
 #endif
+    if (npipes == 0)
+       return (A_FINISH);
+    else {
+       security_close(as->security_handle);
+       as->security_handle = NULL;
+       return (A_PENDING);
+    }
+}
 
-           /* run service */
+/*
+ * Called when a repfd has received data
+ */
+static void
+repfd_recv(cookie)
+    void *cookie;
+{
+    struct active_service *as = cookie;
 
-           execle(cmd, cmd, NULL, safe_env());
-           error("could not exec %s: %s", cmd, strerror(errno));
-        }
-        amfree(cmd);
+    assert(as != NULL);
+    assert(as->ev_repfd != NULL);
 
-        aclose(req_pipe[0]);
-        aclose(rep_pipe[1]);
+    state_machine(as, A_RECVREP, NULL);
+}
 
-        /* spawn second child to handle writing the packet to the first child */
+/*
+ * Called when a repfd has timed out
+ */
+static void
+timeout_repfd(cookie)
+    void *cookie;
+{
+    struct active_service *as = cookie;
 
-        switch(fork()) {
-        case -1: error("could not fork for %s: %s", cmd, strerror(errno));
+    assert(as != NULL);
+    assert(as->ev_reptimeout != NULL);
 
-        default:               /* parent */
+    state_machine(as, A_TIMEOUT, NULL);
+}
 
-           break;
+/*
+ * Called when a handle has received data
+ */
+static void
+protocol_recv(cookie, pkt, status)
+    void *cookie;
+    pkt_t *pkt;
+    security_status_t status;
+{
+    struct active_service *as = cookie;
+
+    assert(as != NULL);
+
+    switch (status) {
+    case S_OK:
+       dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
+           debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
+       state_machine(as, A_RECVPKT, pkt);
+       break;
+    case S_TIMEOUT:
+       dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
+       state_machine(as, A_TIMEOUT, NULL);
+       break;
+    case S_ERROR:
+       dbprintf(("%s: receive error: %s\n",
+           debug_prefix_time(NULL), security_geterror(as->security_handle)));
+       break;
+    }
+}
 
-        case 0:                /* child */
+/*
+ * This is a generic relay function that just reads data from one of
+ * the process's pipes and passes it up the equivalent security_stream_t
+ */
+static void
+process_netfd(cookie)
+    void *cookie;
+{
+    pkt_t nak;
+    struct datafd_handle *dh = cookie;
+    struct active_service *as = dh->as;
+    int n;
 
-            aclose(rep_pipe[0]);
-            reqlen = strlen(in_msg.dgram.cur);
-           if((rc = fullwrite(req_pipe[1], in_msg.dgram.cur, reqlen)) != reqlen) {
-               if(rc < 0) {
-                   error("write to child pipe: %s", strerror(errno));
-               } else {
-                   error("write to child pipe: %d instead of %d", rc, reqlen);
-               }
-           }
-            aclose(req_pipe[1]);
-           exit(0);
-        }
+    do {
+       n = read(dh->fd, as->databuf, sizeof(as->databuf));
+    } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
 
-        aclose(req_pipe[1]);
+    /*
+     * Process has died.
+     */
+    if (n < 0) {
+       pkt_init(&nak, P_NAK, "ERROR data descriptor %d broken: %s\n",
+           dh->fd, strerror(errno));
+       goto sendnak;
+    }
+    /*
+     * Process has closed the pipe.  Just remove this event handler.
+     * If all pipes are closed, shut down this service.
+     */
+    if (n == 0) {
+       event_release(dh->ev_handle);
+       dh->ev_handle = NULL;
+       security_stream_close(dh->netfd);
+       dh->netfd = NULL;
+       for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
+           if (dh->netfd != NULL)
+               return;
+       }
+       service_delete(as);
+       return;
+    }
+    if (security_stream_write(dh->netfd, as->databuf, n) < 0) {
+       /* stream has croaked */
+       pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
+           security_stream_id(dh->netfd),
+           security_stream_geterror(dh->netfd));
+       goto sendnak;
     }
+    return;
+
+sendnak:
+    do_sendpkt(as->security_handle, &nak);
+    service_delete(as);
+}
+
+
+/*
+ * Convert a local stream handle (DATA_FD...) into something that
+ * can be sent to the amanda server.
+ *
+ * Returns a number that should be sent to the server in the REP packet.
+ */
+static int
+allocstream(as, handle)
+    struct active_service *as;
+    int handle;
+{
+    struct datafd_handle *dh;
+
+    /* if the handle is -1, then we don't bother */
+    if (handle < 0)
+       return (-1);
+
+    /* make sure the handle's kosher */
+    if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
+       return (-1);
+
+    /* get a pointer into our handle array */
+    dh = &as->data[handle - DATA_FD_OFFSET];
+
+    /* make sure we're not already using the net handle */
+    if (dh->netfd != NULL)
+       return (-1);
 
-    setup_rep(&in_msg, &out_msg, 0);
-    if(send_partial_reply) {
-       setup_rep(&in_msg, &out_pmsg, 1);
+    /* allocate a stream from the security layer and return */
+    dh->netfd = security_stream_server(as->security_handle);
+    if (dh->netfd == NULL) {
+       dbprintf(("%s: couldn't open stream to server: %s\n",
+           debug_prefix_time(NULL), security_geterror(as->security_handle)));
+       return (-1);
     }
-#ifdef KRB4_SECURITY
-    add_mutual_authenticator(&out_msg.dgram);
-    add_mutual_authenticator(&out_pmsg.dgram);
-#endif
 
-    while(1) {
+    /*
+     * convert the stream into a numeric id that can be sent to the
+     * remote end.
+     */
+    return (security_stream_id(dh->netfd));
+}
+
+/*
+ * Create a new service instance
+ */
+static struct active_service *
+service_new(security_handle, cmd, arguments)
+    security_handle_t *security_handle;
+    const char *cmd, *arguments;
+{
+    int data[DATA_FD_COUNT + 2][2], i;
+    struct active_service *as;
+    pid_t pid;
 
-       FD_ZERO(&insock);
-       FD_SET(rep_pipe[0], &insock);
+    assert(security_handle != NULL);
+    assert(cmd != NULL);
+    assert(arguments != NULL);
 
-       if((servp->flags & IS_INTERNAL) != 0) {
-           n = 0;
-       } else {
-           FD_SET(0, &insock);
-           n = select(rep_pipe[0] + 1,
-                      (SELECT_ARG_TYPE *)&insock,
-                      NULL,
-                      NULL,
-                      NULL);
-       }
-       if(n < 0) {
-           error("select failed: %s", strerror(errno));
-       }
+    /* a plethora of pipes */
+    for (i = 0; i < DATA_FD_COUNT + 2; i++)
+       if (pipe(data[i]) < 0)
+           error("pipe: %s", strerror(errno));
 
-       if(FD_ISSET(rep_pipe[0], &insock)) {
-           if(dglen >= MAX_DGRAM) {
-               error("more than %d bytes received from child", MAX_DGRAM);
-           }
-           rc = read(rep_pipe[0], out_msg.dgram.cur+dglen, MAX_DGRAM-dglen);
-           if(rc <= 0) {
-               if (rc < 0) {
-                   error("reading response pipe: %s", strerror(errno));
-               }
-               break;
-           }
-           else {
-               if(send_partial_reply) {
-                   strncpy(out_pmsg.dgram.cur+dglen, out_msg.dgram.cur+dglen, rc);
-                   out_pmsg.dgram.len += rc;
-                   out_pmsg.dgram.data[out_pmsg.dgram.len] = '\0';
-                   dbprintf(("%s: sending PREP packet:\n----\n%s----\n\n",
-                             debug_prefix_time(NULL), out_pmsg.dgram.data));
-                   dgram_send_addr(in_msg.peer, &out_pmsg.dgram);
-               }
-               dglen += rc;
+    switch(pid = fork()) {
+    case -1:
+       error("could not fork service %s: %s", cmd, strerror(errno));
+    default:
+       /*
+        * The parent.  Close the far ends of our pipes and return.
+        */
+       as = alloc(sizeof(*as));
+       as->cmd = stralloc(cmd);
+       as->arguments = stralloc(arguments);
+       as->security_handle = security_handle;
+       as->state = NULL;
+       as->pid = pid;
+       as->send_partial_reply = 0;
+       if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
+           g_option_t *g_options;
+           char *option_str, *p;
+
+           option_str = stralloc(as->arguments+8);
+           p = strchr(option_str,'\n');
+           if(p) *p = '\0';
+
+           g_options = parse_g_options(option_str, 1);
+           if(am_has_feature(g_options->features, fe_partial_estimate)) {
+               as->send_partial_reply = 1;
            }
+           amfree(g_options);
+           amfree(option_str);
        }
-       if(!FD_ISSET(0,&insock))
-           continue;
 
-       if((n = dgram_recv(&dup_msg.dgram, RECV_TIMEOUT, &dup_msg.peer)) <= 0) {
-           char *s;
+       /* write to the request pipe */
+       aclose(data[0][0]);
+       as->reqfd = data[0][1];
 
-           if (n == 0) {
-               s = "timeout";
-           } else {
-               s = strerror(errno);
-           }
-           error("error receiving message: %s", s);
+       /*
+        * read from the reply pipe
+        */
+       as->repfd = data[1][0];
+       aclose(data[1][1]);
+       as->ev_repfd = NULL;
+       as->repbufsize = 0;
+       as->repretry = 0;
+
+       /*
+        * read from the rest of the general-use pipes
+        * (netfds are opened as the client requests them)
+        */
+       for (i = 0; i < DATA_FD_COUNT; i++) {
+           aclose(data[i + 2][1]);
+           as->data[i].fd = data[i + 2][0];
+           as->data[i].ev_handle = NULL;
+           as->data[i].netfd = NULL;
+           as->data[i].as = as;
        }
 
-       /* 
-        * Under normal conditions, the master will resend the REQ packet
-        * to be sure we are still alive.  It expects an ACK back right away.
-        *
-        * XXX- Arguably we should parse and security check the new packet, 
-        * only sending an ACK if it passes and the request is identical to
-        * the original one.  However, that's too much work for now. :-) 
-        *
-        * It should suffice to ACK whenever the sender is identical.
+       /* add it to the service queue */
+       /* increment the active service count */
+       TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
+       serviceq.qlength++;
+
+       return (as);
+    case 0:
+       /*
+        * The child.  Put our pipes in their advertised locations
+        * and start up.
         */
-       dbprintf(("%s: got packet:\n----\n%s----\n\n",
-                 debug_prefix_time(NULL), dup_msg.dgram.data));
-       parse_pkt_header(&dup_msg);
-       if(dup_msg.peer.sin_addr.s_addr == in_msg.peer.sin_addr.s_addr &&
-          dup_msg.peer.sin_port == in_msg.peer.sin_port) {
-           if(dup_msg.type == P_REQ) {
-               dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
-                         debug_prefix_time(NULL)));
-               sendack(&in_msg, &rej_msg);
-           }
-           else {
-               dbprintf(("%s: it is not a P_REQ, ignoring it\n",
-                         debug_prefix_time(NULL)));
-           }
+#ifdef FORCE_USERID
+       seteuid((uid_t)0);
+       setuid(client_uid);
+#endif
+
+       /*
+        * The data stream is stdin in the new process
+        */
+        if (dup2(data[0][0], 0) < 0) {
+           error("dup %d to %d failed: %s\n", data[0][0], 0,
+               strerror(errno));
+       }
+       aclose(data[0][0]);
+       aclose(data[0][1]);
+
+       /*
+        * The reply stream is stdout
+        */
+        if (dup2(data[1][1], 1) < 0) {
+           error("dup %d to %d failed: %s\n", data[1][1], 1,
+               strerror(errno));
        }
-       else {
-           dbprintf(("%s: received other packet, NAKing it\n",
-                     debug_prefix_time(NULL)));
-           dbprintf(("  addr: peer %s dup %s, port: peer %d dup %d\n",
-                     inet_ntoa(in_msg.peer.sin_addr),
-                     inet_ntoa(dup_msg.peer.sin_addr),
-                     (int)ntohs(in_msg.peer.sin_port),
-                     (int)ntohs(dup_msg.peer.sin_port)));
-           /* XXX dup_msg filled in? */
-           sendnak(&dup_msg, &rej_msg, "amandad busy");
+        aclose(data[1][0]);
+        aclose(data[1][1]);
+
+       /*
+        * The rest start at the offset defined in amandad.h, and continue
+        * through the internal defined.
+        */
+       for (i = 0; i < DATA_FD_COUNT; i++) {
+           if (dup2(data[i + 2][1], i + DATA_FD_OFFSET) < 0) {
+               error("dup %d to %d failed: %s\n", data[i + 2][1],
+                   i + DATA_FD_OFFSET, strerror(errno));
+           }
+           aclose(data[i + 2][0]);
+           aclose(data[i + 2][1]);
        }
 
+       execle(cmd, cmd, NULL, safe_env());
+       error("could not exec service %s: %s", cmd, strerror(errno));
     }
+    /* NOTREACHED */
+    return NULL;
+}
 
-    /* XXX reap child?  log if non-zero status?  don't respond if non zero? */
-    /* setup header for out_msg */
+/*
+ * Unallocate a service instance
+ */
+static void
+service_delete(as)
+    struct active_service *as;
+{
+    int i;
+    struct datafd_handle *dh;
 
-    out_msg.dgram.len += dglen;
-    out_msg.dgram.data[out_msg.dgram.len] = '\0';
-    aclose(rep_pipe[0]);
+#ifdef AMANDAD_DEBUG
+       dbprintf(("%s: closing service: %s\n",
+           debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
+#endif
 
-send_response:
+    assert(as != NULL);
 
-    retry_count = 0;
+    assert(as->cmd != NULL);
+    amfree(as->cmd);
 
-    while(retry_count < max_retry_count) {
-       if(!retry_count)
-           dbprintf(("%s: sending REP packet:\n----\n%s----\n\n",
-                     debug_prefix_time(NULL), out_msg.dgram.data));
-       dgram_send_addr(in_msg.peer, &out_msg.dgram);
-       if((n = dgram_recv(&dup_msg.dgram, ack_timeout, &dup_msg.peer)) <= 0) {
-           char *s;
+    assert(as->arguments != NULL);
+    amfree(as->arguments);
 
-           if (n == 0) {
-               s = "timeout";
-           } else {
-               s = strerror(errno);
-           }
+    if (as->reqfd != -1)
+       aclose(as->reqfd);
+    if (as->repfd != -1)
+       aclose(as->repfd);
 
-           /* timed out or error, try again */
-           retry_count++;
+    if (as->ev_repfd != NULL)
+       event_release(as->ev_repfd);
+    if (as->ev_reptimeout != NULL)
+       event_release(as->ev_reptimeout);
 
-           dbprintf(("%s: waiting for ack: %s", debug_prefix_time(NULL), s));
-           if(retry_count < max_retry_count) 
-               dbprintf((", retrying\n"));
-           else 
-               dbprintf((", giving up!\n"));
+    for (i = 0; i < DATA_FD_COUNT; i++) {
+       dh = &as->data[i];
 
-           continue;
-       }
-       dbprintf(("%s: got packet:\n----\n%s----\n\n",
-                 debug_prefix_time(NULL), dup_msg.dgram.data));
-       parse_pkt_header(&dup_msg);
-
-       
-       if(dup_msg.peer.sin_addr.s_addr == in_msg.peer.sin_addr.s_addr &&
-          dup_msg.peer.sin_port == in_msg.peer.sin_port) {
-           if(dup_msg.type == P_ACK)
-               break;
-           else
-               dbprintf(("%s: it is not an ack\n", debug_prefix_time(NULL)));
-       }
-       else {
-           dbprintf(("%s: weird, it is not a proper ack\n",
-                     debug_prefix_time(NULL)));
-           dbprintf(("  addr: peer %s dup %s, port: peer %d dup %d\n",
-                     inet_ntoa(in_msg.peer.sin_addr),
-                     inet_ntoa(dup_msg.peer.sin_addr),
-                     (int)ntohs(in_msg.peer.sin_port),
-                     (int)ntohs(dup_msg.peer.sin_port)));
-       }               
-    }
-    /* XXX log if retry count exceeded */
-
-    amfree(cmd);
-    amfree(noop_file);
-    amfree(our_feature_string);
-    am_release_feature_set(our_features);
-    our_features = NULL;
-    malloc_size_2 = malloc_inuse(&malloc_hist_2);
-
-    if(malloc_size_1 != malloc_size_2) {
-#if defined(USE_DBMALLOC)
-       malloc_list(dbfd(), malloc_hist_1, malloc_hist_2);
-#endif
+       aclose(dh->fd);
+
+       if (dh->netfd != NULL)
+           security_stream_close(dh->netfd);
+
+       if (dh->ev_handle != NULL)
+           event_release(dh->ev_handle);
     }
 
-    dbclose();
-    return 0;
-}
+    if (as->security_handle != NULL)
+       security_close(as->security_handle);
 
+    assert(as->pid > 0);
+    kill(as->pid, SIGTERM);
+    waitpid(as->pid, NULL, WNOHANG);
 
-/* -------- */
+    TAILQ_REMOVE(&serviceq.tailq, as, tq);
+    assert(serviceq.qlength > 0);
+    serviceq.qlength--;
 
-void sendack(hdr, msg)
-pkt_t *hdr;
-pkt_t *msg;
-{
-    /* XXX this isn't very safe either: handle could be bogus */
-    ap_snprintf(msg->dgram.data, sizeof(msg->dgram.data),
-               "Amanda %d.%d ACK HANDLE %s SEQ %d\n",
-               VERSION_MAJOR, VERSION_MINOR,
-               hdr->handle ? hdr->handle : "",
-               hdr->sequence);
-    msg->dgram.len = strlen(msg->dgram.data);
-    dbprintf(("%s: sending ack:\n----\n%s----\n\n",
-             debug_prefix_time(NULL), msg->dgram.data));
-    dgram_send_addr(hdr->peer, &msg->dgram);
+    amfree(as);
 }
 
-void sendnak(hdr, msg, str)
-pkt_t *hdr;
-pkt_t *msg;
-char *str;
+/*
+ * Like 'fullwrite', but does the work in a child process so pipelines
+ * do not hang.
+ */
+static int
+writebuf(as, bufp, size)
+    struct active_service *as;
+    const void *bufp;
+    size_t size;
 {
-    /* XXX this isn't very safe either: handle could be bogus */
-    ap_snprintf(msg->dgram.data, sizeof(msg->dgram.data),
-               "Amanda %d.%d NAK HANDLE %s SEQ %d\nERROR %s\n",
-               VERSION_MAJOR, VERSION_MINOR,
-               hdr->handle ? hdr->handle : "",
-               hdr->sequence, str ? str : "UNKNOWN");
-
-    msg->dgram.len = strlen(msg->dgram.data);
-    dbprintf(("%s: sending nack:\n----\n%s----\n\n",
-             debug_prefix_time(NULL), msg->dgram.data));
-    dgram_send_addr(hdr->peer, &msg->dgram);
-}
+    int pid;
 
-void setup_rep(hdr, msg, partial_rep)
-pkt_t *hdr;
-pkt_t *msg;
-int partial_rep;
-{
-    /* XXX this isn't very safe either: handle could be bogus */
-    ap_snprintf(msg->dgram.data, sizeof(msg->dgram.data),
-               "Amanda %d.%d %s HANDLE %s SEQ %d\n",
-               VERSION_MAJOR, VERSION_MINOR,
-               partial_rep == 0 ? "REP" : "PREP", 
-               hdr->handle ? hdr->handle : "",
-               hdr->sequence);
+    switch (pid=fork()) {
+    case -1:
+       return -1;
 
-    msg->dgram.len = strlen(msg->dgram.data);
-    msg->dgram.cur = msg->dgram.data + msg->dgram.len;
+    default:
+       waitpid(pid, NULL, WNOHANG);
+       return 0;                       /* this is the parent */
 
+    case 0:
+       break;                          /* this is the child */
+    }
+    aclose (as->repfd);                        /* make sure we are not a reader */
+    exit (fullwrite(as->reqfd, bufp, size) != size);
 }
 
-/* -------- */
+static int
+do_sendpkt(handle, pkt)
+    security_handle_t *handle;
+    pkt_t *pkt;
+{
+    dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
+       debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
+    return security_sendpkt(handle, pkt);
+}
+
+#ifdef AMANDAD_DEBUG
+/*
+ * Convert a state into a string
+ */
+static const char *
+state2str(state)
+    state_t state;
+{
+    static const struct {
+       state_t state;
+       const char str[13];
+    } states[] = {
+#define        X(state)        { state, stringize(state) }
+       X(s_sendack),
+       X(s_repwait),
+       X(s_processrep),
+       X(s_sendrep),
+       X(s_ackwait),
+#undef X
+    };
+    int i;
+
+    for (i = 0; i < sizeof(states) / sizeof(states[0]); i++)
+       if (state == states[i].state)
+           return (states[i].str);
+    return ("INVALID STATE");
+}
 
-char *strlower(str)
-char *str;
+/*
+ * Convert an action into a string
+ */
+static const char *
+action2str(action)
+    action_t action;
 {
-    char *s;
-    for(s=str; *s; s++)
-       if(isupper((int)*s)) *s = tolower(*s);
-    return str;
+    static const struct {
+       action_t action;
+       const char str[12];
+    } actions[] = {
+#define        X(action)       { action, stringize(action) }
+       X(A_START),
+       X(A_RECVPKT),
+       X(A_RECVREP),
+       X(A_PENDING),
+       X(A_FINISH),
+       X(A_CONTINUE),
+       X(A_SENDNAK),
+       X(A_TIMEOUT),
+#undef X
+    };
+    int i;
+
+    for (i = 0; i < sizeof(actions) / sizeof(actions[0]); i++)
+       if (action == actions[i].action)
+           return (actions[i].str);
+    return ("UNKNOWN ACTION");
 }
+#endif /* AMANDAD_DEBUG */