Imported Upstream version 2.5.2p1
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "queue.h"
43 #include "security.h"
44 #include "stream.h"
45 #include "util.h"
46 #include "conffile.h"
47
48 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
49 #define ACK_TIMEOUT     10              /* XXX should be configurable */
50
51 #define amandad_debug(i,x) do {         \
52         if ((i) <= debug_amandad) {     \
53                 dbprintf(x);            \
54         }                               \
55 } while (0)
56
57 /*
58  * These are the actions for entering the state machine
59  */
60 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
61     A_SENDNAK, A_TIMEOUT } action_t;
62
63 /*
64  * This is a state in the state machine.  It is a function pointer to
65  * the function that actually implements the state.
66  */
67 struct active_service;
68 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
69
70 /*
71  * This structure describes an active running service.
72  *
73  * An active service is something running that we have received
74  * a request for.  This structure holds info on that service, including
75  * file descriptors for data, etc, as well as the security handle
76  * for communications with the amanda server.
77  */
78 struct active_service {
79     char *cmd;                          /* name of command we ran */
80     char *arguments;                    /* arguments we sent it */
81     security_handle_t *security_handle; /* remote server */
82     state_t state;                      /* how far this has progressed */
83     pid_t pid;                          /* pid of subprocess */
84     int send_partial_reply;             /* send PREP packet */
85     int reqfd;                          /* pipe to write requests */
86     int repfd;                          /* pipe to read replies */
87     event_handle_t *ev_repfd;           /* read event handle for repfd */
88     event_handle_t *ev_reptimeout;      /* timeout for rep data */
89     pkt_t rep_pkt;                      /* rep packet we're sending out */
90     char *repbuf;                       /* buffer to read the rep into */
91     size_t bufsize;                     /* length of repbuf */
92     size_t repbufsize;                  /* length of repbuf */
93     int repretry;                       /* times we'll retry sending the rep */
94     /*
95      * General user streams to the process, and their equivalent
96      * network streams.
97      */
98     struct datafd_handle {
99         int fd_read;                    /* pipe to child process */
100         int fd_write;                   /* pipe to child process */
101         event_handle_t *ev_read;        /* it's read event handle */
102         event_handle_t *ev_write;       /* it's write event handle */
103         security_stream_t *netfd;       /* stream to amanda server */
104         struct active_service *as;      /* pointer back to our enclosure */
105     } data[DATA_FD_COUNT];
106     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
107     TAILQ_ENTRY(active_service) tq;     /* queue handle */
108 };
109
110 /* 
111  * Here are the services that we allow.
112  */
113 static struct services {
114     char *name;
115     int  active;
116 } services[] = {
117     { "noop", 1 },
118     { "sendsize", 1 },
119     { "sendbackup", 1 },
120     { "selfcheck", 1 },
121     { "amindexd", 0 },
122     { "amidxtaped", 0 }
123 };
124 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
125
126 /*
127  * Queue of outstanding requests that we are running.
128  */
129 static struct {
130     TAILQ_HEAD(, active_service) tailq;
131     int qlength;
132 } serviceq = {
133     TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
134 };
135
136 /*
137  * Data for dbmalloc to check for memory leaks
138  */
139 #ifdef USE_DBMALLOC
140 static struct {
141     struct {
142         unsigned long size, hist;
143     } start, end;
144 } dbmalloc_info;
145 #endif
146
147 static int wait_30s = 1;
148 static int exit_on_qlength = 1;
149 static char *auth = NULL;
150
151 int main(int argc, char **argv);
152
153 static int allocstream(struct active_service *, int);
154 static void exit_check(void *);
155 static void protocol_accept(security_handle_t *, pkt_t *);
156 static void state_machine(struct active_service *, action_t, pkt_t *);
157
158 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
159 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
160 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
161 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
162 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
163
164 static void repfd_recv(void *);
165 static void timeout_repfd(void *);
166 static void protocol_recv(void *, pkt_t *, security_status_t);
167 static void process_readnetfd(void *);
168 static void process_writenetfd(void *, void *, ssize_t);
169 static struct active_service *service_new(security_handle_t *,
170     const char *, const char *);
171 static void service_delete(struct active_service *);
172 static int writebuf(struct active_service *, const void *, size_t);
173 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
174
175 static void child_signal(int signal);
176
177 static const char *state2str(state_t);
178 static const char *action2str(action_t);
179
180 /*
181  * Harvests defunct processes...
182  */
183
184 static void
185 child_signal(
186     int         signal)
187 {
188     pid_t       rp;
189
190     (void)signal;       /* Quite compiler warning */
191     /*
192      * Reap and child status and promptly ignore since we don't care...
193      */
194     do {
195         rp = waitpid(-1, NULL, WNOHANG);
196     } while (rp > 0);
197 }
198
199 int
200 main(
201     int         argc,
202     char **     argv)
203 {
204     int i, j;
205     int have_services;
206     int in, out;
207     const security_driver_t *secdrv;
208     int no_exit = 0;
209     struct sigaction act, oact;
210     char *pgm = "amandad";              /* in case argv[0] is not set */
211 #if defined(USE_REUSEADDR)
212     const int on = 1;
213     int r;
214 #endif
215     char *conffile;
216
217     safe_fd(-1, 0);
218     safe_cd();
219
220     /*
221      * When called via inetd, it is not uncommon to forget to put the
222      * argv[0] value on the config line.  On some systems (e.g. Solaris)
223      * this causes argv and/or argv[0] to be NULL, so we have to be
224      * careful getting our name.
225      */
226     if ((argv == NULL) || (argv[0] == NULL)) {
227             pgm = "amandad";            /* in case argv[0] is not set */
228     } else {
229             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
230     }
231     set_pname(pgm);
232     dbopen(DBG_SUBDIR_AMANDAD);
233
234     if(argv == NULL) {
235         error("argv == NULL\n");
236         /*NOTREACHED*/
237     }
238
239     /* Don't die when child closes pipe */
240     signal(SIGPIPE, SIG_IGN);
241
242     /* Tell me when a child exits or dies... */
243     act.sa_handler = child_signal;
244     sigemptyset(&act.sa_mask);
245     act.sa_flags = 0;
246     if(sigaction(SIGCHLD, &act, &oact) != 0) {
247         error("error setting SIGCHLD handler: %s", strerror(errno));
248         /*NOTREACHED*/
249     }
250
251     conffile = vstralloc(CONFIG_DIR, "/", "amanda-client.conf", NULL);
252     if (read_clientconf(conffile) > 0) {
253         error("error reading conffile: %s", conffile);
254         /*NOTREACHED*/
255     }
256     amfree(conffile);
257
258 #ifdef USE_DBMALLOC
259     dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
260 #endif
261
262     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
263
264 #ifdef FORCE_USERID
265     /* we'd rather not run as root */
266     if (geteuid() == 0) {
267         if(client_uid == (uid_t) -1) {
268             error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
269             /*NOTREACHED*/
270         }
271         initgroups(CLIENT_LOGIN, client_gid);
272         setgid(client_gid);
273         setegid(client_gid);
274         seteuid(client_uid);
275     }
276 #endif  /* FORCE_USERID */
277
278     /*
279      * ad-hoc argument parsing
280      *
281      * We accept        -auth=[authentication type]
282      *                  -no-exit
283      *                  -tcp=[port]
284      *                  -udp=[port]
285      * We also add a list of services that amandad can launch
286      */
287     secdrv = NULL;
288     in = 0; out = 1;            /* default to stdin/stdout */
289     have_services = 0;
290     for (i = 1; i < argc; i++) {
291         /*
292          * accept -krb4 as an alias for -auth=krb4 (for compatibility)
293          */
294         if (strcmp(argv[i], "-krb4") == 0) {
295             argv[i] = "-auth=krb4";
296             /* FALLTHROUGH */
297             auth = "krb4";
298         }
299
300         /*
301          * Get a driver for a security type specified after -auth=
302          */
303         else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
304             argv[i] += strlen("-auth=");
305             secdrv = security_getdriver(argv[i]);
306             auth = argv[i];
307             if (secdrv == NULL) {
308                 error("no driver for security type '%s'\n", argv[i]);
309                 /*NOTREACHED*/
310             }
311             continue;
312         }
313
314         /*
315          * If -no-exit is specified, always run even after requests have
316          * been satisfied.
317          */
318         else if (strcmp(argv[i], "-no-exit") == 0) {
319             no_exit = 1;
320             continue;
321         }
322
323         /*
324          * Allow us to directly bind to a udp port for debugging.
325          * This may only apply to some security types.
326          */
327         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
328 #ifdef WORKING_IPV6
329             struct sockaddr_in6 sin;
330 #else
331             struct sockaddr_in sin;
332 #endif
333
334             argv[i] += strlen("-udp=");
335 #ifdef WORKING_IPV6
336             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
337 #else
338             in = out = socket(AF_INET, SOCK_DGRAM, 0);
339 #endif
340             if (in < 0) {
341                 error("can't create dgram socket: %s\n", strerror(errno));
342                 /*NOTREACHED*/
343             }
344 #ifdef USE_REUSEADDR
345             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
346                 (void *)&on, (socklen_t)sizeof(on));
347             if (r < 0) {
348                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
349                           debug_prefix_time(NULL),
350                           strerror(errno)));
351             }
352 #endif
353
354 #ifdef WORKING_IPV6
355             sin.sin6_family = (sa_family_t)AF_INET6;
356             sin.sin6_addr = in6addr_any;
357             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
358 #else
359             sin.sin_family = (sa_family_t)AF_INET;
360             sin.sin_addr.s_addr = INADDR_ANY;
361             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
362 #endif
363             if (bind(in, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
364                 error("can't bind to port %d: %s\n", atoi(argv[i]),
365                     strerror(errno));
366                 /*NOTREACHED*/
367             }
368         }
369         /*
370          * Ditto for tcp ports.
371          */
372         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
373 #ifdef WORKING_IPV6
374             struct sockaddr_in6 sin;
375 #else
376             struct sockaddr_in sin;
377 #endif
378             int sock;
379             socklen_t n;
380
381             argv[i] += strlen("-tcp=");
382 #ifdef WORKING_IPV6
383             sock = socket(AF_INET6, SOCK_STREAM, 0);
384 #else
385             sock = socket(AF_INET, SOCK_STREAM, 0);
386 #endif
387             if (sock < 0) {
388                 error("can't create tcp socket: %s\n", strerror(errno));
389                 /*NOTREACHED*/
390             }
391 #ifdef USE_REUSEADDR
392             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
393                 (void *)&on, (socklen_t)sizeof(on));
394             if (r < 0) {
395                 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
396                           debug_prefix_time(NULL),
397                           strerror(errno)));
398             }
399 #endif
400 #ifdef WORKING_IPV6
401             sin.sin6_family = (sa_family_t)AF_INET6;
402             sin.sin6_addr = in6addr_any;
403             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
404 #else
405             sin.sin_family = (sa_family_t)AF_INET;
406             sin.sin_addr.s_addr = INADDR_ANY;
407             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
408 #endif
409             if (bind(sock, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
410                 error("can't bind to port %d: %s\n", atoi(argv[i]),
411                     strerror(errno));
412                 /*NOTREACHED*/
413             }
414             listen(sock, 10);
415             n = (socklen_t)sizeof(sin);
416             in = out = accept(sock, (struct sockaddr *)&sin, &n);
417         }
418         /*
419          * It must be a service name
420          */
421         else {
422             /* clear all services */
423             if(!have_services) {
424                 for (j = 0; j < (int)NSERVICES; j++)
425                     services[j].active = 0;
426             }
427             have_services = 1;
428
429             if(strcmp(argv[i],"amdump") == 0) {
430                 services[0].active = 1;
431                 services[1].active = 1;
432                 services[2].active = 1;
433                 services[3].active = 1;
434             }
435             else {
436                 for (j = 0; j < (int)NSERVICES; j++)
437                     if (strcmp(services[j].name, argv[i]) == 0)
438                         break;
439                 if (j == (int)NSERVICES) {
440                     dbprintf(("%s: %s: invalid service\n",
441                               debug_prefix_time(NULL), argv[i]));
442                     exit(1);
443                 }
444                 services[j].active = 1;
445             }
446         }
447     }
448
449     /*
450      * If no security type specified, use BSD
451      */
452     if (secdrv == NULL) {
453         secdrv = security_getdriver("BSD");
454         auth = "bsd";
455         if (secdrv == NULL) {
456             error("no driver for default security type 'BSD'\n");
457             /*NOTREACHED*/
458         }
459     }
460
461     if(strcasecmp(auth, "rsh") == 0 ||
462        strcasecmp(auth, "ssh") == 0 ||
463        strcasecmp(auth, "bsdtcp") == 0) {
464         wait_30s = 0;
465         exit_on_qlength = 1;
466     }
467
468     /* initialize */
469
470     startclock();
471
472     dbprintf(("%s: version %s\n", get_pname(), version()));
473     for (i = 0; version_info[i] != NULL; i++) {
474         dbprintf(("%s: %s", debug_prefix_time(NULL), version_info[i]));
475     }
476
477     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
478         dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
479                   debug_prefix_time(NULL)));
480     }
481
482     /*
483      * Schedule to call protocol_accept() when new security handles
484      * are created on stdin.
485      */
486     security_accept(secdrv, in, out, protocol_accept);
487
488     /*
489      * Schedule an event that will try to exit every 30 seconds if there
490      * are no requests outstanding.
491      */
492     if(wait_30s)
493         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
494
495     /*
496      * Call event_loop() with an arg of 0, telling it to block until all
497      * events are completed.
498      */
499     event_loop(0);
500
501     close(in);
502     close(out);
503     dbclose();
504     return(0);
505 }
506
507 /*
508  * This runs periodically and checks to see if we have any active services
509  * still running.  If we don't, then we quit.
510  */
511 static void
512 exit_check(
513     void *      cookie)
514 {
515     int no_exit;
516
517     assert(cookie != NULL);
518     no_exit = *(int *)cookie;
519
520     /*
521      * If things are still running, then don't exit.
522      */
523     if (serviceq.qlength > 0)
524         return;
525
526     /*
527      * If the caller asked us to never exit, then we're done
528      */
529     if (no_exit)
530         return;
531
532 #ifdef USE_DBMALLOC
533     dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
534
535     if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
536         malloc_list(dbfd(), dbmalloc_info.start.hist,
537             dbmalloc_info.end.hist);
538     }
539 #endif
540
541     dbclose();
542     exit(0);
543 }
544
545 /*
546  * Handles new incoming protocol handles.  This is a callback for
547  * security_accept(), which gets called when new handles are detected.
548  */
549 static void
550 protocol_accept(
551     security_handle_t * handle,
552     pkt_t *             pkt)
553 {
554     pkt_t pkt_out;
555     struct active_service *as;
556     char *pktbody, *tok, *service, *arguments;
557     char *service_path = NULL;
558     int i;
559
560     pkt_out.body = NULL;
561
562     /*
563      * If handle is NULL, then the connection is closed.
564      */
565     if(handle == NULL) {
566         return;
567     }
568
569     /*
570      * If pkt is NULL, then there was a problem with the new connection.
571      */
572     if (pkt == NULL) {
573         dbprintf(("%s: accept error: %s\n",
574             debug_prefix_time(NULL), security_geterror(handle)));
575         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
576         do_sendpkt(handle, &pkt_out);
577         amfree(pkt_out.body);
578         security_close(handle);
579         return;
580     }
581
582     dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
583         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
584
585     /*
586      * If this is not a REQ packet, just forget about it.
587      */
588     if (pkt->type != P_REQ) {
589         dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
590             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
591         security_close(handle);
592         return;
593     }
594
595     pktbody = service = arguments = NULL;
596     as = NULL;
597
598     /*
599      * Parse out the service and arguments
600      */
601
602     pktbody = stralloc(pkt->body);
603
604     tok = strtok(pktbody, " ");
605     if (tok == NULL)
606         goto badreq;
607     if (strcmp(tok, "SERVICE") != 0)
608         goto badreq;
609
610     tok = strtok(NULL, " \n");
611     if (tok == NULL)
612         goto badreq;
613     service = stralloc(tok);
614
615     /* we call everything else 'arguments' */
616     tok = strtok(NULL, "");
617     if (tok == NULL)
618         goto badreq;
619     arguments = stralloc(tok);
620
621     /* see if it's one we allow */
622     for (i = 0; i < (int)NSERVICES; i++)
623         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
624             break;
625     if (i == (int)NSERVICES) {
626         dbprintf(("%s: %s: invalid service\n",
627             debug_prefix_time(NULL), service));
628         pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service, add '%s' as argument to amandad\n", service, service);
629         goto send_pkt_out;
630     }
631
632     service_path = vstralloc(libexecdir, "/", service, versionsuffix(), NULL);
633     if (access(service_path, X_OK) < 0) {
634         dbprintf(("%s: can't execute %s: %s\n",
635             debug_prefix_time(NULL), service_path, strerror(errno)));
636             pkt_init(&pkt_out, P_NAK,
637                      "ERROR execute access to \"%s\" denied\n",
638                      service_path);
639         goto send_pkt_out;
640     }
641
642     /* see if its already running */
643     for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
644         as = TAILQ_NEXT(as, tq)) {
645             if (strcmp(as->cmd, service_path) == 0 &&
646                 strcmp(as->arguments, arguments) == 0) {
647                     dbprintf(("%s: %s %s: already running, acking req\n",
648                         debug_prefix_time(NULL), service, arguments));
649                     pkt_init_empty(&pkt_out, P_ACK);
650                     goto send_pkt_out_no_delete;
651             }
652     }
653
654     /*
655      * create a new service instance, and send the arguments down
656      * the request pipe.
657      */
658     dbprintf(("%s: creating new service: %s\n%s\n",
659         debug_prefix_time(NULL), service, arguments));
660     as = service_new(handle, service_path, arguments);
661     if (writebuf(as, arguments, strlen(arguments)) < 0) {
662         const char *errmsg = strerror(errno);
663         dbprintf(("%s: error sending arguments to %s: %s\n",
664             debug_prefix_time(NULL), service, errmsg));
665         pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
666             service, errmsg);
667         goto send_pkt_out;
668     }
669     aclose(as->reqfd);
670
671     amfree(pktbody);
672     amfree(service);
673     amfree(service_path);
674     amfree(arguments);
675
676     /*
677      * Move to the sendack state, and start up the state
678      * machine.
679      */
680     as->state = s_sendack;
681     state_machine(as, A_START, NULL);
682     return;
683
684 badreq:
685     pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
686     dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
687         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
688
689 send_pkt_out:
690     if(as)
691         service_delete(as);
692 send_pkt_out_no_delete:
693     amfree(pktbody);
694     amfree(service_path);
695     amfree(service);
696     amfree(arguments);
697     do_sendpkt(handle, &pkt_out);
698     security_close(handle);
699     amfree(pkt_out.body);
700 }
701
702 /*
703  * Handles incoming protocol packets.  Routes responses to the proper
704  * running service.
705  */
706 static void
707 state_machine(
708     struct active_service *     as,
709     action_t                    action,
710     pkt_t *                     pkt)
711 {
712     action_t retaction;
713     state_t curstate;
714     pkt_t nak;
715
716     amandad_debug(1, ("%s: state_machine: %p entering\n",
717                       debug_prefix_time(NULL), as));
718     for (;;) {
719         curstate = as->state;
720         amandad_debug(1, ("%s: state_machine: %p curstate=%s action=%s\n",
721                           debug_prefix_time(NULL), as,
722                           state2str(curstate), action2str(action)));
723         retaction = (*curstate)(as, action, pkt);
724         amandad_debug(1, ("%s: state_machine: %p curstate=%s returned %s (nextstate=%s)\n",
725                           debug_prefix_time(NULL),
726                           as, state2str(curstate), action2str(retaction),
727                           state2str(as->state)));
728
729         switch (retaction) {
730         /*
731          * State has queued up and is now blocking on input.
732          */
733         case A_PENDING:
734             amandad_debug(1, ("%s: state_machine: %p leaving (A_PENDING)\n",
735                               debug_prefix_time(NULL), as));
736             return;
737
738         /*
739          * service has switched states.  Loop.
740          */
741         case A_CONTINUE:
742             break;
743
744         /*
745          * state has determined that the packet it received was bogus.
746          * Send a nak, and return.
747          */
748         case A_SENDNAK:
749             dbprintf(("%s: received unexpected %s packet\n",
750                 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
751             dbprintf(("<<<<<\n%s----\n\n", pkt->body));
752             pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
753                 pkt_type2str(pkt->type));
754             do_sendpkt(as->security_handle, &nak);
755             amfree(nak.body);
756             security_recvpkt(as->security_handle, protocol_recv, as, -1);
757             amandad_debug(1, ("%s: state_machine: %p leaving (A_SENDNAK)\n",
758                               debug_prefix_time(NULL), as));
759             return;
760
761         /*
762          * Service is done.  Remove it and finish.
763          */
764         case A_FINISH:
765             service_delete(as);
766             amandad_debug(1, ("%s: state_machine: %p leaving (A_FINISH)\n",
767                               debug_prefix_time(NULL), as));
768             return;
769
770         default:
771             assert(0);
772             break;
773         }
774     }
775     /*NOTREACHED*/
776 }
777
778 /*
779  * This state just sends an ack.  After that, we move to the repwait
780  * state to wait for REP data to arrive from the subprocess.
781  */
782 static action_t
783 s_sendack(
784     struct active_service *     as,
785     action_t                    action,
786     pkt_t *                     pkt)
787 {
788     pkt_t ack;
789
790     (void)action;       /* Quiet unused parameter warning */
791     (void)pkt;          /* Quiet unused parameter warning */
792
793     pkt_init_empty(&ack, P_ACK);
794     if (do_sendpkt(as->security_handle, &ack) < 0) {
795         dbprintf(("%s: error sending ACK: %s\n",
796             debug_prefix_time(NULL), security_geterror(as->security_handle)));
797         amfree(ack.body);
798         return (A_FINISH);
799     }
800     amfree(ack.body);
801
802     /*
803      * move to the repwait state
804      * Setup a listener for data on the reply fd, but also
805      * listen for packets over the wire, as the server may
806      * poll us if we take a long time.
807      * Setup a timeout that will fire if it takes too long to
808      * receive rep data.
809      */
810     as->state = s_repwait;
811     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
812     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
813         timeout_repfd, as);
814     security_recvpkt(as->security_handle, protocol_recv, as, -1);
815     return (A_PENDING);
816 }
817
818 /*
819  * This is the repwait state.  We have responded to the initial REQ with
820  * an ACK, and we are now waiting for the process we spawned to pass us 
821  * data to send in a REP.
822  */
823 static action_t
824 s_repwait(
825     struct active_service *     as,
826     action_t                    action,
827     pkt_t *                     pkt)
828 {
829     ssize_t n;
830     char *repbuf_temp;
831
832     /*
833      * We normally shouldn't receive any packets while waiting
834      * for our REP data, but in some cases we do.
835      */
836     if (action == A_RECVPKT) {
837         assert(pkt != NULL);
838         /*
839          * Another req for something that's running.  Just send an ACK
840          * and go back and wait for more data.
841          */
842         if (pkt->type == P_REQ) {
843             dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
844                 debug_prefix_time(NULL)));
845             amfree(as->rep_pkt.body);
846             pkt_init_empty(&as->rep_pkt, P_ACK);
847             do_sendpkt(as->security_handle, &as->rep_pkt);
848             security_recvpkt(as->security_handle, protocol_recv, as, -1);
849             return (A_PENDING);
850         }
851         /* something unexpected.  Nak it */
852         return (A_SENDNAK);
853     }
854
855     if (action == A_TIMEOUT) {
856         amfree(as->rep_pkt.body);
857         pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
858         dbprintf(("%s: %s timed out waiting for REP data\n",
859             debug_prefix_time(NULL), as->cmd));
860         do_sendpkt(as->security_handle, &as->rep_pkt);
861         return (A_FINISH);
862     }
863
864     assert(action == A_RECVREP);
865     if(as->bufsize == 0) {
866         as->bufsize = NETWORK_BLOCK_BYTES;
867         as->repbuf = alloc(as->bufsize);
868     }
869
870     do {
871         n = read(as->repfd, as->repbuf + as->repbufsize,
872                  as->bufsize - as->repbufsize - 1);
873     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
874     if (n < 0) {
875         const char *errstr = strerror(errno);
876         dbprintf(("%s: read error on reply pipe: %s\n",
877                   debug_prefix_time(NULL), errstr));
878         amfree(as->rep_pkt.body);
879         pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
880                  errstr);
881         do_sendpkt(as->security_handle, &as->rep_pkt);
882         return (A_FINISH);
883     }
884     /*
885      * If we got some data, go back and wait for more, or EOF.  Nul terminate
886      * the buffer first.
887      */
888     as->repbuf[n + as->repbufsize] = '\0';
889     if (n > 0) {
890         as->repbufsize += n;
891         if(as->repbufsize >= (as->bufsize - 1)) {
892             as->bufsize *= 2;
893             repbuf_temp = alloc(as->bufsize);
894             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
895             amfree(as->repbuf);
896             as->repbuf = repbuf_temp;
897         }
898         else if(as->send_partial_reply) {
899             amfree(as->rep_pkt.body);
900             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
901             do_sendpkt(as->security_handle, &as->rep_pkt);
902             amfree(as->rep_pkt.body);
903             pkt_init_empty(&as->rep_pkt, P_REP);
904         }
905  
906         return (A_PENDING);
907     }
908
909     /*
910      * If we got 0, then we hit EOF.  Process the data and release
911      * the timeout.
912      */
913     assert(n == 0);
914
915     assert(as->ev_repfd != NULL);
916     event_release(as->ev_repfd);
917     as->ev_repfd = NULL;
918
919     assert(as->ev_reptimeout != NULL);
920     event_release(as->ev_reptimeout);
921     as->ev_reptimeout = NULL;
922
923     as->state = s_processrep;
924     aclose(as->repfd);
925     return (A_CONTINUE);
926 }
927
928 /*
929  * After we have read in all of the rep data, we process it and send
930  * it out as a REP packet.
931  */
932 static action_t
933 s_processrep(
934     struct active_service *     as,
935     action_t                    action,
936     pkt_t *                     pkt)
937 {
938     char *tok, *repbuf;
939
940     (void)action;       /* Quiet unused parameter warning */
941     (void)pkt;          /* Quiet unused parameter warning */
942
943     /*
944      * Copy the rep lines into the outgoing packet.
945      *
946      * If this line is a CONNECT, translate it
947      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
948      * Example:
949      *
950      *  CONNECT DATA 4 MESG 5 INDEX 6
951      *
952      * The tags are arbitrary.  The handles are in the DATA_FD pool.
953      * We need to map these to security streams and pass them back
954      * to the amanda server.  If the handle is -1, then we don't map.
955      */
956     repbuf = stralloc(as->repbuf);
957     amfree(as->rep_pkt.body);
958     pkt_init_empty(&as->rep_pkt, P_REP);
959     tok = strtok(repbuf, " ");
960     if (tok == NULL)
961         goto error;
962     if (strcmp(tok, "CONNECT") == 0) {
963         char *line, *nextbuf;
964
965         /* Save the entire line */
966         line = strtok(NULL, "\n");
967         /* Save the buf following the line */
968         nextbuf = strtok(NULL, "");
969
970         if (line == NULL || nextbuf == NULL)
971             goto error;
972
973         pkt_cat(&as->rep_pkt, "CONNECT");
974
975         /* loop over the id/handle pairs */
976         for (;;) {
977             /* id */
978             tok = strtok(line, " ");
979             line = NULL;        /* keep working from line */
980             if (tok == NULL)
981                 break;
982             pkt_cat(&as->rep_pkt, " %s", tok);
983
984             /* handle */
985             tok = strtok(NULL, " \n");
986             if (tok == NULL)
987                 goto error;
988             /* convert the handle into something the server can process */
989             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
990         }
991         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
992     } else {
993 error:
994         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
995     }
996
997     /*
998      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
999      * state.
1000      */
1001     as->state = s_sendrep;
1002     as->repretry = getconf_int(CNF_REP_TRIES);
1003     amfree(repbuf);
1004     return (A_CONTINUE);
1005 }
1006
1007 /*
1008  * This is the state where we send the REP we just collected from our child.
1009  */
1010 static action_t
1011 s_sendrep(
1012     struct active_service *     as,
1013     action_t                    action,
1014     pkt_t *                     pkt)
1015 {
1016     (void)action;       /* Quiet unused parameter warning */
1017     (void)pkt;          /* Quiet unused parameter warning */
1018
1019     /*
1020      * Transmit it and move to the ack state.
1021      */
1022     do_sendpkt(as->security_handle, &as->rep_pkt);
1023     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1024     as->state = s_ackwait;
1025     return (A_PENDING);
1026 }
1027
1028 /*
1029  * This is the state in which we wait for the server to ACK the REP
1030  * we just sent it.
1031  */
1032 static action_t
1033 s_ackwait(
1034     struct active_service *     as,
1035     action_t                    action,
1036     pkt_t *                     pkt)
1037 {
1038     struct datafd_handle *dh;
1039     int npipes;
1040
1041     /*
1042      * If we got a timeout, try again, but eventually give up.
1043      */
1044     if (action == A_TIMEOUT) {
1045         if (--as->repretry > 0) {
1046             as->state = s_sendrep;
1047             return (A_CONTINUE);
1048         }
1049         dbprintf(("%s: timeout waiting for ACK for our REP\n",
1050             debug_prefix_time(NULL)));
1051         return (A_FINISH);
1052     }
1053     amandad_debug(1, ("%s: received ACK, now opening streams\n",
1054                       debug_prefix_time(NULL)));
1055
1056     assert(action == A_RECVPKT);
1057
1058     if (pkt->type == P_REQ) {
1059         dbprintf(("%s: received dup P_REQ packet, resending REP\n",
1060                   debug_prefix_time(NULL)));
1061         as->state = s_sendrep;
1062         return (A_CONTINUE);
1063     }
1064
1065     if (pkt->type != P_ACK)
1066         return (A_SENDNAK);
1067
1068     /*
1069      * Got the ack, now open the pipes
1070      */
1071     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1072         if (dh->netfd == NULL)
1073             continue;
1074         if (security_stream_accept(dh->netfd) < 0) {
1075             dbprintf(("%s: stream %td accept failed: %s\n",
1076                 debug_prefix_time(NULL),
1077                 dh - &as->data[0], security_geterror(as->security_handle)));
1078             security_stream_close(dh->netfd);
1079             dh->netfd = NULL;
1080             continue;
1081         }
1082         /* setup an event for reads from it */
1083         dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1084                                      process_readnetfd, dh);
1085
1086         security_stream_read(dh->netfd, process_writenetfd, dh);
1087
1088     }
1089
1090     /*
1091      * Pipes are open, so auth them.  Count them at the same time.
1092      */
1093     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1094         if (dh->netfd == NULL)
1095             continue;
1096         if (security_stream_auth(dh->netfd) < 0) {
1097             security_stream_close(dh->netfd);
1098             dh->netfd = NULL;
1099             event_release(dh->ev_read);
1100             event_release(dh->ev_write);
1101             dh->ev_read = NULL;
1102             dh->ev_write = NULL;
1103         } else {
1104             npipes++;
1105         }
1106     }
1107
1108     /*
1109      * If no pipes are open, then we're done.  Otherwise, just start running.
1110      * The event handlers on all of the pipes will take it from here.
1111      */
1112     amandad_debug(1, ("%s: at end of s_ackwait, npipes is %d\n",
1113                       debug_prefix_time(NULL), npipes));
1114     if (npipes == 0)
1115         return (A_FINISH);
1116     else {
1117         security_close(as->security_handle);
1118         as->security_handle = NULL;
1119         return (A_PENDING);
1120     }
1121 }
1122
1123 /*
1124  * Called when a repfd has received data
1125  */
1126 static void
1127 repfd_recv(
1128     void *      cookie)
1129 {
1130     struct active_service *as = cookie;
1131
1132     assert(as != NULL);
1133     assert(as->ev_repfd != NULL);
1134
1135     state_machine(as, A_RECVREP, NULL);
1136 }
1137
1138 /*
1139  * Called when a repfd has timed out
1140  */
1141 static void
1142 timeout_repfd(
1143     void *      cookie)
1144 {
1145     struct active_service *as = cookie;
1146
1147     assert(as != NULL);
1148     assert(as->ev_reptimeout != NULL);
1149
1150     state_machine(as, A_TIMEOUT, NULL);
1151 }
1152
1153 /*
1154  * Called when a handle has received data
1155  */
1156 static void
1157 protocol_recv(
1158     void *              cookie,
1159     pkt_t *             pkt,
1160     security_status_t   status)
1161 {
1162     struct active_service *as = cookie;
1163
1164     assert(as != NULL);
1165
1166     switch (status) {
1167     case S_OK:
1168         dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
1169             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1170         state_machine(as, A_RECVPKT, pkt);
1171         break;
1172     case S_TIMEOUT:
1173         dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
1174         state_machine(as, A_TIMEOUT, NULL);
1175         break;
1176     case S_ERROR:
1177         dbprintf(("%s: receive error: %s\n",
1178             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1179         break;
1180     }
1181 }
1182
1183 /*
1184  * This is a generic relay function that just reads data from one of
1185  * the process's pipes and passes it up the equivalent security_stream_t
1186  */
1187 static void
1188 process_readnetfd(
1189     void *      cookie)
1190 {
1191     pkt_t nak;
1192     struct datafd_handle *dh = cookie;
1193     struct active_service *as = dh->as;
1194     ssize_t n;
1195
1196     nak.body = NULL;
1197
1198     do {
1199         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1200     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1201
1202     /*
1203      * Process has died.
1204      */
1205     if (n < 0) {
1206         pkt_init(&nak, P_NAK, "A ERROR data descriptor %d broken: %s\n",
1207             dh->fd_read, strerror(errno));
1208         goto sendnak;
1209     }
1210     /*
1211      * Process has closed the pipe.  Just remove this event handler.
1212      * If all pipes are closed, shut down this service.
1213      */
1214     if (n == 0) {
1215         event_release(dh->ev_read);
1216         dh->ev_read = NULL;
1217         if(dh->ev_write == NULL) {
1218             security_stream_close(dh->netfd);
1219             dh->netfd = NULL;
1220         }
1221         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1222             if (dh->netfd != NULL)
1223                 return;
1224         }
1225         service_delete(as);
1226         return;
1227     }
1228     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1229         /* stream has croaked */
1230         pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1231             security_stream_id(dh->netfd),
1232             security_stream_geterror(dh->netfd));
1233         goto sendnak;
1234     }
1235     return;
1236
1237 sendnak:
1238     do_sendpkt(as->security_handle, &nak);
1239     service_delete(as);
1240     amfree(nak.body);
1241 }
1242
1243 /*
1244  * This is a generic relay function that just read data from one of
1245  * the security_stream_t and passes it up the equivalent process's pipes
1246  */
1247 static void
1248 process_writenetfd(
1249     void *      cookie,
1250     void *      buf,
1251     ssize_t     size)
1252 {
1253     struct datafd_handle *dh;
1254
1255     assert(cookie != NULL);
1256     dh = cookie;
1257
1258     if (dh->fd_write <= 0) {
1259         dbprintf(("%s: process_writenetfd: dh->fd_write <= 0\n",
1260             debug_prefix_time(NULL)));
1261     } else if (size > 0) {
1262         fullwrite(dh->fd_write, buf, (size_t)size);
1263         security_stream_read(dh->netfd, process_writenetfd, dh);
1264     }
1265     else {
1266         aclose(dh->fd_write);
1267     }
1268 }
1269
1270
1271 /*
1272  * Convert a local stream handle (DATA_FD...) into something that
1273  * can be sent to the amanda server.
1274  *
1275  * Returns a number that should be sent to the server in the REP packet.
1276  */
1277 static int
1278 allocstream(
1279     struct active_service *     as,
1280     int                         handle)
1281 {
1282     struct datafd_handle *dh;
1283
1284     /* if the handle is -1, then we don't bother */
1285     if (handle < 0)
1286         return (-1);
1287
1288     /* make sure the handle's kosher */
1289     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1290         return (-1);
1291
1292     /* get a pointer into our handle array */
1293     dh = &as->data[handle - DATA_FD_OFFSET];
1294
1295     /* make sure we're not already using the net handle */
1296     if (dh->netfd != NULL)
1297         return (-1);
1298
1299     /* allocate a stream from the security layer and return */
1300     dh->netfd = security_stream_server(as->security_handle);
1301     if (dh->netfd == NULL) {
1302         dbprintf(("%s: couldn't open stream to server: %s\n",
1303             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1304         return (-1);
1305     }
1306
1307     /*
1308      * convert the stream into a numeric id that can be sent to the
1309      * remote end.
1310      */
1311     return (security_stream_id(dh->netfd));
1312 }
1313
1314 /*
1315  * Create a new service instance
1316  */
1317 static struct active_service *
1318 service_new(
1319     security_handle_t * security_handle,
1320     const char *        cmd,
1321     const char *        arguments)
1322 {
1323     int i;
1324     int data_read[DATA_FD_COUNT + 1][2];
1325     int data_write[DATA_FD_COUNT + 1][2];
1326     struct active_service *as;
1327     pid_t pid;
1328     int newfd;
1329
1330     assert(security_handle != NULL);
1331     assert(cmd != NULL);
1332     assert(arguments != NULL);
1333
1334     /* a plethora of pipes */
1335     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1336         if (pipe(data_read[i]) < 0) {
1337             error("pipe: %s\n", strerror(errno));
1338             /*NOTREACHED*/
1339         }
1340         if (pipe(data_write[i]) < 0) {
1341             error("pipe: %s\n", strerror(errno));
1342             /*NOTREACHED*/
1343         }
1344     }
1345
1346     switch(pid = fork()) {
1347     case -1:
1348         error("could not fork service %s: %s\n", cmd, strerror(errno));
1349         /*NOTREACHED*/
1350     default:
1351         /*
1352          * The parent.  Close the far ends of our pipes and return.
1353          */
1354         as = alloc(SIZEOF(*as));
1355         as->cmd = stralloc(cmd);
1356         as->arguments = stralloc(arguments);
1357         as->security_handle = security_handle;
1358         as->state = NULL;
1359         as->pid = pid;
1360         as->send_partial_reply = 0;
1361         if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1362             g_option_t *g_options;
1363             char *option_str, *p;
1364
1365             option_str = stralloc(as->arguments+8);
1366             p = strchr(option_str,'\n');
1367             if(p) *p = '\0';
1368
1369             g_options = parse_g_options(option_str, 1);
1370             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1371                 as->send_partial_reply = 1;
1372             }
1373             free_g_options(g_options);
1374             amfree(option_str);
1375         }
1376
1377         /* write to the request pipe */
1378         aclose(data_read[0][0]);
1379         as->reqfd = data_read[0][1];
1380
1381         /*
1382          * read from the reply pipe
1383          */
1384         as->repfd = data_write[0][0];
1385         aclose(data_write[0][1]);
1386         as->ev_repfd = NULL;
1387         as->repbuf = NULL;
1388         as->repbufsize = 0;
1389         as->bufsize = 0;
1390         as->repretry = 0;
1391         as->rep_pkt.body = NULL;
1392
1393         /*
1394          * read from the rest of the general-use pipes
1395          * (netfds are opened as the client requests them)
1396          */
1397         for (i = 0; i < DATA_FD_COUNT; i++) {
1398             aclose(data_read[i + 1][1]);
1399             aclose(data_write[i + 1][0]);
1400             as->data[i].fd_read = data_read[i + 1][0];
1401             as->data[i].fd_write = data_write[i + 1][1];
1402             as->data[i].ev_read = NULL;
1403             as->data[i].ev_write = NULL;
1404             as->data[i].netfd = NULL;
1405             as->data[i].as = as;
1406         }
1407
1408         /* add it to the service queue */
1409         /* increment the active service count */
1410         TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1411         serviceq.qlength++;
1412
1413         return (as);
1414     case 0:
1415         /*
1416          * The child.  Put our pipes in their advertised locations
1417          * and start up.
1418          */
1419 #ifdef FORCE_USERID
1420         seteuid((uid_t)0);
1421         setuid(client_uid);
1422 #endif
1423
1424         /*
1425          * The data stream is stdin in the new process
1426          */
1427         if (dup2(data_read[0][0], 0) < 0) {
1428             error("dup %d to %d failed: %s\n", data_read[0][0], 0,
1429                 strerror(errno));
1430             /*NOTREACHED*/
1431         }
1432         aclose(data_read[0][0]);
1433         aclose(data_read[0][1]);
1434
1435         /*
1436          * The reply stream is stdout
1437          */
1438         if (dup2(data_write[0][1], 1) < 0) {
1439             error("dup %d to %d failed: %s\n", data_write[0][1], 1,
1440                 strerror(errno));
1441         }
1442         aclose(data_write[0][0]);
1443         aclose(data_write[0][1]);
1444
1445         for (i = 0; i < DATA_FD_COUNT; i++) {
1446             aclose(data_read[i + 1][0]);
1447             aclose(data_write[i + 1][1]);
1448         }
1449
1450         /*
1451          *  Make sure they are not open in the range DATA_FD_OFFSET to
1452          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1453          */
1454         for (i = 0; i < DATA_FD_COUNT; i++) {
1455             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1456                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1457                 newfd = dup(data_read[i + 1][1]);
1458                 if(newfd == -1)
1459                     error("Can't dup out off DATA_FD range");
1460                 data_read[i + 1][1] = newfd;
1461             }
1462             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1463                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1464                 newfd = dup(data_write[i + 1][0]);
1465                 if(newfd == -1)
1466                     error("Can't dup out off DATA_FD range");
1467                 data_write[i + 1][0] = newfd;
1468             }
1469         }
1470         for (i = 0; i < DATA_FD_COUNT*2; i++)
1471             close(DATA_FD_OFFSET + i);
1472
1473         /*
1474          * The rest start at the offset defined in amandad.h, and continue
1475          * through the internal defined.
1476          */
1477         for (i = 0; i < DATA_FD_COUNT; i++) {
1478             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1479                 error("dup %d to %d failed: %s\n", data_read[i + 1][1],
1480                     i + DATA_FD_OFFSET, strerror(errno));
1481             }
1482             aclose(data_read[i + 1][1]);
1483
1484             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1485                 error("dup %d to %d failed: %s\n", data_write[i + 1][0],
1486                     i + DATA_FD_OFFSET, strerror(errno));
1487             }
1488             aclose(data_write[i + 1][0]);
1489         }
1490
1491         /* close all unneeded fd */
1492         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1493         close(2);
1494
1495         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1496         error("could not exec service %s: %s\n", cmd, strerror(errno));
1497         /*NOTREACHED*/
1498     }
1499     return NULL;
1500 }
1501
1502 /*
1503  * Unallocate a service instance
1504  */
1505 static void
1506 service_delete(
1507     struct active_service *     as)
1508 {
1509     int i;
1510     struct datafd_handle *dh;
1511
1512     amandad_debug(1, ("%s: closing service: %s\n",
1513                       debug_prefix_time(NULL),
1514                       (as->cmd)?as->cmd:"??UNKONWN??"));
1515
1516     assert(as != NULL);
1517
1518     assert(as->cmd != NULL);
1519     amfree(as->cmd);
1520
1521     assert(as->arguments != NULL);
1522     amfree(as->arguments);
1523
1524     if (as->reqfd != -1)
1525         aclose(as->reqfd);
1526     if (as->repfd != -1)
1527         aclose(as->repfd);
1528
1529     if (as->ev_repfd != NULL)
1530         event_release(as->ev_repfd);
1531     if (as->ev_reptimeout != NULL)
1532         event_release(as->ev_reptimeout);
1533
1534     for (i = 0; i < DATA_FD_COUNT; i++) {
1535         dh = &as->data[i];
1536
1537         aclose(dh->fd_read);
1538         aclose(dh->fd_write);
1539
1540         if (dh->netfd != NULL)
1541             security_stream_close(dh->netfd);
1542
1543         if (dh->ev_read != NULL)
1544             event_release(dh->ev_read);
1545         if (dh->ev_write != NULL)
1546             event_release(dh->ev_write);
1547     }
1548
1549     if (as->security_handle != NULL)
1550         security_close(as->security_handle);
1551
1552     assert(as->pid > 0);
1553     kill(as->pid, SIGTERM);
1554     waitpid(as->pid, NULL, WNOHANG);
1555
1556     TAILQ_REMOVE(&serviceq.tailq, as, tq);
1557     assert(serviceq.qlength > 0);
1558     serviceq.qlength--;
1559
1560     amfree(as->cmd);
1561     amfree(as->arguments);
1562     amfree(as->repbuf);
1563     amfree(as->rep_pkt.body);
1564     amfree(as);
1565
1566     if(exit_on_qlength == 0 && serviceq.qlength == 0) {
1567         dbclose();
1568         exit(0);
1569     }
1570 }
1571
1572 /*
1573  * Like 'fullwrite', but does the work in a child process so pipelines
1574  * do not hang.
1575  */
1576 static int
1577 writebuf(
1578     struct active_service *     as,
1579     const void *                bufp,
1580     size_t                      size)
1581 {
1582     pid_t pid;
1583     ssize_t    writesize;
1584
1585     switch (pid=fork()) {
1586     case -1:
1587         break;
1588
1589     default:
1590         waitpid(pid, NULL, WNOHANG);
1591         return 0;                       /* this is the parent */
1592
1593     case 0:                             /* this is the child */
1594         close(as->repfd);
1595         writesize = fullwrite(as->reqfd, bufp, size);
1596         exit(writesize != (ssize_t)size);
1597         /* NOTREACHED */
1598     }
1599     return -1;
1600 }
1601
1602 static ssize_t
1603 do_sendpkt(
1604     security_handle_t * handle,
1605     pkt_t *             pkt)
1606 {
1607     dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1608         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1609     if (handle)
1610         return security_sendpkt(handle, pkt);
1611     else
1612         return 1;
1613 }
1614
1615 /*
1616  * Convert a state into a string
1617  */
1618 static const char *
1619 state2str(
1620     state_t     state)
1621 {
1622     static const struct {
1623         state_t state;
1624         const char str[13];
1625     } states[] = {
1626 #define X(state)        { state, stringize(state) }
1627         X(s_sendack),
1628         X(s_repwait),
1629         X(s_processrep),
1630         X(s_sendrep),
1631         X(s_ackwait),
1632 #undef X
1633     };
1634     int i;
1635
1636     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1637         if (state == states[i].state)
1638             return (states[i].str);
1639     return ("INVALID STATE");
1640 }
1641
1642 /*
1643  * Convert an action into a string
1644  */
1645 static const char *
1646 action2str(
1647     action_t    action)
1648 {
1649     static const struct {
1650         action_t action;
1651         const char str[12];
1652     } actions[] = {
1653 #define X(action)       { action, stringize(action) }
1654         X(A_START),
1655         X(A_RECVPKT),
1656         X(A_RECVREP),
1657         X(A_PENDING),
1658         X(A_FINISH),
1659         X(A_CONTINUE),
1660         X(A_SENDNAK),
1661         X(A_TIMEOUT),
1662 #undef X
1663     };
1664     int i;
1665
1666     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1667         if (action == actions[i].action)
1668             return (actions[i].str);
1669     return ("UNKNOWN ACTION");
1670 }