9090c3085aa6bc8b894833ca3c46ff1d5516002f
[debian/amanda] / client-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.62.2.2 2006/05/08 11:50:16 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 /*#define       AMANDAD_DEBUG*/
36
37 #include "amanda.h"
38 #include "amandad.h"
39 #include "clock.h"
40 #include "event.h"
41 #include "amfeatures.h"
42 #include "packet.h"
43 #include "version.h"
44 #include "queue.h"
45 #include "security.h"
46 #include "stream.h"
47 #include "util.h"
48 #include "client_util.h"
49
50 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
51 #define ACK_TIMEOUT     10              /* XXX should be configurable */
52 #define MAX_REP_RETRIES 5
53
54 /*
55  * These are the actions for entering the state machine
56  */
57 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
58     A_SENDNAK, A_TIMEOUT } action_t;
59
60 /*
61  * This is a state in the state machine.  It is a function pointer to
62  * the function that actually implements the state.
63  */
64 struct active_service;
65 typedef action_t (*state_t) P((struct active_service *, action_t, pkt_t *));
66
67 /*
68  * This structure describes an active running service.
69  *
70  * An active service is something running that we have received
71  * a request for.  This structure holds info on that service, including
72  * file descriptors for data, etc, as well as the security handle
73  * for communications with the amanda server.
74  */
75 struct active_service {
76     char *cmd;                          /* name of command we ran */
77     char *arguments;                    /* arguments we sent it */
78     security_handle_t *security_handle; /* remote server */
79     state_t state;                      /* how far this has progressed */
80     pid_t pid;                          /* pid of subprocess */
81     int send_partial_reply;             /* send PREP packet */
82     int reqfd;                          /* pipe to write requests */
83     int repfd;                          /* pipe to read replies */
84     event_handle_t *ev_repfd;           /* read event handle for repfd */
85     event_handle_t *ev_reptimeout;      /* timeout for rep data */
86     pkt_t rep_pkt;                      /* rep packet we're sending out */
87     char repbuf[MAX_PACKET];            /* buffer to read the rep into */
88     int repbufsize;                     /* length of repbuf */
89     int repretry;                       /* times we'll retry sending the rep */
90     /*
91      * General user streams to the process, and their equivalent
92      * network streams.
93      */
94     struct datafd_handle {
95         int fd;                         /* pipe to child process */
96         event_handle_t *ev_handle;      /* it's read event handle */
97         security_stream_t *netfd;       /* stream to amanda server */
98         struct active_service *as;      /* pointer back to our enclosure */
99     } data[DATA_FD_COUNT];
100     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
101     TAILQ_ENTRY(active_service) tq;     /* queue handle */
102 };
103
104 /* 
105  * Here are the services that we allow.
106  */
107 static const char *services[] = {
108     "noop",
109     "sendsize",
110     "sendbackup",
111     "selfcheck",
112 };
113 #define NSERVICES       (sizeof(services) / sizeof(services[0]))
114
115 /*
116  * Queue of outstanding requests that we are running.
117  */
118 static struct {
119     TAILQ_HEAD(, active_service) tailq;
120     int qlength;
121 } serviceq = {
122     TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
123 };
124
125 /*
126  * Data for dbmalloc to check for memory leaks
127  */
128 #ifdef USE_DBMALLOC
129 static struct {
130     struct {
131         unsigned long size, hist;
132     } start, end;
133 } dbmalloc_info;
134 #endif
135
136 int ack_timeout     = ACK_TIMEOUT;
137
138 int main P((int argc, char **argv));
139
140 static int allocstream P((struct active_service *, int));
141 static void exit_check P((void *));
142 static void protocol_accept P((security_handle_t *, pkt_t *));
143 static void state_machine P((struct active_service *, action_t, pkt_t *));
144
145 static action_t s_sendack P((struct active_service *, action_t, pkt_t *));
146 static action_t s_repwait P((struct active_service *, action_t, pkt_t *));
147 static action_t s_processrep P((struct active_service *, action_t, pkt_t *));
148 static action_t s_sendrep P((struct active_service *, action_t, pkt_t *));
149 static action_t s_ackwait P((struct active_service *, action_t, pkt_t *));
150
151 static void repfd_recv P((void *));
152 static void timeout_repfd P((void *));
153 static void protocol_recv P((void *, pkt_t *, security_status_t));
154 static void process_netfd P((void *));
155 static struct active_service *service_new P((security_handle_t *,
156     const char *, const char *));
157 static void service_delete P((struct active_service *));
158 static int writebuf P((struct active_service *, const void *, size_t));
159 static int do_sendpkt P((security_handle_t *handle, pkt_t *pkt));
160
161 #ifdef AMANDAD_DEBUG
162 static const char *state2str P((state_t));
163 static const char *action2str P((action_t));
164 #endif
165
166 int
167 main(argc, argv)
168     int argc;
169     char **argv;
170 {
171     int i, in, out;
172     const security_driver_t *secdrv;
173     int no_exit = 0;
174     char *pgm = "amandad";              /* in case argv[0] is not set */
175
176     safe_fd(-1, 0);
177     safe_cd();
178
179     if(argv == NULL) {
180         error("argv == NULL\n");
181     }
182
183     /*
184      * When called via inetd, it is not uncommon to forget to put the
185      * argv[0] value on the config line.  On some systems (e.g. Solaris)
186      * this causes argv and/or argv[0] to be NULL, so we have to be
187      * careful getting our name.
188      */
189     if (argc >= 1 && argv != NULL && argv[0] != NULL) {
190         if((pgm = strrchr(argv[0], '/')) != NULL) {
191             pgm++;
192         } else {
193             pgm = argv[0];
194         }
195     }
196
197     set_pname(pgm);
198
199     /* Don't die when child closes pipe */
200     signal(SIGPIPE, SIG_IGN);
201
202 #ifdef USE_DBMALLOC
203     dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
204 #endif
205
206     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
207
208 #ifdef FORCE_USERID
209     /* we'd rather not run as root */
210     if (geteuid() == 0) {
211         if(client_uid == (uid_t) -1) {
212             error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
213         }
214         initgroups(CLIENT_LOGIN, client_gid);
215         setgid(client_gid);
216         setegid(client_gid);
217         seteuid(client_uid);
218     }
219 #endif  /* FORCE_USERID */
220
221     /*
222      * ad-hoc argument parsing
223      *
224      * We accept        -auth=[authentication type]
225      *                  -no-exit
226 #ifdef AMANDAD_DEBUG
227      *                  -tcp=[port]
228      *                  -udp=[port]
229 #endif
230      */
231     secdrv = NULL;
232     in = 0; out = 1;            /* default to stdin/stdout */
233     for (i = 1; i < argc; i++) {
234         /*
235          * accept -krb4 as an alias for -auth=krb4 (for compatibility)
236          */
237         if (strcmp(argv[i], "-krb4") == 0) {
238             argv[i] = "-auth=krb4";
239             /* FALLTHROUGH */
240         }
241
242         /*
243          * Get a driver for a security type specified after -auth=
244          */
245         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
246             argv[i] += strlen("-auth=");
247             secdrv = security_getdriver(argv[i]);
248             if (secdrv == NULL)
249                 error("no driver for security type '%s'", argv[i]);
250             continue;
251         }
252
253         /*
254          * If -no-exit is specified, always run even after requests have
255          * been satisfied.
256          */
257         if (strcmp(argv[i], "-no-exit") == 0) {
258             no_exit = 1;
259             continue;
260         }
261
262 #ifdef AMANDAD_DEBUG
263         /*
264          * Allow us to directly bind to a udp port for debugging.
265          * This may only apply to some security types.
266          */
267         if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
268             struct sockaddr_in sin;
269
270             argv[i] += strlen("-udp=");
271             in = out = socket(AF_INET, SOCK_DGRAM, 0);
272             if (in < 0)
273                 error("can't create dgram socket: %s\n", strerror(errno));
274             sin.sin_family = AF_INET;
275             sin.sin_addr.s_addr = INADDR_ANY;
276             sin.sin_port = htons(atoi(argv[i]));
277             if (bind(in, (struct sockaddr *)&sin, sizeof(sin)) < 0)
278                 error("can't bind to port %d: %s\n", atoi(argv[i]),
279                     strerror(errno));
280         }
281         /*
282          * Ditto for tcp ports.
283          */
284         if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
285             struct sockaddr_in sin;
286             int sock, n;
287
288             argv[i] += strlen("-tcp=");
289             sock = socket(AF_INET, SOCK_STREAM, 0);
290             if (sock < 0)
291                 error("can't create tcp socket: %s\n", strerror(errno));
292             n = 1;
293             setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&n, sizeof(n));
294             sin.sin_family = AF_INET;
295             sin.sin_addr.s_addr = INADDR_ANY;
296             sin.sin_port = htons(atoi(argv[i]));
297             if (bind(sock, (struct sockaddr *)&sin, sizeof(sin)) < 0)
298                 error("can't bind to port %d: %s\n", atoi(argv[i]),
299                     strerror(errno));
300             listen(sock, 10);
301             n = sizeof(sin);
302             in = out = accept(sock, (struct sockaddr *)&sin, &n);
303         }
304 #endif
305     }
306
307     /*
308      * If no security type specified, use BSD
309      */
310     if (secdrv == NULL) {
311         secdrv = security_getdriver("BSD");
312         if (secdrv == NULL)
313             error("no driver for default security type 'BSD'");
314     }
315
316     /* initialize */
317
318     dbopen();
319     {
320         /* this lameness is for error() */
321         int db_fd = dbfd();
322         if(db_fd != -1) {
323             dup2(db_fd, 2);
324         }
325     }
326
327     startclock();
328
329     dbprintf(("%s: version %s\n", get_pname(), version()));
330     for (i = 0; version_info[i] != NULL; i++) {
331         dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
332     }
333
334     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
335         dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
336                   debug_prefix(NULL)));
337     }
338
339     /*
340      * Schedule to call protocol_accept() when new security handles
341      * are created on stdin.
342      */
343     security_accept(secdrv, in, out, protocol_accept);
344
345     /*
346      * Schedule an event that will try to exit every 30 seconds if there
347      * are no requests outstanding.
348      */
349     (void)event_register(30, EV_TIME, exit_check, &no_exit);
350
351     /*
352      * Call event_loop() with an arg of 0, telling it to block until all
353      * events are completed.
354      */
355     event_loop(0);
356
357     /* NOTREACHED */
358     exit(1);    /* appease gcc/lint */
359 }
360
361 /*
362  * This runs periodically and checks to see if we have any active services
363  * still running.  If we don't, then we quit.
364  */
365 static void
366 exit_check(cookie)
367     void *cookie;
368 {
369     int no_exit;
370
371     assert(cookie != NULL);
372     no_exit = *(int *)cookie;
373
374     /*
375      * If things are still running, then don't exit.
376      */
377     if (serviceq.qlength > 0)
378         return;
379
380     /*
381      * If the caller asked us to never exit, then we're done
382      */
383     if (no_exit)
384         return;
385
386 #ifdef USE_DBMALLOC
387     dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
388
389     if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
390         malloc_list(dbfd(), dbmalloc_info.start.hist,
391             dbmalloc_info.end.hist);
392     }
393 #endif
394
395     dbclose();
396     exit(0);
397 }
398
399 /*
400  * Handles new incoming protocol handles.  This is a callback for
401  * security_accept(), which gets called when new handles are detected.
402  */
403 static void
404 protocol_accept(handle, pkt)
405     security_handle_t *handle;
406     pkt_t *pkt;
407 {
408     pkt_t pkt_out;
409     struct active_service *as;
410     char *pktbody, *tok, *service, *arguments;
411     int i;
412
413     /*
414      * If pkt is NULL, then there was a problem with the new connection.
415      */
416     if (pkt == NULL) {
417         dbprintf(("%s: accept error: %s\n",
418             debug_prefix_time(NULL), security_geterror(handle)));
419         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
420         do_sendpkt(handle, &pkt_out);
421         security_close(handle);
422         return;
423     }
424
425     dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
426         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
427
428     /*
429      * If this is not a REQ packet, just forget about it.
430      */
431     if (pkt->type != P_REQ) {
432         dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
433             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
434         security_close(handle);
435         return;
436     }
437
438     pktbody = service = arguments = NULL;
439     as = NULL;
440
441     /*
442      * Parse out the service and arguments
443      */
444
445     pktbody = stralloc(pkt->body);
446
447     tok = strtok(pktbody, " ");
448     if (tok == NULL)
449         goto badreq;
450     if (strcmp(tok, "SERVICE") != 0)
451         goto badreq;
452
453     tok = strtok(NULL, " \n");
454     if (tok == NULL)
455         goto badreq;
456     service = stralloc(tok);
457
458     /* we call everything else 'arguments' */
459     tok = strtok(NULL, "");
460     if (tok == NULL)
461         goto badreq;
462     arguments = stralloc(tok);
463
464     /* see if it's one we allow */
465     for (i = 0; i < NSERVICES; i++)
466         if (strcmp(services[i], service) == 0)
467             break;
468     if (i == NSERVICES) {
469         dbprintf(("%s: %s: invalid service\n",
470             debug_prefix_time(NULL), service));
471         pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
472         goto send_pkt_out;
473     }
474
475     service = newvstralloc(service,
476                        libexecdir, "/", service, versionsuffix(),
477                        NULL);
478     if (access(service, X_OK) < 0) {
479         dbprintf(("%s: can't execute %s: %s\n",
480             debug_prefix_time(NULL), service, strerror(errno)));
481             pkt_init(&pkt_out, P_NAK, "ERROR execute access to \"%s\" denied\n",
482             service);
483         goto send_pkt_out;
484     }
485
486     /* see if its already running */
487     for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
488         as = TAILQ_NEXT(as, tq)) {
489             if (strcmp(as->cmd, service) == 0 &&
490                 strcmp(as->arguments, arguments) == 0) {
491                     dbprintf(("%s: %s %s: already running, acking req\n",
492                         debug_prefix_time(NULL), service, arguments));
493                     pkt_init(&pkt_out, P_ACK, "");
494                     goto send_pkt_out;
495             }
496     }
497
498     /*
499      * create a new service instance, and send the arguments down
500      * the request pipe.
501      */
502     dbprintf(("%s: creating new service: %s\n%s\n",
503         debug_prefix_time(NULL), service, arguments));
504     as = service_new(handle, service, arguments);
505     if (writebuf(as, arguments, strlen(arguments)) < 0) {
506         const char *errmsg = strerror(errno);
507         dbprintf(("%s: error sending arguments to %s: %s\n",
508             debug_prefix_time(NULL), service, errmsg));
509         pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
510             service, errmsg);
511         goto send_pkt_out;
512     }
513     aclose(as->reqfd);
514
515     amfree(pktbody);
516     amfree(service);
517     amfree(arguments);
518
519     /*
520      * Move to the sendack state, and start up the state
521      * machine.
522      */
523     as->state = s_sendack;
524     state_machine(as, A_START, NULL);
525     return;
526
527 badreq:
528     pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
529     dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
530         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
531
532 send_pkt_out:
533     amfree(pktbody);
534     amfree(service);
535     amfree(arguments);
536     if(as) service_delete(as);
537     do_sendpkt(handle, &pkt_out);
538     security_close(handle);
539 }
540
541 /*
542  * Handles incoming protocol packets.  Routes responses to the proper
543  * running service.
544  */
545 static void
546 state_machine(as, action, pkt)
547     struct active_service *as;
548     action_t action;
549     pkt_t *pkt;
550 {
551     action_t retaction;
552     state_t curstate;
553     pkt_t nak;
554
555 #ifdef AMANDAD_DEBUG
556     dbprintf(("%s: state_machine: %X entering\n",
557         debug_prefix_time(NULL), (unsigned int)as));
558 #endif
559     for (;;) {
560         curstate = as->state;
561 #ifdef AMANDAD_DEBUG
562         dbprintf(("%s: state_machine: %X curstate=%s action=%s\n",
563             debug_prefix_time(NULL), (unsigned int)as,
564             state2str(curstate), action2str(action)));
565 #endif
566         retaction = (*curstate)(as, action, pkt);
567 #ifdef AMANDAD_DEBUG
568         dbprintf(("%s: state_machine: %X curstate=%s returned %s (nextstate=%s)\n",
569             debug_prefix_time(NULL),
570             (unsigned int)as, state2str(curstate), action2str(retaction),
571             state2str(as->state)));
572 #endif
573
574         switch (retaction) {
575         /*
576          * State has queued up and is now blocking on input.
577          */
578         case A_PENDING:
579 #ifdef AMANDAD_DEBUG
580             dbprintf(("%s: state_machine: %X leaving (A_PENDING)\n",
581                 debug_prefix_time(NULL), (unsigned int)as));
582 #endif
583             return;
584
585         /*
586          * service has switched states.  Loop.
587          */
588         case A_CONTINUE:
589             break;
590
591         /*
592          * state has determined that the packet it received was bogus.
593          * Send a nak, and return.
594          */
595         case A_SENDNAK:
596             dbprintf(("%s: received unexpected %s packet\n",
597                 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
598             dbprintf(("<<<<<\n%s----\n\n", pkt->body));
599             pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
600                 pkt_type2str(pkt->type));
601             do_sendpkt(as->security_handle, &nak);
602 #ifdef AMANDAD_DEBUG
603             dbprintf(("%s: state_machine: %X leaving (A_SENDNAK)\n",
604                 debug_prefix_time(NULL), (unsigned int)as));
605 #endif
606             return;
607
608         /*
609          * Service is done.  Remove it and finish.
610          */
611         case A_FINISH:
612             service_delete(as);
613 #ifdef AMANDAD_DEBUG
614             dbprintf(("%s: state_machine: %X leaving (A_FINISH)\n",
615                 debug_prefix_time(NULL), (unsigned int)as));
616 #endif
617             return;
618
619         default:
620             assert(0);
621             break;
622         }
623     }
624     /* NOTREACHED */
625 }
626
627 /*
628  * This state just sends an ack.  After that, we move to the repwait
629  * state to wait for REP data to arrive from the subprocess.
630  */
631 static action_t
632 s_sendack(as, action, pkt)
633     struct active_service *as;
634     action_t action;
635     pkt_t *pkt;
636 {
637     pkt_t ack;
638
639     pkt_init(&ack, P_ACK, "");
640     if (do_sendpkt(as->security_handle, &ack) < 0) {
641         dbprintf(("%s: error sending ACK: %s\n",
642             debug_prefix_time(NULL), security_geterror(as->security_handle)));
643         return (A_FINISH);
644     }
645
646     /*
647      * move to the repwait state
648      * Setup a listener for data on the reply fd, but also
649      * listen for packets over the wire, as the server may
650      * poll us if we take a long time.
651      * Setup a timeout that will fire if it takes too long to
652      * receive rep data.
653      */
654     as->state = s_repwait;
655     as->ev_repfd = event_register(as->repfd, EV_READFD, repfd_recv, as);
656     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
657         timeout_repfd, as);
658     security_recvpkt(as->security_handle, protocol_recv, as, -1);
659     return (A_PENDING);
660 }
661
662 /*
663  * This is the repwait state.  We have responded to the initial REQ with
664  * an ACK, and we are now waiting for the process we spawned to pass us 
665  * data to send in a REP.
666  */
667 static action_t
668 s_repwait(as, action, pkt)
669     struct active_service *as;
670     action_t action;
671     pkt_t *pkt;
672 {
673     int n;
674
675     /*
676      * We normally shouldn't receive any packets while waiting
677      * for our REP data, but in some cases we do.
678      */
679     if (action == A_RECVPKT) {
680         assert(pkt != NULL);
681         /*
682          * Another req for something that's running.  Just send an ACK
683          * and go back and wait for more data.
684          */
685         if (pkt->type == P_REQ) {
686             dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
687                 debug_prefix_time(NULL)));
688             pkt_init(&as->rep_pkt, P_ACK, "");
689             do_sendpkt(as->security_handle, &as->rep_pkt);
690             return (A_PENDING);
691         }
692         /* something unexpected.  Nak it */
693         return (A_SENDNAK);
694     }
695
696     if (action == A_TIMEOUT) {
697         pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
698         dbprintf(("%s: %s timed out waiting for REP data\n",
699             debug_prefix_time(NULL), as->cmd));
700         do_sendpkt(as->security_handle, &as->rep_pkt);
701         return (A_FINISH);
702     }
703
704     assert(action == A_RECVREP);
705
706     /*
707      * If the read fails, consider the process dead, and remove it.
708      * Always save room for nul termination.
709      */
710     if (as->repbufsize + 1 >= sizeof(as->repbuf)) {
711         dbprintf(("%s: more than %d bytes in reply\n",
712             debug_prefix_time(NULL), sizeof(as->repbuf)));
713         dbprintf(("%s: reply so far:\n%s\n", debug_prefix(NULL), as->repbuf));
714         pkt_init(&as->rep_pkt, P_NAK, "ERROR more than %d bytes in reply\n",
715             sizeof(as->repbuf));
716         do_sendpkt(as->security_handle, &as->rep_pkt);
717         return (A_FINISH);
718     }
719     do {
720         n = read(as->repfd, as->repbuf + as->repbufsize,
721                  sizeof(as->repbuf) - as->repbufsize - 1);
722     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
723     if (n < 0) {
724         const char *errstr = strerror(errno);
725         dbprintf(("%s: read error on reply pipe: %s\n",
726             debug_prefix_time(NULL), errstr));
727         pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
728             errstr);
729         do_sendpkt(as->security_handle, &as->rep_pkt);
730         return (A_FINISH);
731     }
732     /*
733      * If we got some data, go back and wait for more, or EOF.  Nul terminate
734      * the buffer first.
735      */
736     as->repbuf[n + as->repbufsize] = '\0';
737     if (n > 0) {
738         as->repbufsize += n;
739         if(as->send_partial_reply) {
740             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
741             do_sendpkt(as->security_handle, &as->rep_pkt);
742             pkt_init(&as->rep_pkt, P_REP, "");
743         }
744  
745         return (A_PENDING);
746     }
747
748     /*
749      * If we got 0, then we hit EOF.  Process the data and release
750      * the timeout.
751      */
752     assert(n == 0);
753
754     assert(as->ev_repfd != NULL);
755     event_release(as->ev_repfd);
756     as->ev_repfd = NULL;
757
758     assert(as->ev_reptimeout != NULL);
759     event_release(as->ev_reptimeout);
760     as->ev_reptimeout = NULL;
761
762     as->state = s_processrep;
763     aclose(as->repfd);
764     return (A_CONTINUE);
765 }
766
767 /*
768  * After we have read in all of the rep data, we process it and send
769  * it out as a REP packet.
770  */
771 static action_t
772 s_processrep(as, action, pkt)
773     struct active_service *as;
774     action_t action;
775     pkt_t *pkt;
776 {
777     char *tok, *repbuf;
778
779     /*
780      * Copy the rep lines into the outgoing packet.
781      *
782      * If this line is a CONNECT, translate it
783      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
784      * Example:
785      *
786      *  CONNECT DATA 4 MESG 5 INDEX 6
787      *
788      * The tags are arbitrary.  The handles are in the DATA_FD pool.
789      * We need to map these to security streams and pass them back
790      * to the amanda server.  If the handle is -1, then we don't map.
791      */
792     repbuf = stralloc(as->repbuf);
793     pkt_init(&as->rep_pkt, P_REP, "");
794     tok = strtok(repbuf, " ");
795     if (tok == NULL)
796         goto error;
797     if (strcmp(tok, "CONNECT") == 0) {
798         char *line, *nextbuf;
799
800         /* Save the entire line */
801         line = strtok(NULL, "\n");
802         /* Save the buf following the line */
803         nextbuf = strtok(NULL, "");
804
805         if (line == NULL || nextbuf == NULL)
806             goto error;
807
808         pkt_cat(&as->rep_pkt, "CONNECT");
809
810         /* loop over the id/handle pairs */
811         for (;;) {
812             /* id */
813             tok = strtok(line, " ");
814             line = NULL;        /* keep working from line */
815             if (tok == NULL)
816                 break;
817             pkt_cat(&as->rep_pkt, " %s", tok);
818
819             /* handle */
820             tok = strtok(NULL, " \n");
821             if (tok == NULL)
822                 goto error;
823             /* convert the handle into something the server can process */
824             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
825         }
826         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
827     } else {
828 error:
829         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
830     }
831
832     /*
833      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
834      * state.
835      */
836     as->state = s_sendrep;
837     as->repretry = MAX_REP_RETRIES;
838     amfree(repbuf);
839     return (A_CONTINUE);
840 }
841
842 /*
843  * This is the state where we send the REP we just collected from our child.
844  */
845 static action_t
846 s_sendrep(as, action, pkt)
847     struct active_service *as;
848     action_t action;
849     pkt_t *pkt;
850 {
851
852     /*
853      * Transmit it and move to the ack state.
854      */
855     do_sendpkt(as->security_handle, &as->rep_pkt);
856     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
857     as->state = s_ackwait;
858     return (A_PENDING);
859 }
860
861 /*
862  * This is the state in which we wait for the server to ACK the REP
863  * we just sent it.
864  */
865 static action_t
866 s_ackwait(as, action, pkt)
867     struct active_service *as;
868     action_t action;
869     pkt_t *pkt;
870 {
871     struct datafd_handle *dh;
872     int npipes;
873
874     /*
875      * If we got a timeout, try again, but eventually give up.
876      */
877     if (action == A_TIMEOUT) {
878         if (--as->repretry > 0) {
879             as->state = s_sendrep;
880             return (A_CONTINUE);
881         }
882         dbprintf(("%s: timeout waiting for ACK for our REP\n",
883             debug_prefix_time(NULL)));
884         return (A_FINISH);
885     }
886 #ifdef AMANDAD_DEBUG
887     dbprintf(("%s: received ACK, now opening streams\n",
888         debug_prefix_time(NULL)));
889 #endif
890
891     assert(action == A_RECVPKT);
892     if (pkt->type != P_ACK)
893         return (A_SENDNAK);
894
895     /*
896      * Got the ack, now open the pipes
897      */
898     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
899         if (dh->netfd == NULL)
900             continue;
901         if (security_stream_accept(dh->netfd) < 0) {
902             dbprintf(("%s: stream %d accept failed: %s\n",
903                 debug_prefix_time(NULL),
904                 dh - &as->data[0], security_geterror(as->security_handle)));
905             security_stream_close(dh->netfd);
906             dh->netfd = NULL;
907         }
908         /* setup an event for reads from it */
909         dh->ev_handle = event_register(dh->fd, EV_READFD, process_netfd, dh);
910     }
911
912     /*
913      * Pipes are open, so auth them.  Count them at the same time.
914      */
915     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
916         if (dh->netfd == NULL)
917             continue;
918         if (security_stream_auth(dh->netfd) < 0) {
919             security_stream_close(dh->netfd);
920             dh->netfd = NULL;
921             event_release(dh->ev_handle);
922             dh->ev_handle = NULL;
923         } else {
924             npipes++;
925         }
926     }
927
928     /*
929      * If no pipes are open, then we're done.  Otherwise, just start running.
930      * The event handlers on all of the pipes will take it from here.
931      */
932 #ifdef AMANDAD_DEBUG
933     dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
934         debug_prefix_time(NULL), npipes));
935 #endif
936     if (npipes == 0)
937         return (A_FINISH);
938     else {
939         security_close(as->security_handle);
940         as->security_handle = NULL;
941         return (A_PENDING);
942     }
943 }
944
945 /*
946  * Called when a repfd has received data
947  */
948 static void
949 repfd_recv(cookie)
950     void *cookie;
951 {
952     struct active_service *as = cookie;
953
954     assert(as != NULL);
955     assert(as->ev_repfd != NULL);
956
957     state_machine(as, A_RECVREP, NULL);
958 }
959
960 /*
961  * Called when a repfd has timed out
962  */
963 static void
964 timeout_repfd(cookie)
965     void *cookie;
966 {
967     struct active_service *as = cookie;
968
969     assert(as != NULL);
970     assert(as->ev_reptimeout != NULL);
971
972     state_machine(as, A_TIMEOUT, NULL);
973 }
974
975 /*
976  * Called when a handle has received data
977  */
978 static void
979 protocol_recv(cookie, pkt, status)
980     void *cookie;
981     pkt_t *pkt;
982     security_status_t status;
983 {
984     struct active_service *as = cookie;
985
986     assert(as != NULL);
987
988     switch (status) {
989     case S_OK:
990         dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
991             debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
992         state_machine(as, A_RECVPKT, pkt);
993         break;
994     case S_TIMEOUT:
995         dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
996         state_machine(as, A_TIMEOUT, NULL);
997         break;
998     case S_ERROR:
999         dbprintf(("%s: receive error: %s\n",
1000             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1001         break;
1002     }
1003 }
1004
1005 /*
1006  * This is a generic relay function that just reads data from one of
1007  * the process's pipes and passes it up the equivalent security_stream_t
1008  */
1009 static void
1010 process_netfd(cookie)
1011     void *cookie;
1012 {
1013     pkt_t nak;
1014     struct datafd_handle *dh = cookie;
1015     struct active_service *as = dh->as;
1016     int n;
1017
1018     do {
1019         n = read(dh->fd, as->databuf, sizeof(as->databuf));
1020     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1021
1022     /*
1023      * Process has died.
1024      */
1025     if (n < 0) {
1026         pkt_init(&nak, P_NAK, "ERROR data descriptor %d broken: %s\n",
1027             dh->fd, strerror(errno));
1028         goto sendnak;
1029     }
1030     /*
1031      * Process has closed the pipe.  Just remove this event handler.
1032      * If all pipes are closed, shut down this service.
1033      */
1034     if (n == 0) {
1035         event_release(dh->ev_handle);
1036         dh->ev_handle = NULL;
1037         security_stream_close(dh->netfd);
1038         dh->netfd = NULL;
1039         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1040             if (dh->netfd != NULL)
1041                 return;
1042         }
1043         service_delete(as);
1044         return;
1045     }
1046     if (security_stream_write(dh->netfd, as->databuf, n) < 0) {
1047         /* stream has croaked */
1048         pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1049             security_stream_id(dh->netfd),
1050             security_stream_geterror(dh->netfd));
1051         goto sendnak;
1052     }
1053     return;
1054
1055 sendnak:
1056     do_sendpkt(as->security_handle, &nak);
1057     service_delete(as);
1058 }
1059
1060
1061 /*
1062  * Convert a local stream handle (DATA_FD...) into something that
1063  * can be sent to the amanda server.
1064  *
1065  * Returns a number that should be sent to the server in the REP packet.
1066  */
1067 static int
1068 allocstream(as, handle)
1069     struct active_service *as;
1070     int handle;
1071 {
1072     struct datafd_handle *dh;
1073
1074     /* if the handle is -1, then we don't bother */
1075     if (handle < 0)
1076         return (-1);
1077
1078     /* make sure the handle's kosher */
1079     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1080         return (-1);
1081
1082     /* get a pointer into our handle array */
1083     dh = &as->data[handle - DATA_FD_OFFSET];
1084
1085     /* make sure we're not already using the net handle */
1086     if (dh->netfd != NULL)
1087         return (-1);
1088
1089     /* allocate a stream from the security layer and return */
1090     dh->netfd = security_stream_server(as->security_handle);
1091     if (dh->netfd == NULL) {
1092         dbprintf(("%s: couldn't open stream to server: %s\n",
1093             debug_prefix_time(NULL), security_geterror(as->security_handle)));
1094         return (-1);
1095     }
1096
1097     /*
1098      * convert the stream into a numeric id that can be sent to the
1099      * remote end.
1100      */
1101     return (security_stream_id(dh->netfd));
1102 }
1103
1104 /*
1105  * Create a new service instance
1106  */
1107 static struct active_service *
1108 service_new(security_handle, cmd, arguments)
1109     security_handle_t *security_handle;
1110     const char *cmd, *arguments;
1111 {
1112     int data[DATA_FD_COUNT + 2][2], i;
1113     struct active_service *as;
1114     pid_t pid;
1115     int newfd;
1116
1117     assert(security_handle != NULL);
1118     assert(cmd != NULL);
1119     assert(arguments != NULL);
1120
1121     /* a plethora of pipes */
1122     for (i = 0; i < DATA_FD_COUNT + 2; i++)
1123         if (pipe(data[i]) < 0)
1124             error("pipe: %s", strerror(errno));
1125
1126     switch(pid = fork()) {
1127     case -1:
1128         error("could not fork service %s: %s", cmd, strerror(errno));
1129     default:
1130         /*
1131          * The parent.  Close the far ends of our pipes and return.
1132          */
1133         as = alloc(sizeof(*as));
1134         as->cmd = stralloc(cmd);
1135         as->arguments = stralloc(arguments);
1136         as->security_handle = security_handle;
1137         as->state = NULL;
1138         as->pid = pid;
1139         as->send_partial_reply = 0;
1140         if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1141             g_option_t *g_options;
1142             char *option_str, *p;
1143
1144             option_str = stralloc(as->arguments+8);
1145             p = strchr(option_str,'\n');
1146             if(p) *p = '\0';
1147
1148             g_options = parse_g_options(option_str, 1);
1149             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1150                 as->send_partial_reply = 1;
1151             }
1152             amfree(g_options);
1153             amfree(option_str);
1154         }
1155
1156         /* write to the request pipe */
1157         aclose(data[0][0]);
1158         as->reqfd = data[0][1];
1159
1160         /*
1161          * read from the reply pipe
1162          */
1163         as->repfd = data[1][0];
1164         aclose(data[1][1]);
1165         as->ev_repfd = NULL;
1166         as->repbufsize = 0;
1167         as->repretry = 0;
1168
1169         /*
1170          * read from the rest of the general-use pipes
1171          * (netfds are opened as the client requests them)
1172          */
1173         for (i = 0; i < DATA_FD_COUNT; i++) {
1174             aclose(data[i + 2][1]);
1175             as->data[i].fd = data[i + 2][0];
1176             as->data[i].ev_handle = NULL;
1177             as->data[i].netfd = NULL;
1178             as->data[i].as = as;
1179         }
1180
1181         /* add it to the service queue */
1182         /* increment the active service count */
1183         TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1184         serviceq.qlength++;
1185
1186         return (as);
1187     case 0:
1188         /*
1189          * The child.  Put our pipes in their advertised locations
1190          * and start up.
1191          */
1192 #ifdef FORCE_USERID
1193         seteuid((uid_t)0);
1194         setuid(client_uid);
1195 #endif
1196
1197         /*
1198          * The data stream is stdin in the new process
1199          */
1200         if (dup2(data[0][0], 0) < 0) {
1201             error("dup %d to %d failed: %s\n", data[0][0], 0,
1202                 strerror(errno));
1203         }
1204         aclose(data[0][0]);
1205         aclose(data[0][1]);
1206
1207         /*
1208          * The reply stream is stdout
1209          */
1210         if (dup2(data[1][1], 1) < 0) {
1211             error("dup %d to %d failed: %s\n", data[1][1], 1,
1212                 strerror(errno));
1213         }
1214         aclose(data[1][0]);
1215         aclose(data[1][1]);
1216
1217         /*
1218          * The rest start at the offset defined in amandad.h, and continue
1219          * through the internal defined.
1220          */
1221         for (i = 0; i < DATA_FD_COUNT; i++)
1222             aclose(data[i + 2][0]);
1223
1224         /*
1225          *  Make sure they are not open in the range DATA_FD_OFFSET to 
1226          *      DATA_FD_OFFSET + DATA_FD_COUNT - 1
1227          */
1228         for (i = 0; i < DATA_FD_COUNT; i++) {
1229             while(data[i + 2][1] >= DATA_FD_OFFSET &&
1230                   data[i + 2][1] <= DATA_FD_OFFSET + DATA_FD_COUNT - 1) {
1231                 newfd = dup(data[i + 2][1]);
1232                 if(newfd == -1)
1233                     error("Can't dup out off DATA_FD range");
1234                 data[i + 2][1] = newfd;
1235             }
1236         }
1237         for (i = 0; i < DATA_FD_COUNT; i++)
1238             close(DATA_FD_OFFSET + i);
1239
1240         for (i = 0; i < DATA_FD_COUNT; i++) {
1241             if (dup2(data[i + 2][1], i + DATA_FD_OFFSET) < 0) {
1242                 error("dup %d to %d failed: %s\n", data[i + 2][1],
1243                     i + DATA_FD_OFFSET, strerror(errno));
1244             }
1245             aclose(data[i + 2][1]);
1246         }
1247
1248         execle(cmd, cmd, NULL, safe_env());
1249         error("could not exec service %s: %s", cmd, strerror(errno));
1250     }
1251     /* NOTREACHED */
1252     return NULL;
1253 }
1254
1255 /*
1256  * Unallocate a service instance
1257  */
1258 static void
1259 service_delete(as)
1260     struct active_service *as;
1261 {
1262     int i;
1263     struct datafd_handle *dh;
1264
1265 #ifdef AMANDAD_DEBUG
1266         dbprintf(("%s: closing service: %s\n",
1267             debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
1268 #endif
1269
1270     assert(as != NULL);
1271
1272     assert(as->cmd != NULL);
1273     amfree(as->cmd);
1274
1275     assert(as->arguments != NULL);
1276     amfree(as->arguments);
1277
1278     if (as->reqfd != -1)
1279         aclose(as->reqfd);
1280     if (as->repfd != -1)
1281         aclose(as->repfd);
1282
1283     if (as->ev_repfd != NULL)
1284         event_release(as->ev_repfd);
1285     if (as->ev_reptimeout != NULL)
1286         event_release(as->ev_reptimeout);
1287
1288     for (i = 0; i < DATA_FD_COUNT; i++) {
1289         dh = &as->data[i];
1290
1291         aclose(dh->fd);
1292
1293         if (dh->netfd != NULL)
1294             security_stream_close(dh->netfd);
1295
1296         if (dh->ev_handle != NULL)
1297             event_release(dh->ev_handle);
1298     }
1299
1300     if (as->security_handle != NULL)
1301         security_close(as->security_handle);
1302
1303     assert(as->pid > 0);
1304     kill(as->pid, SIGTERM);
1305     waitpid(as->pid, NULL, WNOHANG);
1306
1307     TAILQ_REMOVE(&serviceq.tailq, as, tq);
1308     assert(serviceq.qlength > 0);
1309     serviceq.qlength--;
1310
1311     amfree(as);
1312 }
1313
1314 /*
1315  * Like 'fullwrite', but does the work in a child process so pipelines
1316  * do not hang.
1317  */
1318 static int
1319 writebuf(as, bufp, size)
1320     struct active_service *as;
1321     const void *bufp;
1322     size_t size;
1323 {
1324     int pid;
1325
1326     switch (pid=fork()) {
1327     case -1:
1328         return -1;
1329
1330     default:
1331         waitpid(pid, NULL, WNOHANG);
1332         return 0;                       /* this is the parent */
1333
1334     case 0:
1335         break;                          /* this is the child */
1336     }
1337     aclose (as->repfd);                 /* make sure we are not a reader */
1338     exit (fullwrite(as->reqfd, bufp, size) != size);
1339 }
1340
1341 static int
1342 do_sendpkt(handle, pkt)
1343     security_handle_t *handle;
1344     pkt_t *pkt;
1345 {
1346     dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1347         debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1348     return security_sendpkt(handle, pkt);
1349 }
1350
1351 #ifdef AMANDAD_DEBUG
1352 /*
1353  * Convert a state into a string
1354  */
1355 static const char *
1356 state2str(state)
1357     state_t state;
1358 {
1359     static const struct {
1360         state_t state;
1361         const char str[13];
1362     } states[] = {
1363 #define X(state)        { state, stringize(state) }
1364         X(s_sendack),
1365         X(s_repwait),
1366         X(s_processrep),
1367         X(s_sendrep),
1368         X(s_ackwait),
1369 #undef X
1370     };
1371     int i;
1372
1373     for (i = 0; i < sizeof(states) / sizeof(states[0]); i++)
1374         if (state == states[i].state)
1375             return (states[i].str);
1376     return ("INVALID STATE");
1377 }
1378
1379 /*
1380  * Convert an action into a string
1381  */
1382 static const char *
1383 action2str(action)
1384     action_t action;
1385 {
1386     static const struct {
1387         action_t action;
1388         const char str[12];
1389     } actions[] = {
1390 #define X(action)       { action, stringize(action) }
1391         X(A_START),
1392         X(A_RECVPKT),
1393         X(A_RECVREP),
1394         X(A_PENDING),
1395         X(A_FINISH),
1396         X(A_CONTINUE),
1397         X(A_SENDNAK),
1398         X(A_TIMEOUT),
1399 #undef X
1400     };
1401     int i;
1402
1403     for (i = 0; i < sizeof(actions) / sizeof(actions[0]); i++)
1404         if (action == actions[i].action)
1405             return (actions[i].str);
1406     return ("UNKNOWN ACTION");
1407 }
1408 #endif  /* AMANDAD_DEBUG */