Imported Upstream version 2.5.1p3
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18.2.5 2007/01/10 16:26:57 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 /*#define       AMANDAD_DEBUG*/
36
37 #include "amanda.h"
38 #include "amandad.h"
39 #include "clock.h"
40 #include "event.h"
41 #include "amfeatures.h"
42 #include "packet.h"
43 #include "version.h"
44 #include "queue.h"
45 #include "security.h"
46 #include "stream.h"
47 #include "util.h"
48
49 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
50 #define ACK_TIMEOUT     10              /* XXX should be configurable */
51 #define MAX_REP_RETRIES 5
52
53 /*
54  * These are the actions for entering the state machine
55  */
56 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
57     A_SENDNAK, A_TIMEOUT } action_t;
58
59 /*
60  * This is a state in the state machine.  It is a function pointer to
61  * the function that actually implements the state.
62  */
63 struct active_service;
64 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
65
66 /*
67  * This structure describes an active running service.
68  *
69  * An active service is something running that we have received
70  * a request for.  This structure holds info on that service, including
71  * file descriptors for data, etc, as well as the security handle
72  * for communications with the amanda server.
73  */
74 struct active_service {
75     char *cmd;                          /* name of command we ran */
76     char *arguments;                    /* arguments we sent it */
77     security_handle_t *security_handle; /* remote server */
78     state_t state;                      /* how far this has progressed */
79     pid_t pid;                          /* pid of subprocess */
80     int send_partial_reply;             /* send PREP packet */
81     int reqfd;                          /* pipe to write requests */
82     int repfd;                          /* pipe to read replies */
83     event_handle_t *ev_repfd;           /* read event handle for repfd */
84     event_handle_t *ev_reptimeout;      /* timeout for rep data */
85     pkt_t rep_pkt;                      /* rep packet we're sending out */
86     char *repbuf;                       /* buffer to read the rep into */
87     size_t bufsize;                     /* length of repbuf */
88     size_t repbufsize;                  /* length of repbuf */
89     int repretry;                       /* times we'll retry sending the rep */
90     /*
91      * General user streams to the process, and their equivalent
92      * network streams.
93      */
94     struct datafd_handle {
95         int fd_read;                    /* pipe to child process */
96         int fd_write;                   /* pipe to child process */
97         event_handle_t *ev_read;        /* it's read event handle */
98         event_handle_t *ev_write;       /* it's write event handle */
99         security_stream_t *netfd;       /* stream to amanda server */
100         struct active_service *as;      /* pointer back to our enclosure */
101     } data[DATA_FD_COUNT];
102     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
103     TAILQ_ENTRY(active_service) tq;     /* queue handle */
104 };
105
106 /* 
107  * Here are the services that we allow.
108  */
109 static struct services {
110     char *name;
111     int  active;
112 } services[] = {
113     { "noop", 1 },
114     { "sendsize", 1 },
115     { "sendbackup", 1 },
116     { "selfcheck", 1 },
117     { "amindexd", 0 },
118     { "amidxtaped", 0 }
119 };
120 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
121
122 /*
123  * Queue of outstanding requests that we are running.
124  */
125 static struct {
126     TAILQ_HEAD(, active_service) tailq;
127     int qlength;
128 } serviceq = {
129     TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
130 };
131
132 /*
133  * Data for dbmalloc to check for memory leaks
134  */
135 #ifdef USE_DBMALLOC
136 static struct {
137     struct {
138         unsigned long size, hist;
139     } start, end;
140 } dbmalloc_info;
141 #endif
142
143 static int wait_30s = 1;
144 static int exit_on_qlength = 1;
145 static char *auth = NULL;
146
147 int main(int argc, char **argv);
148
149 static int allocstream(struct active_service *, int);
150 static void exit_check(void *);
151 static void protocol_accept(security_handle_t *, pkt_t *);
152 static void state_machine(struct active_service *, action_t, pkt_t *);
153
154 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
155 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
156 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
157 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
158 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
159
160 static void repfd_recv(void *);
161 static void timeout_repfd(void *);
162 static void protocol_recv(void *, pkt_t *, security_status_t);
163 static void process_readnetfd(void *);
164 static void process_writenetfd(void *, void *, ssize_t);
165 static struct active_service *service_new(security_handle_t *,
166     const char *, const char *);
167 static void service_delete(struct active_service *);
168 static int writebuf(struct active_service *, const void *, size_t);
169 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
170
171 static void child_signal(int signal);
172
173 #ifdef AMANDAD_DEBUG
174 static const char *state2str(state_t);
175 static const char *action2str(action_t);
176 #endif
177
178 /*
179  * Harvests defunct processes...
180  */
181
182 static void
183 child_signal(
184     int         signal)
185 {
186     pid_t       rp;
187
188     (void)signal;       /* Quite compiler warning */
189     /*
190      * Reap and child status and promptly ignore since we don't care...
191      */
192     do {
193         rp = waitpid(-1, NULL, WNOHANG);
194     } while (rp > 0);
195 }
196
197 int
198 main(
199     int         argc,
200     char **     argv)
201 {
202     int i, j;
203     int have_services;
204     int in, out;
205     const security_driver_t *secdrv;
206     int no_exit = 0;
207     struct sigaction act, oact;
208     char *pgm = "amandad";              /* in case argv[0] is not set */
209 #if defined(AMANDAD_DEBUG) && defined(USE_REUSEADDR)
210     const int on = 1;
211     int r;
212 #endif
213
214     safe_fd(-1, 0);
215     safe_cd();
216
217     /*
218      * When called via inetd, it is not uncommon to forget to put the
219      * argv[0] value on the config line.  On some systems (e.g. Solaris)
220      * this causes argv and/or argv[0] to be NULL, so we have to be
221      * careful getting our name.
222      */
223     if ((argv == NULL) || (argv[0] == NULL)) {
224             pgm = "amandad";            /* in case argv[0] is not set */
225     } else {
226             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
227     }
228     set_pname(pgm);
229     dbopen(DBG_SUBDIR_AMANDAD);
230
231     if(argv == NULL) {
232         error("argv == NULL\n");
233         /*NOTREACHED*/
234     }
235
236     /* Don't die when child closes pipe */
237     signal(SIGPIPE, SIG_IGN);
238
239     /* Tell me when a child exits or dies... */
240     act.sa_handler = child_signal;
241     sigemptyset(&act.sa_mask);
242     act.sa_flags = 0;
243     if(sigaction(SIGCHLD, &act, &oact) != 0) {
244         error("error setting SIGCHLD handler: %s", strerror(errno));
245         /*NOTREACHED*/
246     }
247
248 #ifdef USE_DBMALLOC
249     dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
250 #endif
251
252     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
253
254 #ifdef FORCE_USERID
255     /* we'd rather not run as root */
256     if (geteuid() == 0) {
257         if(client_uid == (uid_t) -1) {
258             error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
259             /*NOTREACHED*/
260         }
261         initgroups(CLIENT_LOGIN, client_gid);
262         setgid(client_gid);
263         setegid(client_gid);
264         seteuid(client_uid);
265     }
266 #endif  /* FORCE_USERID */
267
268     /*
269      * ad-hoc argument parsing
270      *
271      * We accept        -auth=[authentication type]
272      *                  -no-exit
273 #ifdef AMANDAD_DEBUG
274      *                  -tcp=[port]
275      *                  -udp=[port]
276 #endif
277      * We also add a list of services that amandad can launch
278      */
279     secdrv = NULL;
280     in = 0; out = 1;            /* default to stdin/stdout */
281     have_services = 0;
282     for (i = 1; i < argc; i++) {
283         /*
284          * accept -krb4 as an alias for -auth=krb4 (for compatibility)
285          */
286         if (strcmp(argv[i], "-krb4") == 0) {
287             argv[i] = "-auth=krb4";
288             /* FALLTHROUGH */
289             auth = "krb4";
290         }
291
292         /*
293          * Get a driver for a security type specified after -auth=
294          */
295         else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
296             argv[i] += strlen("-auth=");
297             secdrv = security_getdriver(argv[i]);
298             auth = argv[i];
299             if (secdrv == NULL) {
300                 error("no driver for security type '%s'\n", argv[i]);
301                 /*NOTREACHED*/
302             }
303             continue;
304         }
305
306         /*
307          * If -no-exit is specified, always run even after requests have
308          * been satisfied.
309          */
310         else if (strcmp(argv[i], "-no-exit") == 0) {
311             no_exit = 1;
312             continue;
313         }
314
315 #ifdef AMANDAD_DEBUG
316         /*
317          * Allow us to directly bind to a udp port for debugging.
318          * This may only apply to some security types.
319          */
320         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
321             struct sockaddr_in sin;
322
323             argv[i] += strlen("-udp=");
324             in = out = socket(AF_INET, SOCK_DGRAM, 0);
325             if (in < 0) {
326                 error("can't create dgram socket: %s\n", strerror(errno));
327                 /*NOTREACHED*/
328             }
329 #ifdef USE_REUSEADDR
330             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
331                 (void *)&on, (socklen_t)sizeof(on));
332             if (r < 0) {
333                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
334                           debug_prefix(NULL),
335                           strerror(errno)));
336             }
337 #endif
338
339             sin.sin_family = (sa_family_t)AF_INET;
340             sin.sin_addr.s_addr = INADDR_ANY;
341             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
342             if (bind(in, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
343                 error("can't bind to port %d: %s\n", atoi(argv[i]),
344                     strerror(errno));
345                 /*NOTREACHED*/
346             }
347         }
348         /*
349          * Ditto for tcp ports.
350          */
351         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
352             struct sockaddr_in sin;
353             int sock;
354             socklen_t n;
355
356             argv[i] += strlen("-tcp=");
357             sock = socket(AF_INET, SOCK_STREAM, 0);
358             if (sock < 0) {
359                 error("can't create tcp socket: %s\n", strerror(errno));
360                 /*NOTREACHED*/
361             }
362             n = 1;
363 #ifdef USE_REUSEADDR
364             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
365                 (void *)&on, (socklen_t)sizeof(on));
366             if (r < 0) {
367                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
368                           debug_prefix(NULL),
369                           strerror(errno)));
370             }
371 #endif
372             setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
373                 (void *)&n, (socklen_t)sizeof(n));
374             sin.sin_family = (sa_family_t)AF_INET;
375             sin.sin_addr.s_addr = INADDR_ANY;
376             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
377             if (bind(sock, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
378                 error("can't bind to port %d: %s\n", atoi(argv[i]),
379                     strerror(errno));
380                 /*NOTREACHED*/
381             }
382             listen(sock, 10);
383             n = (socklen_t)sizeof(sin);
384             in = out = accept(sock, (struct sockaddr *)&sin, &n);
385         }
386 #endif
387         /*
388          * It must be a service name
389          */
390         else {
391             /* clear all services */
392             if(!have_services) {
393                 for (j = 0; j < (int)NSERVICES; j++)
394                     services[j].active = 0;
395             }
396             have_services = 1;
397
398             if(strcmp(argv[i],"amdump") == 0) {
399                 services[0].active = 1;
400                 services[1].active = 1;
401                 services[2].active = 1;
402                 services[3].active = 1;
403             }
404             else {
405                 for (j = 0; j < (int)NSERVICES; j++)
406                     if (strcmp(services[j].name, argv[i]) == 0)
407                         break;
408                 if (j == (int)NSERVICES) {
409                     dbprintf(("%s: %s: invalid service\n",
410                               debug_prefix_time(NULL), argv[i]));
411                     exit(1);
412                 }
413                 services[j].active = 1;
414             }
415         }
416     }
417
418     /*
419      * If no security type specified, use BSD
420      */
421     if (secdrv == NULL) {
422         secdrv = security_getdriver("BSD");
423         auth = "bsd";
424         if (secdrv == NULL) {
425             error("no driver for default security type 'BSD'\n");
426             /*NOTREACHED*/
427         }
428     }
429
430     if(strcasecmp(auth, "rsh") == 0 ||
431        strcasecmp(auth, "ssh") == 0 ||
432        strcasecmp(auth, "bsdtcp") == 0) {
433         wait_30s = 0;
434         exit_on_qlength = 1;
435     }
436
437     /* initialize */
438
439     startclock();
440
441     dbprintf(("%s: version %s\n", get_pname(), version()));
442     for (i = 0; version_info[i] != NULL; i++) {
443         dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
444     }
445
446     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
447         dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
448                   debug_prefix(NULL)));
449     }
450
451     /*
452      * Schedule to call protocol_accept() when new security handles
453      * are created on stdin.
454      */
455     security_accept(secdrv, in, out, protocol_accept);
456
457     /*
458      * Schedule an event that will try to exit every 30 seconds if there
459      * are no requests outstanding.
460      */
461     if(wait_30s)
462         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
463
464     /*
465      * Call event_loop() with an arg of 0, telling it to block until all
466      * events are completed.
467      */
468     event_loop(0);
469
470     close(in);
471     close(out);
472     dbclose();
473     return(0);
474 }
475
476 /*
477  * This runs periodically and checks to see if we have any active services
478  * still running.  If we don't, then we quit.
479  */
480 static void
481 exit_check(
482     void *      cookie)
483 {
484     int no_exit;
485
486     assert(cookie != NULL);
487     no_exit = *(int *)cookie;
488
489     /*
490      * If things are still running, then don't exit.
491      */
492     if (serviceq.qlength > 0)
493         return;
494
495     /*
496      * If the caller asked us to never exit, then we're done
497      */
498     if (no_exit)
499         return;
500
501 #ifdef USE_DBMALLOC
502     dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
503
504     if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
505         malloc_list(dbfd(), dbmalloc_info.start.hist,
506             dbmalloc_info.end.hist);
507     }
508 #endif
509
510     dbclose();
511     exit(0);
512 }
513
514 /*
515  * Handles new incoming protocol handles.  This is a callback for
516  * security_accept(), which gets called when new handles are detected.
517  */
518 static void
519 protocol_accept(
520     security_handle_t * handle,
521     pkt_t *             pkt)
522 {
523     pkt_t pkt_out;
524     struct active_service *as;
525     char *pktbody, *tok, *service, *arguments;
526     char *service_path = NULL;
527     int i;
528
529     pkt_out.body = NULL;
530
531     /*
532      * If handle is NULL, then the connection is closed.
533      */
534     if(handle == NULL) {
535         return;
536     }
537
538     /*
539      * If pkt is NULL, then there was a problem with the new connection.
540      */
541     if (pkt == NULL) {
542         dbprintf(("%s: accept error: %s\n",
543             debug_prefix_time(NULL), security_geterror(handle)));
544         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
545         do_sendpkt(handle, &pkt_out);
546         amfree(pkt_out.body);
547         security_close(handle);
548         return;
549     }
550
551     dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
552         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
553
554     /*
555      * If this is not a REQ packet, just forget about it.
556      */
557     if (pkt->type != P_REQ) {
558         dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
559             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
560         security_close(handle);
561         return;
562     }
563
564     pktbody = service = arguments = NULL;
565     as = NULL;
566
567     /*
568      * Parse out the service and arguments
569      */
570
571     pktbody = stralloc(pkt->body);
572
573     tok = strtok(pktbody, " ");
574     if (tok == NULL)
575         goto badreq;
576     if (strcmp(tok, "SERVICE") != 0)
577         goto badreq;
578
579     tok = strtok(NULL, " \n");
580     if (tok == NULL)
581         goto badreq;
582     service = stralloc(tok);
583
584     /* we call everything else 'arguments' */
585     tok = strtok(NULL, "");
586     if (tok == NULL)
587         goto badreq;
588     arguments = stralloc(tok);
589
590     /* see if it's one we allow */
591     for (i = 0; i < (int)NSERVICES; i++)
592         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
593             break;
594     if (i == (int)NSERVICES) {
595         dbprintf(("%s: %s: invalid service\n",
596             debug_prefix_time(NULL), service));
597         pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
598         goto send_pkt_out;
599     }
600
601     service_path = vstralloc(libexecdir, "/", service, versionsuffix(), NULL);
602     if (access(service_path, X_OK) < 0) {
603         dbprintf(("%s: can't execute %s: %s\n",
604             debug_prefix_time(NULL), service_path, strerror(errno)));
605             pkt_init(&pkt_out, P_NAK,
606                      "ERROR execute access to \"%s\" denied\n",
607                      service_path);
608         goto send_pkt_out;
609     }
610
611     /* see if its already running */
612     for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
613         as = TAILQ_NEXT(as, tq)) {
614             if (strcmp(as->cmd, service_path) == 0 &&
615                 strcmp(as->arguments, arguments) == 0) {
616                     dbprintf(("%s: %s %s: already running, acking req\n",
617                         debug_prefix_time(NULL), service, arguments));
618                     pkt_init_empty(&pkt_out, P_ACK);
619                     goto send_pkt_out_no_delete;
620             }
621     }
622
623     /*
624      * create a new service instance, and send the arguments down
625      * the request pipe.
626      */
627     dbprintf(("%s: creating new service: %s\n%s\n",
628         debug_prefix_time(NULL), service, arguments));
629     as = service_new(handle, service_path, arguments);
630     if (writebuf(as, arguments, strlen(arguments)) < 0) {
631         const char *errmsg = strerror(errno);
632         dbprintf(("%s: error sending arguments to %s: %s\n",
633             debug_prefix_time(NULL), service, errmsg));
634         pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
635             service, errmsg);
636         goto send_pkt_out;
637     }
638     aclose(as->reqfd);
639
640     amfree(pktbody);
641     amfree(service);
642     amfree(service_path);
643     amfree(arguments);
644
645     /*
646      * Move to the sendack state, and start up the state
647      * machine.
648      */
649     as->state = s_sendack;
650     state_machine(as, A_START, NULL);
651     return;
652
653 badreq:
654     pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
655     dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
656         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
657
658 send_pkt_out:
659     if(as)
660         service_delete(as);
661 send_pkt_out_no_delete:
662     amfree(pktbody);
663     amfree(service_path);
664     amfree(service);
665     amfree(arguments);
666     do_sendpkt(handle, &pkt_out);
667     security_close(handle);
668     amfree(pkt_out.body);
669 }
670
671 /*
672  * Handles incoming protocol packets.  Routes responses to the proper
673  * running service.
674  */
675 static void
676 state_machine(
677     struct active_service *     as,
678     action_t                    action,
679     pkt_t *                     pkt)
680 {
681     action_t retaction;
682     state_t curstate;
683     pkt_t nak;
684
685 #ifdef AMANDAD_DEBUG
686     dbprintf(("%s: state_machine: %p entering\n",
687         debug_prefix_time(NULL), as));
688 #endif
689     for (;;) {
690         curstate = as->state;
691 #ifdef AMANDAD_DEBUG
692         dbprintf(("%s: state_machine: %p curstate=%s action=%s\n",
693             debug_prefix_time(NULL), as,
694             state2str(curstate), action2str(action)));
695 #endif
696         retaction = (*curstate)(as, action, pkt);
697 #ifdef AMANDAD_DEBUG
698         dbprintf(("%s: state_machine: %p curstate=%s returned %s (nextstate=%s)\n",
699             debug_prefix_time(NULL),
700             as, state2str(curstate), action2str(retaction),
701             state2str(as->state)));
702 #endif
703
704         switch (retaction) {
705         /*
706          * State has queued up and is now blocking on input.
707          */
708         case A_PENDING:
709 #ifdef AMANDAD_DEBUG
710             dbprintf(("%s: state_machine: %p leaving (A_PENDING)\n",
711                 debug_prefix_time(NULL), as));
712 #endif
713             return;
714
715         /*
716          * service has switched states.  Loop.
717          */
718         case A_CONTINUE:
719             break;
720
721         /*
722          * state has determined that the packet it received was bogus.
723          * Send a nak, and return.
724          */
725         case A_SENDNAK:
726             dbprintf(("%s: received unexpected %s packet\n",
727                 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
728             dbprintf(("<<<<<\n%s----\n\n", pkt->body));
729             pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
730                 pkt_type2str(pkt->type));
731             do_sendpkt(as->security_handle, &nak);
732             amfree(nak.body);
733             security_recvpkt(as->security_handle, protocol_recv, as, -1);
734 #ifdef AMANDAD_DEBUG
735             dbprintf(("%s: state_machine: %p leaving (A_SENDNAK)\n",
736                 debug_prefix_time(NULL), as));
737 #endif
738             return;
739
740         /*
741          * Service is done.  Remove it and finish.
742          */
743         case A_FINISH:
744             service_delete(as);
745 #ifdef AMANDAD_DEBUG
746             dbprintf(("%s: state_machine: %p leaving (A_FINISH)\n",
747                 debug_prefix_time(NULL), as));
748 #endif
749             return;
750
751         default:
752             assert(0);
753             break;
754         }
755     }
756     /*NOTREACHED*/
757 }
758
759 /*
760  * This state just sends an ack.  After that, we move to the repwait
761  * state to wait for REP data to arrive from the subprocess.
762  */
763 static action_t
764 s_sendack(
765     struct active_service *     as,
766     action_t                    action,
767     pkt_t *                     pkt)
768 {
769     pkt_t ack;
770
771     (void)action;       /* Quiet unused parameter warning */
772     (void)pkt;          /* Quiet unused parameter warning */
773
774     pkt_init_empty(&ack, P_ACK);
775     if (do_sendpkt(as->security_handle, &ack) < 0) {
776         dbprintf(("%s: error sending ACK: %s\n",
777             debug_prefix_time(NULL), security_geterror(as->security_handle)));
778         amfree(ack.body);
779         return (A_FINISH);
780     }
781     amfree(ack.body);
782
783     /*
784      * move to the repwait state
785      * Setup a listener for data on the reply fd, but also
786      * listen for packets over the wire, as the server may
787      * poll us if we take a long time.
788      * Setup a timeout that will fire if it takes too long to
789      * receive rep data.
790      */
791     as->state = s_repwait;
792     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
793     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
794         timeout_repfd, as);
795     security_recvpkt(as->security_handle, protocol_recv, as, -1);
796     return (A_PENDING);
797 }
798
799 /*
800  * This is the repwait state.  We have responded to the initial REQ with
801  * an ACK, and we are now waiting for the process we spawned to pass us 
802  * data to send in a REP.
803  */
804 static action_t
805 s_repwait(
806     struct active_service *     as,
807     action_t                    action,
808     pkt_t *                     pkt)
809 {
810     ssize_t n;
811     char *repbuf_temp;
812
813     /*
814      * We normally shouldn't receive any packets while waiting
815      * for our REP data, but in some cases we do.
816      */
817     if (action == A_RECVPKT) {
818         assert(pkt != NULL);
819         /*
820          * Another req for something that's running.  Just send an ACK
821          * and go back and wait for more data.
822          */
823         if (pkt->type == P_REQ) {
824             dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
825                 debug_prefix_time(NULL)));
826             amfree(as->rep_pkt.body);
827             pkt_init_empty(&as->rep_pkt, P_ACK);
828             do_sendpkt(as->security_handle, &as->rep_pkt);
829             security_recvpkt(as->security_handle, protocol_recv, as, -1);
830             return (A_PENDING);
831         }
832         /* something unexpected.  Nak it */
833         return (A_SENDNAK);
834     }
835
836     if (action == A_TIMEOUT) {
837         amfree(as->rep_pkt.body);
838         pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
839         dbprintf(("%s: %s timed out waiting for REP data\n",
840             debug_prefix_time(NULL), as->cmd));
841         do_sendpkt(as->security_handle, &as->rep_pkt);
842         return (A_FINISH);
843     }
844
845     assert(action == A_RECVREP);
846     if(as->bufsize == 0) {
847         as->bufsize = NETWORK_BLOCK_BYTES;
848         as->repbuf = alloc(as->bufsize);
849     }
850
851     do {
852         n = read(as->repfd, as->repbuf + as->repbufsize,
853                  as->bufsize - as->repbufsize - 1);
854     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
855     if (n < 0) {
856         const char *errstr = strerror(errno);
857         dbprintf(("%s: read error on reply pipe: %s\n",
858                   debug_prefix_time(NULL), errstr));
859         amfree(as->rep_pkt.body);
860         pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
861                  errstr);
862         do_sendpkt(as->security_handle, &as->rep_pkt);
863         return (A_FINISH);
864     }
865     /*
866      * If we got some data, go back and wait for more, or EOF.  Nul terminate
867      * the buffer first.
868      */
869     as->repbuf[n + as->repbufsize] = '\0';
870     if (n > 0) {
871         as->repbufsize += n;
872         if(as->repbufsize >= (as->bufsize - 1)) {
873             as->bufsize *= 2;
874             repbuf_temp = alloc(as->bufsize);
875             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
876             amfree(as->repbuf);
877             as->repbuf = repbuf_temp;
878         }
879         else if(as->send_partial_reply) {
880             amfree(as->rep_pkt.body);
881             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
882             do_sendpkt(as->security_handle, &as->rep_pkt);
883             amfree(as->rep_pkt.body);
884             pkt_init_empty(&as->rep_pkt, P_REP);
885         }
886  
887         return (A_PENDING);
888     }
889
890     /*
891      * If we got 0, then we hit EOF.  Process the data and release
892      * the timeout.
893      */
894     assert(n == 0);
895
896     assert(as->ev_repfd != NULL);
897     event_release(as->ev_repfd);
898     as->ev_repfd = NULL;
899
900     assert(as->ev_reptimeout != NULL);
901     event_release(as->ev_reptimeout);
902     as->ev_reptimeout = NULL;
903
904     as->state = s_processrep;
905     aclose(as->repfd);
906     return (A_CONTINUE);
907 }
908
909 /*
910  * After we have read in all of the rep data, we process it and send
911  * it out as a REP packet.
912  */
913 static action_t
914 s_processrep(
915     struct active_service *     as,
916     action_t                    action,
917     pkt_t *                     pkt)
918 {
919     char *tok, *repbuf;
920
921     (void)action;       /* Quiet unused parameter warning */
922     (void)pkt;          /* Quiet unused parameter warning */
923
924     /*
925      * Copy the rep lines into the outgoing packet.
926      *
927      * If this line is a CONNECT, translate it
928      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
929      * Example:
930      *
931      *  CONNECT DATA 4 MESG 5 INDEX 6
932      *
933      * The tags are arbitrary.  The handles are in the DATA_FD pool.
934      * We need to map these to security streams and pass them back
935      * to the amanda server.  If the handle is -1, then we don't map.
936      */
937     repbuf = stralloc(as->repbuf);
938     amfree(as->rep_pkt.body);
939     pkt_init_empty(&as->rep_pkt, P_REP);
940     tok = strtok(repbuf, " ");
941     if (tok == NULL)
942         goto error;
943     if (strcmp(tok, "CONNECT") == 0) {
944         char *line, *nextbuf;
945
946         /* Save the entire line */
947         line = strtok(NULL, "\n");
948         /* Save the buf following the line */
949         nextbuf = strtok(NULL, "");
950
951         if (line == NULL || nextbuf == NULL)
952             goto error;
953
954         pkt_cat(&as->rep_pkt, "CONNECT");
955
956         /* loop over the id/handle pairs */
957         for (;;) {
958             /* id */
959             tok = strtok(line, " ");
960             line = NULL;        /* keep working from line */
961             if (tok == NULL)
962                 break;
963             pkt_cat(&as->rep_pkt, " %s", tok);
964
965             /* handle */
966             tok = strtok(NULL, " \n");
967             if (tok == NULL)
968                 goto error;
969             /* convert the handle into something the server can process */
970             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
971         }
972         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
973     } else {
974 error:
975         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
976     }
977
978     /*
979      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
980      * state.
981      */
982     as->state = s_sendrep;
983     as->repretry = MAX_REP_RETRIES;
984     amfree(repbuf);
985     return (A_CONTINUE);
986 }
987
988 /*
989  * This is the state where we send the REP we just collected from our child.
990  */
991 static action_t
992 s_sendrep(
993     struct active_service *     as,
994     action_t                    action,
995     pkt_t *                     pkt)
996 {
997     (void)action;       /* Quiet unused parameter warning */
998     (void)pkt;          /* Quiet unused parameter warning */
999
1000     /*
1001      * Transmit it and move to the ack state.
1002      */
1003     do_sendpkt(as->security_handle, &as->rep_pkt);
1004     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1005     as->state = s_ackwait;
1006     return (A_PENDING);
1007 }
1008
1009 /*
1010  * This is the state in which we wait for the server to ACK the REP
1011  * we just sent it.
1012  */
1013 static action_t
1014 s_ackwait(
1015     struct active_service *     as,
1016     action_t                    action,
1017     pkt_t *                     pkt)
1018 {
1019     struct datafd_handle *dh;
1020     int npipes;
1021
1022     /*
1023      * If we got a timeout, try again, but eventually give up.
1024      */
1025     if (action == A_TIMEOUT) {
1026         if (--as->repretry > 0) {
1027             as->state = s_sendrep;
1028             return (A_CONTINUE);
1029         }
1030         dbprintf(("%s: timeout waiting for ACK for our REP\n",
1031             debug_prefix_time(NULL)));
1032         return (A_FINISH);
1033     }
1034 #ifdef AMANDAD_DEBUG
1035     dbprintf(("%s: received ACK, now opening streams\n",
1036         debug_prefix_time(NULL)));
1037 #endif
1038
1039     assert(action == A_RECVPKT);
1040
1041     if (pkt->type == P_REQ) {
1042         dbprintf(("%s: received dup P_REQ packet, resending REP\n",
1043                   debug_prefix_time(NULL)));
1044         as->state = s_sendrep;
1045         return (A_CONTINUE);
1046     }
1047
1048     if (pkt->type != P_ACK)
1049         return (A_SENDNAK);
1050
1051     /*
1052      * Got the ack, now open the pipes
1053      */
1054     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1055         if (dh->netfd == NULL)
1056             continue;
1057         if (security_stream_accept(dh->netfd) < 0) {
1058             dbprintf(("%s: stream %ld accept failed: %s\n",
1059                 debug_prefix_time(NULL),
1060                 dh - &as->data[0], security_geterror(as->security_handle)));
1061             security_stream_close(dh->netfd);
1062             dh->netfd = NULL;
1063         }
1064         /* setup an event for reads from it */
1065         dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1066                                      process_readnetfd, dh);
1067
1068         security_stream_read(dh->netfd, process_writenetfd, dh);
1069
1070     }
1071
1072     /*
1073      * Pipes are open, so auth them.  Count them at the same time.
1074      */
1075     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1076         if (dh->netfd == NULL)
1077             continue;
1078         if (security_stream_auth(dh->netfd) < 0) {
1079             security_stream_close(dh->netfd);
1080             dh->netfd = NULL;
1081             event_release(dh->ev_read);
1082             event_release(dh->ev_write);
1083             dh->ev_read = NULL;
1084             dh->ev_write = NULL;
1085         } else {
1086             npipes++;
1087         }
1088     }
1089
1090     /*
1091      * If no pipes are open, then we're done.  Otherwise, just start running.
1092      * The event handlers on all of the pipes will take it from here.
1093      */
1094 #ifdef AMANDAD_DEBUG
1095     dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
1096         debug_prefix_time(NULL), npipes));
1097 #endif
1098     if (npipes == 0)
1099         return (A_FINISH);
1100     else {
1101         security_close(as->security_handle);
1102         as->security_handle = NULL;
1103         return (A_PENDING);
1104     }
1105 }
1106
1107 /*
1108  * Called when a repfd has received data
1109  */
1110 static void
1111 repfd_recv(
1112     void *      cookie)
1113 {
1114     struct active_service *as = cookie;
1115
1116     assert(as != NULL);
1117     assert(as->ev_repfd != NULL);
1118
1119     state_machine(as, A_RECVREP, NULL);
1120 }
1121
1122 /*
1123  * Called when a repfd has timed out
1124  */
1125 static void
1126 timeout_repfd(
1127     void *      cookie)
1128 {
1129     struct active_service *as = cookie;
1130
1131     assert(as != NULL);
1132     assert(as->ev_reptimeout != NULL);
1133
1134     state_machine(as, A_TIMEOUT, NULL);
1135 }
1136
1137 /*
1138  * Called when a handle has received data
1139  */
1140 static void
1141 protocol_recv(
1142     void *              cookie,
1143     pkt_t *             pkt,
1144     security_status_t   status)
1145 {
1146     struct active_service *as = cookie;
1147
1148     assert(as != NULL);
1149
1150     switch (status) {
1151     case S_OK:
1152         dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
1153             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1154         state_machine(as, A_RECVPKT, pkt);
1155         break;
1156     case S_TIMEOUT:
1157         dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
1158         state_machine(as, A_TIMEOUT, NULL);
1159         break;
1160     case S_ERROR:
1161         dbprintf(("%s: receive error: %s\n",
1162             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1163         break;
1164     }
1165 }
1166
1167 /*
1168  * This is a generic relay function that just reads data from one of
1169  * the process's pipes and passes it up the equivalent security_stream_t
1170  */
1171 static void
1172 process_readnetfd(
1173     void *      cookie)
1174 {
1175     pkt_t nak;
1176     struct datafd_handle *dh = cookie;
1177     struct active_service *as = dh->as;
1178     ssize_t n;
1179
1180     nak.body = NULL;
1181
1182     do {
1183         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1184     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1185
1186     /*
1187      * Process has died.
1188      */
1189     if (n < 0) {
1190         pkt_init(&nak, P_NAK, "A ERROR data descriptor %d broken: %s\n",
1191             dh->fd_read, strerror(errno));
1192         goto sendnak;
1193     }
1194     /*
1195      * Process has closed the pipe.  Just remove this event handler.
1196      * If all pipes are closed, shut down this service.
1197      */
1198     if (n == 0) {
1199         event_release(dh->ev_read);
1200         dh->ev_read = NULL;
1201         if(dh->ev_write == NULL) {
1202             security_stream_close(dh->netfd);
1203             dh->netfd = NULL;
1204         }
1205         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1206             if (dh->netfd != NULL)
1207                 return;
1208         }
1209         service_delete(as);
1210         return;
1211     }
1212     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1213         /* stream has croaked */
1214         pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1215             security_stream_id(dh->netfd),
1216             security_stream_geterror(dh->netfd));
1217         goto sendnak;
1218     }
1219     return;
1220
1221 sendnak:
1222     do_sendpkt(as->security_handle, &nak);
1223     service_delete(as);
1224     amfree(nak.body);
1225 }
1226
1227 /*
1228  * This is a generic relay function that just read data from one of
1229  * the security_stream_t and passes it up the equivalent process's pipes
1230  */
1231 static void
1232 process_writenetfd(
1233     void *      cookie,
1234     void *      buf,
1235     ssize_t     size)
1236 {
1237     struct datafd_handle *dh;
1238
1239     assert(cookie != NULL);
1240     dh = cookie;
1241
1242     if (dh->fd_write <= 0) {
1243         dbprintf(("%s: process_writenetfd: dh->fd_write <= 0\n",
1244             debug_prefix_time(NULL)));
1245     } else if (size > 0) {
1246         fullwrite(dh->fd_write, buf, (size_t)size);
1247         security_stream_read(dh->netfd, process_writenetfd, dh);
1248     }
1249     else {
1250         aclose(dh->fd_write);
1251     }
1252 }
1253
1254
1255 /*
1256  * Convert a local stream handle (DATA_FD...) into something that
1257  * can be sent to the amanda server.
1258  *
1259  * Returns a number that should be sent to the server in the REP packet.
1260  */
1261 static int
1262 allocstream(
1263     struct active_service *     as,
1264     int                         handle)
1265 {
1266     struct datafd_handle *dh;
1267
1268     /* if the handle is -1, then we don't bother */
1269     if (handle < 0)
1270         return (-1);
1271
1272     /* make sure the handle's kosher */
1273     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1274         return (-1);
1275
1276     /* get a pointer into our handle array */
1277     dh = &as->data[handle - DATA_FD_OFFSET];
1278
1279     /* make sure we're not already using the net handle */
1280     if (dh->netfd != NULL)
1281         return (-1);
1282
1283     /* allocate a stream from the security layer and return */
1284     dh->netfd = security_stream_server(as->security_handle);
1285     if (dh->netfd == NULL) {
1286         dbprintf(("%s: couldn't open stream to server: %s\n",
1287             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1288         return (-1);
1289     }
1290
1291     /*
1292      * convert the stream into a numeric id that can be sent to the
1293      * remote end.
1294      */
1295     return (security_stream_id(dh->netfd));
1296 }
1297
1298 /*
1299  * Create a new service instance
1300  */
1301 static struct active_service *
1302 service_new(
1303     security_handle_t * security_handle,
1304     const char *        cmd,
1305     const char *        arguments)
1306 {
1307     int i;
1308     int data_read[DATA_FD_COUNT + 1][2];
1309     int data_write[DATA_FD_COUNT + 1][2];
1310     struct active_service *as;
1311     pid_t pid;
1312     int newfd;
1313
1314     assert(security_handle != NULL);
1315     assert(cmd != NULL);
1316     assert(arguments != NULL);
1317
1318     /* a plethora of pipes */
1319     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1320         if (pipe(data_read[i]) < 0) {
1321             error("pipe: %s\n", strerror(errno));
1322             /*NOTREACHED*/
1323         }
1324         if (pipe(data_write[i]) < 0) {
1325             error("pipe: %s\n", strerror(errno));
1326             /*NOTREACHED*/
1327         }
1328     }
1329
1330     switch(pid = fork()) {
1331     case -1:
1332         error("could not fork service %s: %s\n", cmd, strerror(errno));
1333         /*NOTREACHED*/
1334     default:
1335         /*
1336          * The parent.  Close the far ends of our pipes and return.
1337          */
1338         as = alloc(SIZEOF(*as));
1339         as->cmd = stralloc(cmd);
1340         as->arguments = stralloc(arguments);
1341         as->security_handle = security_handle;
1342         as->state = NULL;
1343         as->pid = pid;
1344         as->send_partial_reply = 0;
1345         if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1346             g_option_t *g_options;
1347             char *option_str, *p;
1348
1349             option_str = stralloc(as->arguments+8);
1350             p = strchr(option_str,'\n');
1351             if(p) *p = '\0';
1352
1353             g_options = parse_g_options(option_str, 1);
1354             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1355                 as->send_partial_reply = 1;
1356             }
1357             free_g_options(g_options);
1358             amfree(option_str);
1359         }
1360
1361         /* write to the request pipe */
1362         aclose(data_read[0][0]);
1363         as->reqfd = data_read[0][1];
1364
1365         /*
1366          * read from the reply pipe
1367          */
1368         as->repfd = data_write[0][0];
1369         aclose(data_write[0][1]);
1370         as->ev_repfd = NULL;
1371         as->repbuf = NULL;
1372         as->repbufsize = 0;
1373         as->bufsize = 0;
1374         as->repretry = 0;
1375         as->rep_pkt.body = NULL;
1376
1377         /*
1378          * read from the rest of the general-use pipes
1379          * (netfds are opened as the client requests them)
1380          */
1381         for (i = 0; i < DATA_FD_COUNT; i++) {
1382             aclose(data_read[i + 1][1]);
1383             aclose(data_write[i + 1][0]);
1384             as->data[i].fd_read = data_read[i + 1][0];
1385             as->data[i].fd_write = data_write[i + 1][1];
1386             as->data[i].ev_read = NULL;
1387             as->data[i].ev_write = NULL;
1388             as->data[i].netfd = NULL;
1389             as->data[i].as = as;
1390         }
1391
1392         /* add it to the service queue */
1393         /* increment the active service count */
1394         TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1395         serviceq.qlength++;
1396
1397         return (as);
1398     case 0:
1399         /*
1400          * The child.  Put our pipes in their advertised locations
1401          * and start up.
1402          */
1403 #ifdef FORCE_USERID
1404         seteuid((uid_t)0);
1405         setuid(client_uid);
1406 #endif
1407
1408         /*
1409          * The data stream is stdin in the new process
1410          */
1411         if (dup2(data_read[0][0], 0) < 0) {
1412             error("dup %d to %d failed: %s\n", data_read[0][0], 0,
1413                 strerror(errno));
1414             /*NOTREACHED*/
1415         }
1416         aclose(data_read[0][0]);
1417         aclose(data_read[0][1]);
1418
1419         /*
1420          * The reply stream is stdout
1421          */
1422         if (dup2(data_write[0][1], 1) < 0) {
1423             error("dup %d to %d failed: %s\n", data_write[0][1], 1,
1424                 strerror(errno));
1425         }
1426         aclose(data_write[0][0]);
1427         aclose(data_write[0][1]);
1428
1429         for (i = 0; i < DATA_FD_COUNT; i++) {
1430             aclose(data_read[i + 1][0]);
1431             aclose(data_write[i + 1][1]);
1432         }
1433
1434         /*
1435          *  Make sure they are not open in the range DATA_FD_OFFSET to
1436          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1437          */
1438         for (i = 0; i < DATA_FD_COUNT; i++) {
1439             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1440                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1441                 newfd = dup(data_read[i + 1][1]);
1442                 if(newfd == -1)
1443                     error("Can't dup out off DATA_FD range");
1444                 data_read[i + 1][1] = newfd;
1445             }
1446             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1447                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1448                 newfd = dup(data_write[i + 1][0]);
1449                 if(newfd == -1)
1450                     error("Can't dup out off DATA_FD range");
1451                 data_write[i + 1][0] = newfd;
1452             }
1453         }
1454         for (i = 0; i < DATA_FD_COUNT*2; i++)
1455             close(DATA_FD_OFFSET + i);
1456
1457         /*
1458          * The rest start at the offset defined in amandad.h, and continue
1459          * through the internal defined.
1460          */
1461         for (i = 0; i < DATA_FD_COUNT; i++) {
1462             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1463                 error("dup %d to %d failed: %s\n", data_read[i + 1][1],
1464                     i + DATA_FD_OFFSET, strerror(errno));
1465             }
1466             aclose(data_read[i + 1][1]);
1467
1468             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1469                 error("dup %d to %d failed: %s\n", data_write[i + 1][0],
1470                     i + DATA_FD_OFFSET, strerror(errno));
1471             }
1472             aclose(data_write[i + 1][0]);
1473         }
1474
1475         /* close all unneeded fd */
1476         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1477         close(2);
1478
1479         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1480         error("could not exec service %s: %s\n", cmd, strerror(errno));
1481         /*NOTREACHED*/
1482     }
1483     return NULL;
1484 }
1485
1486 /*
1487  * Unallocate a service instance
1488  */
1489 static void
1490 service_delete(
1491     struct active_service *     as)
1492 {
1493     int i;
1494     struct datafd_handle *dh;
1495
1496 #ifdef AMANDAD_DEBUG
1497         dbprintf(("%s: closing service: %s\n",
1498             debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
1499 #endif
1500
1501     assert(as != NULL);
1502
1503     assert(as->cmd != NULL);
1504     amfree(as->cmd);
1505
1506     assert(as->arguments != NULL);
1507     amfree(as->arguments);
1508
1509     if (as->reqfd != -1)
1510         aclose(as->reqfd);
1511     if (as->repfd != -1)
1512         aclose(as->repfd);
1513
1514     if (as->ev_repfd != NULL)
1515         event_release(as->ev_repfd);
1516     if (as->ev_reptimeout != NULL)
1517         event_release(as->ev_reptimeout);
1518
1519     for (i = 0; i < DATA_FD_COUNT; i++) {
1520         dh = &as->data[i];
1521
1522         aclose(dh->fd_read);
1523         aclose(dh->fd_write);
1524
1525         if (dh->netfd != NULL)
1526             security_stream_close(dh->netfd);
1527
1528         if (dh->ev_read != NULL)
1529             event_release(dh->ev_read);
1530         if (dh->ev_write != NULL)
1531             event_release(dh->ev_write);
1532     }
1533
1534     if (as->security_handle != NULL)
1535         security_close(as->security_handle);
1536
1537     assert(as->pid > 0);
1538     kill(as->pid, SIGTERM);
1539     waitpid(as->pid, NULL, WNOHANG);
1540
1541     TAILQ_REMOVE(&serviceq.tailq, as, tq);
1542     assert(serviceq.qlength > 0);
1543     serviceq.qlength--;
1544
1545     amfree(as->cmd);
1546     amfree(as->arguments);
1547     amfree(as->repbuf);
1548     amfree(as->rep_pkt.body);
1549     amfree(as);
1550
1551     if(exit_on_qlength == 0 && serviceq.qlength == 0) {
1552         dbclose();
1553         exit(0);
1554     }
1555 }
1556
1557 /*
1558  * Like 'fullwrite', but does the work in a child process so pipelines
1559  * do not hang.
1560  */
1561 static int
1562 writebuf(
1563     struct active_service *     as,
1564     const void *                bufp,
1565     size_t                      size)
1566 {
1567     pid_t pid;
1568     ssize_t    writesize;
1569
1570     switch (pid=fork()) {
1571     case -1:
1572         break;
1573
1574     default:
1575         waitpid(pid, NULL, WNOHANG);
1576         return 0;                       /* this is the parent */
1577
1578     case 0:                             /* this is the child */
1579         close(as->repfd);
1580         writesize = fullwrite(as->reqfd, bufp, size);
1581         exit(writesize != (ssize_t)size);
1582         /* NOTREACHED */
1583     }
1584     return -1;
1585 }
1586
1587 static ssize_t
1588 do_sendpkt(
1589     security_handle_t * handle,
1590     pkt_t *             pkt)
1591 {
1592     dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1593         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1594     if (handle)
1595         return security_sendpkt(handle, pkt);
1596     else
1597         return 1;
1598 }
1599
1600 #ifdef AMANDAD_DEBUG
1601 /*
1602  * Convert a state into a string
1603  */
1604 static const char *
1605 state2str(
1606     state_t     state)
1607 {
1608     static const struct {
1609         state_t state;
1610         const char str[13];
1611     } states[] = {
1612 #define X(state)        { state, stringize(state) }
1613         X(s_sendack),
1614         X(s_repwait),
1615         X(s_processrep),
1616         X(s_sendrep),
1617         X(s_ackwait),
1618 #undef X
1619     };
1620     int i;
1621
1622     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1623         if (state == states[i].state)
1624             return (states[i].str);
1625     return ("INVALID STATE");
1626 }
1627
1628 /*
1629  * Convert an action into a string
1630  */
1631 static const char *
1632 action2str(
1633     action_t    action)
1634 {
1635     static const struct {
1636         action_t action;
1637         const char str[12];
1638     } actions[] = {
1639 #define X(action)       { action, stringize(action) }
1640         X(A_START),
1641         X(A_RECVPKT),
1642         X(A_RECVREP),
1643         X(A_PENDING),
1644         X(A_FINISH),
1645         X(A_CONTINUE),
1646         X(A_SENDNAK),
1647         X(A_TIMEOUT),
1648 #undef X
1649     };
1650     int i;
1651
1652     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1653         if (action == actions[i].action)
1654             return (actions[i].str);
1655     return ("UNKNOWN ACTION");
1656 }
1657 #endif  /* AMANDAD_DEBUG */