25a51830134b5f380446937606e27af0a1ffa6c5
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18.2.1 2006/09/15 17:18:06 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 /*#define       AMANDAD_DEBUG*/
36
37 #include "amanda.h"
38 #include "amandad.h"
39 #include "clock.h"
40 #include "event.h"
41 #include "amfeatures.h"
42 #include "packet.h"
43 #include "version.h"
44 #include "queue.h"
45 #include "security.h"
46 #include "stream.h"
47 #include "util.h"
48
49 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
50 #define ACK_TIMEOUT     10              /* XXX should be configurable */
51 #define MAX_REP_RETRIES 5
52
53 /*
54  * These are the actions for entering the state machine
55  */
56 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
57     A_SENDNAK, A_TIMEOUT } action_t;
58
59 /*
60  * This is a state in the state machine.  It is a function pointer to
61  * the function that actually implements the state.
62  */
63 struct active_service;
64 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
65
66 /*
67  * This structure describes an active running service.
68  *
69  * An active service is something running that we have received
70  * a request for.  This structure holds info on that service, including
71  * file descriptors for data, etc, as well as the security handle
72  * for communications with the amanda server.
73  */
74 struct active_service {
75     char *cmd;                          /* name of command we ran */
76     char *arguments;                    /* arguments we sent it */
77     security_handle_t *security_handle; /* remote server */
78     state_t state;                      /* how far this has progressed */
79     pid_t pid;                          /* pid of subprocess */
80     int send_partial_reply;             /* send PREP packet */
81     int reqfd;                          /* pipe to write requests */
82     int repfd;                          /* pipe to read replies */
83     event_handle_t *ev_repfd;           /* read event handle for repfd */
84     event_handle_t *ev_reptimeout;      /* timeout for rep data */
85     pkt_t rep_pkt;                      /* rep packet we're sending out */
86     char *repbuf;                       /* buffer to read the rep into */
87     size_t bufsize;                     /* length of repbuf */
88     size_t repbufsize;                  /* length of repbuf */
89     int repretry;                       /* times we'll retry sending the rep */
90     /*
91      * General user streams to the process, and their equivalent
92      * network streams.
93      */
94     struct datafd_handle {
95         int fd_read;                    /* pipe to child process */
96         int fd_write;                   /* pipe to child process */
97         event_handle_t *ev_read;        /* it's read event handle */
98         event_handle_t *ev_write;       /* it's write event handle */
99         security_stream_t *netfd;       /* stream to amanda server */
100         struct active_service *as;      /* pointer back to our enclosure */
101     } data[DATA_FD_COUNT];
102     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
103     TAILQ_ENTRY(active_service) tq;     /* queue handle */
104 };
105
106 /* 
107  * Here are the services that we allow.
108  */
109 static struct services {
110     char *name;
111     int  active;
112 } services[] = {
113     { "noop", 1 },
114     { "sendsize", 1 },
115     { "sendbackup", 1 },
116     { "selfcheck", 1 },
117     { "amindexd", 0 },
118     { "amidxtaped", 0 }
119 };
120 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
121
122 /*
123  * Queue of outstanding requests that we are running.
124  */
125 static struct {
126     TAILQ_HEAD(, active_service) tailq;
127     int qlength;
128 } serviceq = {
129     TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
130 };
131
132 /*
133  * Data for dbmalloc to check for memory leaks
134  */
135 #ifdef USE_DBMALLOC
136 static struct {
137     struct {
138         unsigned long size, hist;
139     } start, end;
140 } dbmalloc_info;
141 #endif
142
143 static int wait_30s = 1;
144 static int exit_on_qlength = 1;
145 static char *auth = NULL;
146
147 int main(int argc, char **argv);
148
149 static int allocstream(struct active_service *, int);
150 static void exit_check(void *);
151 static void protocol_accept(security_handle_t *, pkt_t *);
152 static void state_machine(struct active_service *, action_t, pkt_t *);
153
154 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
155 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
156 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
157 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
158 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
159
160 static void repfd_recv(void *);
161 static void timeout_repfd(void *);
162 static void protocol_recv(void *, pkt_t *, security_status_t);
163 static void process_readnetfd(void *);
164 static void process_writenetfd(void *, void *, ssize_t);
165 static struct active_service *service_new(security_handle_t *,
166     const char *, const char *);
167 static void service_delete(struct active_service *);
168 static int writebuf(struct active_service *, const void *, size_t);
169 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
170
171 static void child_signal(int signal);
172
173 #ifdef AMANDAD_DEBUG
174 static const char *state2str(state_t);
175 static const char *action2str(action_t);
176 #endif
177
178 /*
179  * Harvests defunct processes...
180  */
181
182 static void
183 child_signal(
184     int         signal)
185 {
186     pid_t       rp;
187
188     (void)signal;       /* Quite compiler warning */
189     /*
190      * Reap and child status and promptly ignore since we don't care...
191      */
192     do {
193         rp = waitpid(-1, NULL, WNOHANG);
194     } while (rp > 0);
195 }
196
197 int
198 main(
199     int         argc,
200     char **     argv)
201 {
202     int i, j;
203     int have_services;
204     int in, out;
205     const security_driver_t *secdrv;
206     int no_exit = 0;
207     struct sigaction act, oact;
208     char *pgm = "amandad";              /* in case argv[0] is not set */
209 #if defined(AMANDAD_DEBUG) && defined(USE_REUSEADDR)
210     const int on = 1;
211     int r;
212 #endif
213
214     safe_fd(-1, 0);
215     safe_cd();
216
217     /*
218      * When called via inetd, it is not uncommon to forget to put the
219      * argv[0] value on the config line.  On some systems (e.g. Solaris)
220      * this causes argv and/or argv[0] to be NULL, so we have to be
221      * careful getting our name.
222      */
223     if ((argv == NULL) || (argv[0] == NULL)) {
224             pgm = "amandad";            /* in case argv[0] is not set */
225     } else {
226             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
227     }
228     set_pname(pgm);
229     dbopen(DBG_SUBDIR_AMANDAD);
230
231     if(argv == NULL) {
232         error("argv == NULL\n");
233         /*NOTREACHED*/
234     }
235
236     /* Don't die when child closes pipe */
237     signal(SIGPIPE, SIG_IGN);
238
239     /* Tell me when a child exits or dies... */
240     act.sa_handler = child_signal;
241     sigemptyset(&act.sa_mask);
242     act.sa_flags = 0;
243     if(sigaction(SIGCHLD, &act, &oact) != 0) {
244         error("error setting SIGCHLD handler: %s", strerror(errno));
245         /*NOTREACHED*/
246     }
247
248 #ifdef USE_DBMALLOC
249     dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
250 #endif
251
252     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
253
254 #ifdef FORCE_USERID
255     /* we'd rather not run as root */
256     if (geteuid() == 0) {
257         if(client_uid == (uid_t) -1) {
258             error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
259             /*NOTREACHED*/
260         }
261         initgroups(CLIENT_LOGIN, client_gid);
262         setgid(client_gid);
263         setegid(client_gid);
264         seteuid(client_uid);
265     }
266 #endif  /* FORCE_USERID */
267
268     /*
269      * ad-hoc argument parsing
270      *
271      * We accept        -auth=[authentication type]
272      *                  -no-exit
273 #ifdef AMANDAD_DEBUG
274      *                  -tcp=[port]
275      *                  -udp=[port]
276 #endif
277      * We also add a list of services that amandad can launch
278      */
279     secdrv = NULL;
280     in = 0; out = 1;            /* default to stdin/stdout */
281     have_services = 0;
282     for (i = 1; i < argc; i++) {
283         /*
284          * accept -krb4 as an alias for -auth=krb4 (for compatibility)
285          */
286         if (strcmp(argv[i], "-krb4") == 0) {
287             argv[i] = "-auth=krb4";
288             /* FALLTHROUGH */
289             auth = "krb4";
290         }
291
292         /*
293          * Get a driver for a security type specified after -auth=
294          */
295         else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
296             argv[i] += strlen("-auth=");
297             secdrv = security_getdriver(argv[i]);
298             auth = argv[i];
299             if (secdrv == NULL) {
300                 error("no driver for security type '%s'\n", argv[i]);
301                 /*NOTREACHED*/
302             }
303             continue;
304         }
305
306         /*
307          * If -no-exit is specified, always run even after requests have
308          * been satisfied.
309          */
310         else if (strcmp(argv[i], "-no-exit") == 0) {
311             no_exit = 1;
312             continue;
313         }
314
315 #ifdef AMANDAD_DEBUG
316         /*
317          * Allow us to directly bind to a udp port for debugging.
318          * This may only apply to some security types.
319          */
320         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
321             struct sockaddr_in sin;
322
323             argv[i] += strlen("-udp=");
324             in = out = socket(AF_INET, SOCK_DGRAM, 0);
325             if (in < 0) {
326                 error("can't create dgram socket: %s\n", strerror(errno));
327                 /*NOTREACHED*/
328             }
329 #ifdef USE_REUSEADDR
330             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
331                 (void *)&on, (socklen_t)sizeof(on));
332             if (r < 0) {
333                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
334                           debug_prefix(NULL),
335                           strerror(errno)));
336             }
337 #endif
338
339             sin.sin_family = (sa_family_t)AF_INET;
340             sin.sin_addr.s_addr = INADDR_ANY;
341             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
342             if (bind(in, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
343                 error("can't bind to port %d: %s\n", atoi(argv[i]),
344                     strerror(errno));
345                 /*NOTREACHED*/
346             }
347         }
348         /*
349          * Ditto for tcp ports.
350          */
351         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
352             struct sockaddr_in sin;
353             int sock;
354             socklen_t n;
355
356             argv[i] += strlen("-tcp=");
357             sock = socket(AF_INET, SOCK_STREAM, 0);
358             if (sock < 0) {
359                 error("can't create tcp socket: %s\n", strerror(errno));
360                 /*NOTREACHED*/
361             }
362             n = 1;
363 #ifdef USE_REUSEADDR
364             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
365                 (void *)&on, (socklen_t)sizeof(on));
366             if (r < 0) {
367                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
368                           debug_prefix(NULL),
369                           strerror(errno)));
370             }
371 #endif
372             setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
373                 (void *)&n, (socklen_t)sizeof(n));
374             sin.sin_family = (sa_family_t)AF_INET;
375             sin.sin_addr.s_addr = INADDR_ANY;
376             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
377             if (bind(sock, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
378                 error("can't bind to port %d: %s\n", atoi(argv[i]),
379                     strerror(errno));
380                 /*NOTREACHED*/
381             }
382             listen(sock, 10);
383             n = (socklen_t)sizeof(sin);
384             in = out = accept(sock, (struct sockaddr *)&sin, &n);
385         }
386 #endif
387         /*
388          * It must be a service name
389          */
390         else {
391             /* clear all services */
392             if(!have_services) {
393                 for (j = 0; j < (int)NSERVICES; j++)
394                     services[j].active = 0;
395             }
396             have_services = 1;
397
398             if(strcmp(argv[i],"amdump") == 0) {
399                 services[0].active = 1;
400                 services[1].active = 1;
401                 services[2].active = 1;
402                 services[3].active = 1;
403             }
404             else {
405                 for (j = 0; j < (int)NSERVICES; j++)
406                     if (strcmp(services[j].name, argv[i]) == 0)
407                         break;
408                 if (j == (int)NSERVICES) {
409                     dbprintf(("%s: %s: invalid service\n",
410                               debug_prefix_time(NULL), argv[i]));
411                     exit(1);
412                 }
413                 services[j].active = 1;
414             }
415         }
416     }
417
418     /*
419      * If no security type specified, use BSD
420      */
421     if (secdrv == NULL) {
422         secdrv = security_getdriver("BSD");
423         auth = "bsd";
424         if (secdrv == NULL) {
425             error("no driver for default security type 'BSD'\n");
426             /*NOTREACHED*/
427         }
428     }
429
430     if(strcasecmp(auth, "rsh") == 0 ||
431        strcasecmp(auth, "ssh") == 0 ||
432        strcasecmp(auth, "bsdtcp") == 0) {
433         wait_30s = 0;
434         exit_on_qlength = 1;
435     }
436
437     /* initialize */
438
439     startclock();
440
441     dbprintf(("%s: version %s\n", get_pname(), version()));
442     for (i = 0; version_info[i] != NULL; i++) {
443         dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
444     }
445
446     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
447         dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
448                   debug_prefix(NULL)));
449     }
450
451     /*
452      * Schedule to call protocol_accept() when new security handles
453      * are created on stdin.
454      */
455     security_accept(secdrv, in, out, protocol_accept);
456
457     /*
458      * Schedule an event that will try to exit every 30 seconds if there
459      * are no requests outstanding.
460      */
461     if(wait_30s)
462         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
463
464     /*
465      * Call event_loop() with an arg of 0, telling it to block until all
466      * events are completed.
467      */
468     event_loop(0);
469
470     close(in);
471     close(out);
472     dbclose();
473     return(0);
474 }
475
476 /*
477  * This runs periodically and checks to see if we have any active services
478  * still running.  If we don't, then we quit.
479  */
480 static void
481 exit_check(
482     void *      cookie)
483 {
484     int no_exit;
485
486     assert(cookie != NULL);
487     no_exit = *(int *)cookie;
488
489     /*
490      * If things are still running, then don't exit.
491      */
492     if (serviceq.qlength > 0)
493         return;
494
495     /*
496      * If the caller asked us to never exit, then we're done
497      */
498     if (no_exit)
499         return;
500
501 #ifdef USE_DBMALLOC
502     dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
503
504     if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
505         malloc_list(dbfd(), dbmalloc_info.start.hist,
506             dbmalloc_info.end.hist);
507     }
508 #endif
509
510     dbclose();
511     exit(0);
512 }
513
514 /*
515  * Handles new incoming protocol handles.  This is a callback for
516  * security_accept(), which gets called when new handles are detected.
517  */
518 static void
519 protocol_accept(
520     security_handle_t * handle,
521     pkt_t *             pkt)
522 {
523     pkt_t pkt_out;
524     struct active_service *as;
525     char *pktbody, *tok, *service, *arguments;
526     char *service_path = NULL;
527     int i;
528
529     pkt_out.body = NULL;
530
531     /*
532      * If handle is NULL, then the connection is closed.
533      */
534     if(handle == NULL) {
535         return;
536     }
537
538     /*
539      * If pkt is NULL, then there was a problem with the new connection.
540      */
541     if (pkt == NULL) {
542         dbprintf(("%s: accept error: %s\n",
543             debug_prefix_time(NULL), security_geterror(handle)));
544         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
545         do_sendpkt(handle, &pkt_out);
546         amfree(pkt_out.body);
547         security_close(handle);
548         return;
549     }
550
551     dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
552         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
553
554     /*
555      * If this is not a REQ packet, just forget about it.
556      */
557     if (pkt->type != P_REQ) {
558         dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
559             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
560         security_close(handle);
561         return;
562     }
563
564     pktbody = service = arguments = NULL;
565     as = NULL;
566
567     /*
568      * Parse out the service and arguments
569      */
570
571     pktbody = stralloc(pkt->body);
572
573     tok = strtok(pktbody, " ");
574     if (tok == NULL)
575         goto badreq;
576     if (strcmp(tok, "SERVICE") != 0)
577         goto badreq;
578
579     tok = strtok(NULL, " \n");
580     if (tok == NULL)
581         goto badreq;
582     service = stralloc(tok);
583
584     /* we call everything else 'arguments' */
585     tok = strtok(NULL, "");
586     if (tok == NULL)
587         goto badreq;
588     arguments = stralloc(tok);
589
590     /* see if it's one we allow */
591     for (i = 0; i < (int)NSERVICES; i++)
592         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
593             break;
594     if (i == (int)NSERVICES) {
595         dbprintf(("%s: %s: invalid service\n",
596             debug_prefix_time(NULL), service));
597         pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
598         goto send_pkt_out;
599     }
600
601     service_path = vstralloc(libexecdir, "/", service, versionsuffix(), NULL);
602     if (access(service_path, X_OK) < 0) {
603         dbprintf(("%s: can't execute %s: %s\n",
604             debug_prefix_time(NULL), service_path, strerror(errno)));
605             pkt_init(&pkt_out, P_NAK,
606                      "ERROR execute access to \"%s\" denied\n",
607                      service_path);
608         goto send_pkt_out;
609     }
610
611     /* see if its already running */
612     for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
613         as = TAILQ_NEXT(as, tq)) {
614             if (strcmp(as->cmd, service_path) == 0 &&
615                 strcmp(as->arguments, arguments) == 0) {
616                     dbprintf(("%s: %s %s: already running, acking req\n",
617                         debug_prefix_time(NULL), service, arguments));
618                     pkt_init(&pkt_out, P_ACK, "");
619                     goto send_pkt_out_no_delete;
620             }
621     }
622
623     /*
624      * create a new service instance, and send the arguments down
625      * the request pipe.
626      */
627     dbprintf(("%s: creating new service: %s\n%s\n",
628         debug_prefix_time(NULL), service, arguments));
629     as = service_new(handle, service_path, arguments);
630     if (writebuf(as, arguments, strlen(arguments)) < 0) {
631         const char *errmsg = strerror(errno);
632         dbprintf(("%s: error sending arguments to %s: %s\n",
633             debug_prefix_time(NULL), service, errmsg));
634         pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
635             service, errmsg);
636         goto send_pkt_out;
637     }
638     aclose(as->reqfd);
639
640     amfree(pktbody);
641     amfree(service);
642     amfree(service_path);
643     amfree(arguments);
644
645     /*
646      * Move to the sendack state, and start up the state
647      * machine.
648      */
649     as->state = s_sendack;
650     state_machine(as, A_START, NULL);
651     return;
652
653 badreq:
654     pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
655     dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
656         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
657
658 send_pkt_out:
659     if(as)
660         service_delete(as);
661 send_pkt_out_no_delete:
662     amfree(pktbody);
663     amfree(service_path);
664     amfree(service);
665     amfree(arguments);
666     do_sendpkt(handle, &pkt_out);
667     security_close(handle);
668     amfree(pkt_out.body);
669 }
670
671 /*
672  * Handles incoming protocol packets.  Routes responses to the proper
673  * running service.
674  */
675 static void
676 state_machine(
677     struct active_service *     as,
678     action_t                    action,
679     pkt_t *                     pkt)
680 {
681     action_t retaction;
682     state_t curstate;
683     pkt_t nak;
684
685 #ifdef AMANDAD_DEBUG
686     dbprintf(("%s: state_machine: %p entering\n",
687         debug_prefix_time(NULL), as));
688 #endif
689     for (;;) {
690         curstate = as->state;
691 #ifdef AMANDAD_DEBUG
692         dbprintf(("%s: state_machine: %p curstate=%s action=%s\n",
693             debug_prefix_time(NULL), as,
694             state2str(curstate), action2str(action)));
695 #endif
696         retaction = (*curstate)(as, action, pkt);
697 #ifdef AMANDAD_DEBUG
698         dbprintf(("%s: state_machine: %p curstate=%s returned %s (nextstate=%s)\n",
699             debug_prefix_time(NULL),
700             as, state2str(curstate), action2str(retaction),
701             state2str(as->state)));
702 #endif
703
704         switch (retaction) {
705         /*
706          * State has queued up and is now blocking on input.
707          */
708         case A_PENDING:
709 #ifdef AMANDAD_DEBUG
710             dbprintf(("%s: state_machine: %p leaving (A_PENDING)\n",
711                 debug_prefix_time(NULL), as));
712 #endif
713             return;
714
715         /*
716          * service has switched states.  Loop.
717          */
718         case A_CONTINUE:
719             break;
720
721         /*
722          * state has determined that the packet it received was bogus.
723          * Send a nak, and return.
724          */
725         case A_SENDNAK:
726             dbprintf(("%s: received unexpected %s packet\n",
727                 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
728             dbprintf(("<<<<<\n%s----\n\n", pkt->body));
729             pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
730                 pkt_type2str(pkt->type));
731             do_sendpkt(as->security_handle, &nak);
732             amfree(nak.body);
733 #ifdef AMANDAD_DEBUG
734             dbprintf(("%s: state_machine: %p leaving (A_SENDNAK)\n",
735                 debug_prefix_time(NULL), as));
736 #endif
737             return;
738
739         /*
740          * Service is done.  Remove it and finish.
741          */
742         case A_FINISH:
743             service_delete(as);
744 #ifdef AMANDAD_DEBUG
745             dbprintf(("%s: state_machine: %p leaving (A_FINISH)\n",
746                 debug_prefix_time(NULL), as));
747 #endif
748             return;
749
750         default:
751             assert(0);
752             break;
753         }
754     }
755     /*NOTREACHED*/
756 }
757
758 /*
759  * This state just sends an ack.  After that, we move to the repwait
760  * state to wait for REP data to arrive from the subprocess.
761  */
762 static action_t
763 s_sendack(
764     struct active_service *     as,
765     action_t                    action,
766     pkt_t *                     pkt)
767 {
768     pkt_t ack;
769
770     (void)action;       /* Quiet unused parameter warning */
771     (void)pkt;          /* Quiet unused parameter warning */
772
773     pkt_init(&ack, P_ACK, "");
774     if (do_sendpkt(as->security_handle, &ack) < 0) {
775         dbprintf(("%s: error sending ACK: %s\n",
776             debug_prefix_time(NULL), security_geterror(as->security_handle)));
777         amfree(ack.body);
778         return (A_FINISH);
779     }
780     amfree(ack.body);
781
782     /*
783      * move to the repwait state
784      * Setup a listener for data on the reply fd, but also
785      * listen for packets over the wire, as the server may
786      * poll us if we take a long time.
787      * Setup a timeout that will fire if it takes too long to
788      * receive rep data.
789      */
790     as->state = s_repwait;
791     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
792     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
793         timeout_repfd, as);
794     security_recvpkt(as->security_handle, protocol_recv, as, -1);
795     return (A_PENDING);
796 }
797
798 /*
799  * This is the repwait state.  We have responded to the initial REQ with
800  * an ACK, and we are now waiting for the process we spawned to pass us 
801  * data to send in a REP.
802  */
803 static action_t
804 s_repwait(
805     struct active_service *     as,
806     action_t                    action,
807     pkt_t *                     pkt)
808 {
809     ssize_t n;
810     char *repbuf_temp;
811
812     /*
813      * We normally shouldn't receive any packets while waiting
814      * for our REP data, but in some cases we do.
815      */
816     if (action == A_RECVPKT) {
817         assert(pkt != NULL);
818         /*
819          * Another req for something that's running.  Just send an ACK
820          * and go back and wait for more data.
821          */
822         if (pkt->type == P_REQ) {
823             dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
824                 debug_prefix_time(NULL)));
825             amfree(as->rep_pkt.body);
826             pkt_init(&as->rep_pkt, P_ACK, "");
827             do_sendpkt(as->security_handle, &as->rep_pkt);
828             return (A_PENDING);
829         }
830         /* something unexpected.  Nak it */
831         return (A_SENDNAK);
832     }
833
834     if (action == A_TIMEOUT) {
835         amfree(as->rep_pkt.body);
836         pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
837         dbprintf(("%s: %s timed out waiting for REP data\n",
838             debug_prefix_time(NULL), as->cmd));
839         do_sendpkt(as->security_handle, &as->rep_pkt);
840         return (A_FINISH);
841     }
842
843     assert(action == A_RECVREP);
844     if(as->bufsize == 0) {
845         as->bufsize = NETWORK_BLOCK_BYTES;
846         as->repbuf = alloc(as->bufsize);
847     }
848
849     do {
850         n = read(as->repfd, as->repbuf + as->repbufsize,
851                  as->bufsize - as->repbufsize - 1);
852     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
853     if (n < 0) {
854         const char *errstr = strerror(errno);
855         dbprintf(("%s: read error on reply pipe: %s\n",
856                   debug_prefix_time(NULL), errstr));
857         amfree(as->rep_pkt.body);
858         pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
859                  errstr);
860         do_sendpkt(as->security_handle, &as->rep_pkt);
861         return (A_FINISH);
862     }
863     /*
864      * If we got some data, go back and wait for more, or EOF.  Nul terminate
865      * the buffer first.
866      */
867     as->repbuf[n + as->repbufsize] = '\0';
868     if (n > 0) {
869         as->repbufsize += n;
870         if(as->repbufsize >= (as->bufsize - 1)) {
871             as->bufsize *= 2;
872             repbuf_temp = alloc(as->bufsize);
873             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
874             amfree(as->repbuf);
875             as->repbuf = repbuf_temp;
876         }
877         else if(as->send_partial_reply) {
878             amfree(as->rep_pkt.body);
879             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
880             do_sendpkt(as->security_handle, &as->rep_pkt);
881             amfree(as->rep_pkt.body);
882             pkt_init(&as->rep_pkt, P_REP, "");
883         }
884  
885         return (A_PENDING);
886     }
887
888     /*
889      * If we got 0, then we hit EOF.  Process the data and release
890      * the timeout.
891      */
892     assert(n == 0);
893
894     assert(as->ev_repfd != NULL);
895     event_release(as->ev_repfd);
896     as->ev_repfd = NULL;
897
898     assert(as->ev_reptimeout != NULL);
899     event_release(as->ev_reptimeout);
900     as->ev_reptimeout = NULL;
901
902     as->state = s_processrep;
903     aclose(as->repfd);
904     return (A_CONTINUE);
905 }
906
907 /*
908  * After we have read in all of the rep data, we process it and send
909  * it out as a REP packet.
910  */
911 static action_t
912 s_processrep(
913     struct active_service *     as,
914     action_t                    action,
915     pkt_t *                     pkt)
916 {
917     char *tok, *repbuf;
918
919     (void)action;       /* Quiet unused parameter warning */
920     (void)pkt;          /* Quiet unused parameter warning */
921
922     /*
923      * Copy the rep lines into the outgoing packet.
924      *
925      * If this line is a CONNECT, translate it
926      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
927      * Example:
928      *
929      *  CONNECT DATA 4 MESG 5 INDEX 6
930      *
931      * The tags are arbitrary.  The handles are in the DATA_FD pool.
932      * We need to map these to security streams and pass them back
933      * to the amanda server.  If the handle is -1, then we don't map.
934      */
935     repbuf = stralloc(as->repbuf);
936     amfree(as->rep_pkt.body);
937     pkt_init(&as->rep_pkt, P_REP, "");
938     tok = strtok(repbuf, " ");
939     if (tok == NULL)
940         goto error;
941     if (strcmp(tok, "CONNECT") == 0) {
942         char *line, *nextbuf;
943
944         /* Save the entire line */
945         line = strtok(NULL, "\n");
946         /* Save the buf following the line */
947         nextbuf = strtok(NULL, "");
948
949         if (line == NULL || nextbuf == NULL)
950             goto error;
951
952         pkt_cat(&as->rep_pkt, "CONNECT");
953
954         /* loop over the id/handle pairs */
955         for (;;) {
956             /* id */
957             tok = strtok(line, " ");
958             line = NULL;        /* keep working from line */
959             if (tok == NULL)
960                 break;
961             pkt_cat(&as->rep_pkt, " %s", tok);
962
963             /* handle */
964             tok = strtok(NULL, " \n");
965             if (tok == NULL)
966                 goto error;
967             /* convert the handle into something the server can process */
968             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
969         }
970         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
971     } else {
972 error:
973         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
974     }
975
976     /*
977      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
978      * state.
979      */
980     as->state = s_sendrep;
981     as->repretry = MAX_REP_RETRIES;
982     amfree(repbuf);
983     return (A_CONTINUE);
984 }
985
986 /*
987  * This is the state where we send the REP we just collected from our child.
988  */
989 static action_t
990 s_sendrep(
991     struct active_service *     as,
992     action_t                    action,
993     pkt_t *                     pkt)
994 {
995     (void)action;       /* Quiet unused parameter warning */
996     (void)pkt;          /* Quiet unused parameter warning */
997
998     /*
999      * Transmit it and move to the ack state.
1000      */
1001     do_sendpkt(as->security_handle, &as->rep_pkt);
1002     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1003     as->state = s_ackwait;
1004     return (A_PENDING);
1005 }
1006
1007 /*
1008  * This is the state in which we wait for the server to ACK the REP
1009  * we just sent it.
1010  */
1011 static action_t
1012 s_ackwait(
1013     struct active_service *     as,
1014     action_t                    action,
1015     pkt_t *                     pkt)
1016 {
1017     struct datafd_handle *dh;
1018     int npipes;
1019
1020     /*
1021      * If we got a timeout, try again, but eventually give up.
1022      */
1023     if (action == A_TIMEOUT) {
1024         if (--as->repretry > 0) {
1025             as->state = s_sendrep;
1026             return (A_CONTINUE);
1027         }
1028         dbprintf(("%s: timeout waiting for ACK for our REP\n",
1029             debug_prefix_time(NULL)));
1030         return (A_FINISH);
1031     }
1032 #ifdef AMANDAD_DEBUG
1033     dbprintf(("%s: received ACK, now opening streams\n",
1034         debug_prefix_time(NULL)));
1035 #endif
1036
1037     assert(action == A_RECVPKT);
1038
1039     if (pkt->type == P_REQ) {
1040         dbprintf(("%s: received dup P_REQ packet, resending REP\n",
1041                   debug_prefix_time(NULL)));
1042         as->state = s_sendrep;
1043         return (A_CONTINUE);
1044     }
1045
1046     if (pkt->type != P_ACK)
1047         return (A_SENDNAK);
1048
1049     /*
1050      * Got the ack, now open the pipes
1051      */
1052     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1053         if (dh->netfd == NULL)
1054             continue;
1055         if (security_stream_accept(dh->netfd) < 0) {
1056             dbprintf(("%s: stream %d accept failed: %s\n",
1057                 debug_prefix_time(NULL),
1058                 dh - &as->data[0], security_geterror(as->security_handle)));
1059             security_stream_close(dh->netfd);
1060             dh->netfd = NULL;
1061         }
1062         /* setup an event for reads from it */
1063         dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1064                                      process_readnetfd, dh);
1065
1066         security_stream_read(dh->netfd, process_writenetfd, dh);
1067
1068     }
1069
1070     /*
1071      * Pipes are open, so auth them.  Count them at the same time.
1072      */
1073     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1074         if (dh->netfd == NULL)
1075             continue;
1076         if (security_stream_auth(dh->netfd) < 0) {
1077             security_stream_close(dh->netfd);
1078             dh->netfd = NULL;
1079             event_release(dh->ev_read);
1080             event_release(dh->ev_write);
1081             dh->ev_read = NULL;
1082             dh->ev_write = NULL;
1083         } else {
1084             npipes++;
1085         }
1086     }
1087
1088     /*
1089      * If no pipes are open, then we're done.  Otherwise, just start running.
1090      * The event handlers on all of the pipes will take it from here.
1091      */
1092 #ifdef AMANDAD_DEBUG
1093     dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
1094         debug_prefix_time(NULL), npipes));
1095 #endif
1096     if (npipes == 0)
1097         return (A_FINISH);
1098     else {
1099         security_close(as->security_handle);
1100         as->security_handle = NULL;
1101         return (A_PENDING);
1102     }
1103 }
1104
1105 /*
1106  * Called when a repfd has received data
1107  */
1108 static void
1109 repfd_recv(
1110     void *      cookie)
1111 {
1112     struct active_service *as = cookie;
1113
1114     assert(as != NULL);
1115     assert(as->ev_repfd != NULL);
1116
1117     state_machine(as, A_RECVREP, NULL);
1118 }
1119
1120 /*
1121  * Called when a repfd has timed out
1122  */
1123 static void
1124 timeout_repfd(
1125     void *      cookie)
1126 {
1127     struct active_service *as = cookie;
1128
1129     assert(as != NULL);
1130     assert(as->ev_reptimeout != NULL);
1131
1132     state_machine(as, A_TIMEOUT, NULL);
1133 }
1134
1135 /*
1136  * Called when a handle has received data
1137  */
1138 static void
1139 protocol_recv(
1140     void *              cookie,
1141     pkt_t *             pkt,
1142     security_status_t   status)
1143 {
1144     struct active_service *as = cookie;
1145
1146     assert(as != NULL);
1147
1148     switch (status) {
1149     case S_OK:
1150         dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
1151             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1152         state_machine(as, A_RECVPKT, pkt);
1153         break;
1154     case S_TIMEOUT:
1155         dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
1156         state_machine(as, A_TIMEOUT, NULL);
1157         break;
1158     case S_ERROR:
1159         dbprintf(("%s: receive error: %s\n",
1160             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1161         break;
1162     }
1163 }
1164
1165 /*
1166  * This is a generic relay function that just reads data from one of
1167  * the process's pipes and passes it up the equivalent security_stream_t
1168  */
1169 static void
1170 process_readnetfd(
1171     void *      cookie)
1172 {
1173     pkt_t nak;
1174     struct datafd_handle *dh = cookie;
1175     struct active_service *as = dh->as;
1176     ssize_t n;
1177
1178     nak.body = NULL;
1179
1180     do {
1181         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1182     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1183
1184     /*
1185      * Process has died.
1186      */
1187     if (n < 0) {
1188         pkt_init(&nak, P_NAK, "A ERROR data descriptor %d broken: %s\n",
1189             dh->fd_read, strerror(errno));
1190         goto sendnak;
1191     }
1192     /*
1193      * Process has closed the pipe.  Just remove this event handler.
1194      * If all pipes are closed, shut down this service.
1195      */
1196     if (n == 0) {
1197         event_release(dh->ev_read);
1198         dh->ev_read = NULL;
1199         if(dh->ev_write == NULL) {
1200             security_stream_close(dh->netfd);
1201             dh->netfd = NULL;
1202         }
1203         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1204             if (dh->netfd != NULL)
1205                 return;
1206         }
1207         service_delete(as);
1208         return;
1209     }
1210     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1211         /* stream has croaked */
1212         pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1213             security_stream_id(dh->netfd),
1214             security_stream_geterror(dh->netfd));
1215         goto sendnak;
1216     }
1217     return;
1218
1219 sendnak:
1220     do_sendpkt(as->security_handle, &nak);
1221     service_delete(as);
1222     amfree(nak.body);
1223 }
1224
1225 /*
1226  * This is a generic relay function that just read data from one of
1227  * the security_stream_t and passes it up the equivalent process's pipes
1228  */
1229 static void
1230 process_writenetfd(
1231     void *      cookie,
1232     void *      buf,
1233     ssize_t     size)
1234 {
1235     struct datafd_handle *dh;
1236
1237     assert(cookie != NULL);
1238     dh = cookie;
1239
1240     if (dh->fd_write <= 0) {
1241         dbprintf(("%s: process_writenetfd: dh->fd_write <= 0\n",
1242             debug_prefix_time(NULL)));
1243     } else if (size > 0) {
1244         fullwrite(dh->fd_write, buf, (size_t)size);
1245         security_stream_read(dh->netfd, process_writenetfd, dh);
1246     }
1247     else {
1248         aclose(dh->fd_write);
1249     }
1250 }
1251
1252
1253 /*
1254  * Convert a local stream handle (DATA_FD...) into something that
1255  * can be sent to the amanda server.
1256  *
1257  * Returns a number that should be sent to the server in the REP packet.
1258  */
1259 static int
1260 allocstream(
1261     struct active_service *     as,
1262     int                         handle)
1263 {
1264     struct datafd_handle *dh;
1265
1266     /* if the handle is -1, then we don't bother */
1267     if (handle < 0)
1268         return (-1);
1269
1270     /* make sure the handle's kosher */
1271     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1272         return (-1);
1273
1274     /* get a pointer into our handle array */
1275     dh = &as->data[handle - DATA_FD_OFFSET];
1276
1277     /* make sure we're not already using the net handle */
1278     if (dh->netfd != NULL)
1279         return (-1);
1280
1281     /* allocate a stream from the security layer and return */
1282     dh->netfd = security_stream_server(as->security_handle);
1283     if (dh->netfd == NULL) {
1284         dbprintf(("%s: couldn't open stream to server: %s\n",
1285             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1286         return (-1);
1287     }
1288
1289     /*
1290      * convert the stream into a numeric id that can be sent to the
1291      * remote end.
1292      */
1293     return (security_stream_id(dh->netfd));
1294 }
1295
1296 /*
1297  * Create a new service instance
1298  */
1299 static struct active_service *
1300 service_new(
1301     security_handle_t * security_handle,
1302     const char *        cmd,
1303     const char *        arguments)
1304 {
1305     int i;
1306     int data_read[DATA_FD_COUNT + 1][2];
1307     int data_write[DATA_FD_COUNT + 1][2];
1308     struct active_service *as;
1309     pid_t pid;
1310     int newfd;
1311
1312     assert(security_handle != NULL);
1313     assert(cmd != NULL);
1314     assert(arguments != NULL);
1315
1316     /* a plethora of pipes */
1317     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1318         if (pipe(data_read[i]) < 0) {
1319             error("pipe: %s\n", strerror(errno));
1320             /*NOTREACHED*/
1321         }
1322         if (pipe(data_write[i]) < 0) {
1323             error("pipe: %s\n", strerror(errno));
1324             /*NOTREACHED*/
1325         }
1326     }
1327
1328     switch(pid = fork()) {
1329     case -1:
1330         error("could not fork service %s: %s\n", cmd, strerror(errno));
1331         /*NOTREACHED*/
1332     default:
1333         /*
1334          * The parent.  Close the far ends of our pipes and return.
1335          */
1336         as = alloc(SIZEOF(*as));
1337         as->cmd = stralloc(cmd);
1338         as->arguments = stralloc(arguments);
1339         as->security_handle = security_handle;
1340         as->state = NULL;
1341         as->pid = pid;
1342         as->send_partial_reply = 0;
1343         if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1344             g_option_t *g_options;
1345             char *option_str, *p;
1346
1347             option_str = stralloc(as->arguments+8);
1348             p = strchr(option_str,'\n');
1349             if(p) *p = '\0';
1350
1351             g_options = parse_g_options(option_str, 1);
1352             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1353                 as->send_partial_reply = 1;
1354             }
1355             free_g_options(g_options);
1356             amfree(option_str);
1357         }
1358
1359         /* write to the request pipe */
1360         aclose(data_read[0][0]);
1361         as->reqfd = data_read[0][1];
1362
1363         /*
1364          * read from the reply pipe
1365          */
1366         as->repfd = data_write[0][0];
1367         aclose(data_write[0][1]);
1368         as->ev_repfd = NULL;
1369         as->repbuf = NULL;
1370         as->repbufsize = 0;
1371         as->bufsize = 0;
1372         as->repretry = 0;
1373         as->rep_pkt.body = NULL;
1374
1375         /*
1376          * read from the rest of the general-use pipes
1377          * (netfds are opened as the client requests them)
1378          */
1379         for (i = 0; i < DATA_FD_COUNT; i++) {
1380             aclose(data_read[i + 1][1]);
1381             aclose(data_write[i + 1][0]);
1382             as->data[i].fd_read = data_read[i + 1][0];
1383             as->data[i].fd_write = data_write[i + 1][1];
1384             as->data[i].ev_read = NULL;
1385             as->data[i].ev_write = NULL;
1386             as->data[i].netfd = NULL;
1387             as->data[i].as = as;
1388         }
1389
1390         /* add it to the service queue */
1391         /* increment the active service count */
1392         TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1393         serviceq.qlength++;
1394
1395         return (as);
1396     case 0:
1397         /*
1398          * The child.  Put our pipes in their advertised locations
1399          * and start up.
1400          */
1401 #ifdef FORCE_USERID
1402         seteuid((uid_t)0);
1403         setuid(client_uid);
1404 #endif
1405
1406         /*
1407          * The data stream is stdin in the new process
1408          */
1409         if (dup2(data_read[0][0], 0) < 0) {
1410             error("dup %d to %d failed: %s\n", data_read[0][0], 0,
1411                 strerror(errno));
1412             /*NOTREACHED*/
1413         }
1414         aclose(data_read[0][0]);
1415         aclose(data_read[0][1]);
1416
1417         /*
1418          * The reply stream is stdout
1419          */
1420         if (dup2(data_write[0][1], 1) < 0) {
1421             error("dup %d to %d failed: %s\n", data_write[0][1], 1,
1422                 strerror(errno));
1423         }
1424         aclose(data_write[0][0]);
1425         aclose(data_write[0][1]);
1426
1427         for (i = 0; i < DATA_FD_COUNT; i++) {
1428             aclose(data_read[i + 1][0]);
1429             aclose(data_write[i + 1][1]);
1430         }
1431
1432         /*
1433          *  Make sure they are not open in the range DATA_FD_OFFSET to
1434          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1435          */
1436         for (i = 0; i < DATA_FD_COUNT; i++) {
1437             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1438                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1439                 newfd = dup(data_read[i + 1][1]);
1440                 if(newfd == -1)
1441                     error("Can't dup out off DATA_FD range");
1442                 data_read[i + 1][1] = newfd;
1443             }
1444             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1445                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1446                 newfd = dup(data_write[i + 1][0]);
1447                 if(newfd == -1)
1448                     error("Can't dup out off DATA_FD range");
1449                 data_write[i + 1][0] = newfd;
1450             }
1451         }
1452         for (i = 0; i < DATA_FD_COUNT*2; i++)
1453             close(DATA_FD_OFFSET + i);
1454
1455         /*
1456          * The rest start at the offset defined in amandad.h, and continue
1457          * through the internal defined.
1458          */
1459         for (i = 0; i < DATA_FD_COUNT; i++) {
1460             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1461                 error("dup %d to %d failed: %s\n", data_read[i + 1][1],
1462                     i + DATA_FD_OFFSET, strerror(errno));
1463             }
1464             aclose(data_read[i + 1][1]);
1465
1466             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1467                 error("dup %d to %d failed: %s\n", data_write[i + 1][0],
1468                     i + DATA_FD_OFFSET, strerror(errno));
1469             }
1470             aclose(data_write[i + 1][0]);
1471         }
1472
1473         /* close all unneeded fd */
1474         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1475         close(2);
1476
1477         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1478         error("could not exec service %s: %s\n", cmd, strerror(errno));
1479         /*NOTREACHED*/
1480     }
1481     return NULL;
1482 }
1483
1484 /*
1485  * Unallocate a service instance
1486  */
1487 static void
1488 service_delete(
1489     struct active_service *     as)
1490 {
1491     int i;
1492     struct datafd_handle *dh;
1493
1494 #ifdef AMANDAD_DEBUG
1495         dbprintf(("%s: closing service: %s\n",
1496             debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
1497 #endif
1498
1499     assert(as != NULL);
1500
1501     assert(as->cmd != NULL);
1502     amfree(as->cmd);
1503
1504     assert(as->arguments != NULL);
1505     amfree(as->arguments);
1506
1507     if (as->reqfd != -1)
1508         aclose(as->reqfd);
1509     if (as->repfd != -1)
1510         aclose(as->repfd);
1511
1512     if (as->ev_repfd != NULL)
1513         event_release(as->ev_repfd);
1514     if (as->ev_reptimeout != NULL)
1515         event_release(as->ev_reptimeout);
1516
1517     for (i = 0; i < DATA_FD_COUNT; i++) {
1518         dh = &as->data[i];
1519
1520         aclose(dh->fd_read);
1521         aclose(dh->fd_write);
1522
1523         if (dh->netfd != NULL)
1524             security_stream_close(dh->netfd);
1525
1526         if (dh->ev_read != NULL)
1527             event_release(dh->ev_read);
1528         if (dh->ev_write != NULL)
1529             event_release(dh->ev_write);
1530     }
1531
1532     if (as->security_handle != NULL)
1533         security_close(as->security_handle);
1534
1535     assert(as->pid > 0);
1536     kill(as->pid, SIGTERM);
1537     waitpid(as->pid, NULL, WNOHANG);
1538
1539     TAILQ_REMOVE(&serviceq.tailq, as, tq);
1540     assert(serviceq.qlength > 0);
1541     serviceq.qlength--;
1542
1543     amfree(as->cmd);
1544     amfree(as->arguments);
1545     amfree(as->repbuf);
1546     amfree(as->rep_pkt.body);
1547     amfree(as);
1548
1549     if(exit_on_qlength == 0 && serviceq.qlength == 0) {
1550         dbclose();
1551         exit(0);
1552     }
1553 }
1554
1555 /*
1556  * Like 'fullwrite', but does the work in a child process so pipelines
1557  * do not hang.
1558  */
1559 static int
1560 writebuf(
1561     struct active_service *     as,
1562     const void *                bufp,
1563     size_t                      size)
1564 {
1565     pid_t pid;
1566     ssize_t    writesize;
1567
1568     switch (pid=fork()) {
1569     case -1:
1570         break;
1571
1572     default:
1573         waitpid(pid, NULL, WNOHANG);
1574         return 0;                       /* this is the parent */
1575
1576     case 0:                             /* this is the child */
1577         close(as->repfd);
1578         writesize = fullwrite(as->reqfd, bufp, size);
1579         exit(writesize != (ssize_t)size);
1580         /* NOTREACHED */
1581     }
1582     return -1;
1583 }
1584
1585 static ssize_t
1586 do_sendpkt(
1587     security_handle_t * handle,
1588     pkt_t *             pkt)
1589 {
1590     dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1591         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1592     return security_sendpkt(handle, pkt);
1593 }
1594
1595 #ifdef AMANDAD_DEBUG
1596 /*
1597  * Convert a state into a string
1598  */
1599 static const char *
1600 state2str(
1601     state_t     state)
1602 {
1603     static const struct {
1604         state_t state;
1605         const char str[13];
1606     } states[] = {
1607 #define X(state)        { state, stringize(state) }
1608         X(s_sendack),
1609         X(s_repwait),
1610         X(s_processrep),
1611         X(s_sendrep),
1612         X(s_ackwait),
1613 #undef X
1614     };
1615     int i;
1616
1617     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1618         if (state == states[i].state)
1619             return (states[i].str);
1620     return ("INVALID STATE");
1621 }
1622
1623 /*
1624  * Convert an action into a string
1625  */
1626 static const char *
1627 action2str(
1628     action_t    action)
1629 {
1630     static const struct {
1631         action_t action;
1632         const char str[12];
1633     } actions[] = {
1634 #define X(action)       { action, stringize(action) }
1635         X(A_START),
1636         X(A_RECVPKT),
1637         X(A_RECVREP),
1638         X(A_PENDING),
1639         X(A_FINISH),
1640         X(A_CONTINUE),
1641         X(A_SENDNAK),
1642         X(A_TIMEOUT),
1643 #undef X
1644     };
1645     int i;
1646
1647     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1648         if (action == actions[i].action)
1649             return (actions[i].str);
1650     return ("UNKNOWN ACTION");
1651 }
1652 #endif  /* AMANDAD_DEBUG */