Imported Upstream version 3.3.3
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
5  * All Rights Reserved.
6  *
7  * Permission to use, copy, modify, distribute, and sell this software and its
8  * documentation for any purpose is hereby granted without fee, provided that
9  * the above copyright notice appear in all copies and that both that
10  * copyright notice and this permission notice appear in supporting
11  * documentation, and that the name of U.M. not be used in advertising or
12  * publicity pertaining to distribution of the software without specific,
13  * written prior permission.  U.M. makes no representations about the
14  * suitability of this software for any purpose.  It is provided "as is"
15  * without express or implied warranty.
16  *
17  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
19  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
20  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
21  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
22  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23  *
24  * Authors: the Amanda Development Team.  Its members are listed in a
25  * file named AUTHORS, in the root directory of this distribution.
26  */
27
28 /*
29  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
30  *
31  * handle client-host side of Amanda network communications, including
32  * security checks, execution of the proper service, and acking the
33  * master side
34  */
35
36 #include "amanda.h"
37 #include "amandad.h"
38 #include "clock.h"
39 #include "event.h"
40 #include "amfeatures.h"
41 #include "packet.h"
42 #include "version.h"
43 #include "security.h"
44 #include "stream.h"
45 #include "util.h"
46 #include "conffile.h"
47
48 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
49 #define ACK_TIMEOUT     10              /* XXX should be configurable */
50 #define STDERR_PIPE (DATA_FD_COUNT + 1)
51
52 #define amandad_debug(i, ...) do {      \
53         if ((i) <= debug_amandad) {     \
54                 dbprintf(__VA_ARGS__);  \
55         }                               \
56 } while (0)
57
58 /*
59  * These are the actions for entering the state machine
60  */
61 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
62     A_SENDNAK, A_TIMEOUT } action_t;
63
64 /*
65  * This is a state in the state machine.  It is a function pointer to
66  * the function that actually implements the state.
67  */
68 struct active_service;
69 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
70
71 /* string that we scan for in sendbackup's MESG stream */
72 static const char info_end_str[] = "sendbackup: info end\n";
73 #define INFO_END_LEN (sizeof(info_end_str)-1)
74
75 /* 
76  * Here are the services that we allow.
77  * Must be in the same order as services[].
78  */
79 typedef enum {
80     SERVICE_NOOP,
81     SERVICE_SENDSIZE,
82     SERVICE_SENDBACKUP,
83     SERVICE_SELFCHECK,
84     SERVICE_AMINDEXD,
85     SERVICE_AMIDXTAPED,
86     SERVICE_AMDUMPD
87 } service_t;
88
89 static struct services {
90     char *name;
91     int  active;
92     service_t service;
93 } services[] = {
94    { "noop", 1, SERVICE_NOOP },
95    { "sendsize", 1, SERVICE_SENDSIZE },
96    { "sendbackup", 1, SERVICE_SENDBACKUP },
97    { "selfcheck", 1, SERVICE_SELFCHECK },
98    { "amindexd", 0, SERVICE_AMINDEXD },
99    { "amidxtaped", 0, SERVICE_AMIDXTAPED },
100    { "amdumpd", 0, SERVICE_AMDUMPD }
101 };
102 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
103
104 /*
105  * This structure describes an active running service.
106  *
107  * An active service is something running that we have received
108  * a request for.  This structure holds info on that service, including
109  * file descriptors for data, etc, as well as the security handle
110  * for communications with the amanda server.
111  */
112 struct active_service {
113     service_t service;                  /* service name */
114     char *cmd;                          /* name of command we ran */
115     char *arguments;                    /* arguments we sent it */
116     security_handle_t *security_handle; /* remote server */
117     state_t state;                      /* how far this has progressed */
118     pid_t pid;                          /* pid of subprocess */
119     int send_partial_reply;             /* send PREP packet */
120     int reqfd;                          /* pipe to write requests */
121     int repfd;                          /* pipe to read replies */
122     int errfd;                          /* pipe to read stderr */
123     event_handle_t *ev_repfd;           /* read event handle for repfd */
124     event_handle_t *ev_reptimeout;      /* timeout for rep data */
125     event_handle_t *ev_errfd;           /* read event handle for errfd */
126     pkt_t rep_pkt;                      /* rep packet we're sending out */
127     char *errbuf;                       /* buffer to read the err into */
128     char *repbuf;                       /* buffer to read the rep into */
129     size_t bufsize;                     /* length of repbuf */
130     size_t repbufsize;                  /* length of repbuf */
131     int repretry;                       /* times we'll retry sending the rep */
132     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
133     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
134
135     /*
136      * General user streams to the process, and their equivalent
137      * network streams.
138      */
139     struct datafd_handle {
140         int fd_read;                    /* pipe to child process */
141         int fd_write;                   /* pipe to child process */
142         event_handle_t *ev_read;        /* it's read event handle */
143         event_handle_t *ev_write;       /* it's write event handle */
144         security_stream_t *netfd;       /* stream to amanda server */
145         struct active_service *as;      /* pointer back to our enclosure */
146     } data[DATA_FD_COUNT];
147     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
148 };
149
150 /*
151  * Queue of outstanding requests that we are running.
152  */
153 GSList *serviceq = NULL;
154
155 static event_handle_t *exit_event;
156 static int exit_on_qlength = 1;
157 static char *auth = NULL;
158 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
159
160 int main(int argc, char **argv);
161
162 static int allocstream(struct active_service *, int);
163 static void exit_check(void *);
164 static void protocol_accept(security_handle_t *, pkt_t *);
165 static void state_machine(struct active_service *, action_t, pkt_t *);
166
167 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
168 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
169 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
170 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
171 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
172
173 static void repfd_recv(void *);
174 static void process_errfd(void *cookie);
175 static void errfd_recv(void *);
176 static void timeout_repfd(void *);
177 static void protocol_recv(void *, pkt_t *, security_status_t);
178 static void process_readnetfd(void *);
179 static void process_writenetfd(void *, void *, ssize_t);
180 static struct active_service *service_new(security_handle_t *,
181     const char *, service_t, const char *);
182 static void service_delete(struct active_service *);
183 static int writebuf(struct active_service *, const void *, size_t);
184 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
185 static char *amandad_get_security_conf (char *, void *);
186
187 static const char *state2str(state_t);
188 static const char *action2str(action_t);
189
190 int
191 main(
192     int         argc,
193     char **     argv)
194 {
195     int i, j;
196     int have_services;
197     int in, out;
198     const security_driver_t *secdrv;
199     int no_exit = 0;
200     char *pgm = "amandad";              /* in case argv[0] is not set */
201 #if defined(USE_REUSEADDR)
202     const int on = 1;
203     int r;
204 #endif
205
206     /*
207      * Configure program for internationalization:
208      *   1) Only set the message locale for now.
209      *   2) Set textdomain for all amanda related programs to "amanda"
210      *      We don't want to be forced to support dozens of message catalogs.
211      */  
212     setlocale(LC_MESSAGES, "C");
213     textdomain("amanda"); 
214
215     safe_fd(-1, 0);
216     safe_cd();
217
218     /*
219      * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
220      * the Sun version of tar in /usr/sun/sbin/tar is called instead.
221      *
222      * On other operating systems this will have no effect.
223      */
224
225 #ifdef HAVE_UNSETENV
226     unsetenv("SUN_PERSONALITY");
227 #endif
228
229     /*
230      * When called via inetd, it is not uncommon to forget to put the
231      * argv[0] value on the config line.  On some systems (e.g. Solaris)
232      * this causes argv and/or argv[0] to be NULL, so we have to be
233      * careful getting our name.
234      */
235     if ((argv == NULL) || (argv[0] == NULL)) {
236             pgm = "amandad";            /* in case argv[0] is not set */
237     } else {
238             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
239     }
240     set_pname(pgm);
241     dbopen(DBG_SUBDIR_AMANDAD);
242
243     if(argv == NULL) {
244         error(_("argv == NULL\n"));
245         /*NOTREACHED*/
246     }
247
248     if (argc > 1 && argv && argv[1] && g_str_equal(argv[1], "--version")) {
249         printf("amandad-%s\n", VERSION);
250         return (0);
251     }
252
253     /* Don't die when child closes pipe */
254     signal(SIGPIPE, SIG_IGN);
255
256     /* Parse the configuration; we'll handle errors later */
257     config_init(CONFIG_INIT_CLIENT, NULL);
258
259     if (geteuid() == 0) {
260         check_running_as(RUNNING_AS_ROOT);
261         initgroups(CLIENT_LOGIN, get_client_gid());
262         setgid(get_client_gid());
263         setegid(get_client_gid());
264         seteuid(get_client_uid());
265     } else {
266         check_running_as(RUNNING_AS_CLIENT_LOGIN);
267     }
268
269     add_amanda_log_handler(amanda_log_stderr);
270     add_amanda_log_handler(amanda_log_syslog);
271
272     /*
273      * ad-hoc argument parsing
274      *
275      * We accept        -auth=[authentication type]
276      *                  -no-exit
277      *                  -tcp=[port]
278      *                  -udp=[port]
279      * We also add a list of services that amandad can launch
280      */
281     secdrv = NULL;
282     in = 0; out = 1;            /* default to stdin/stdout */
283     have_services = 0;
284     for (i = 1; i < argc; i++) {
285         /*
286          * Get a driver for a security type specified after -auth=
287          */
288         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
289             argv[i] += strlen("-auth=");
290             secdrv = security_getdriver(argv[i]);
291             auth = argv[i];
292             if (secdrv == NULL) {
293                 error(_("no driver for security type '%s'\n"), argv[i]);
294                 /*NOTREACHED*/
295             }
296             if (strcmp(auth, "local") == 0 ||
297                 strcmp(auth, "rsh") == 0 ||
298                 strcmp(auth, "ssh") == 0) {
299                 int i;
300                 for (i=0; i < NSERVICES; i++) {
301                     services[i].active = 1;
302                 }
303             }
304             continue;
305         }
306
307         /*
308          * If -no-exit is specified, always run even after requests have
309          * been satisfied.
310          */
311         else if (strcmp(argv[i], "-no-exit") == 0) {
312             no_exit = 1;
313             continue;
314         }
315
316         /*
317          * Allow us to directly bind to a udp port for debugging.
318          * This may only apply to some security types.
319          */
320         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
321 #ifdef WORKING_IPV6
322             struct sockaddr_in6 sin;
323 #else
324             struct sockaddr_in sin;
325 #endif
326
327             argv[i] += strlen("-udp=");
328 #ifdef WORKING_IPV6
329             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
330 #else
331             in = out = socket(AF_INET, SOCK_DGRAM, 0);
332 #endif
333             if (in < 0) {
334                 error(_("can't create dgram socket: %s\n"), strerror(errno));
335                 /*NOTREACHED*/
336             }
337 #ifdef USE_REUSEADDR
338             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
339                 (void *)&on, (socklen_t_equiv)sizeof(on));
340             if (r < 0) {
341                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
342                           strerror(errno));
343             }
344 #endif
345
346 #ifdef WORKING_IPV6
347             sin.sin6_family = (sa_family_t)AF_INET6;
348             sin.sin6_addr = in6addr_any;
349             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
350 #else
351             sin.sin_family = (sa_family_t)AF_INET;
352             sin.sin_addr.s_addr = INADDR_ANY;
353             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
354 #endif
355             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
356                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
357                     strerror(errno));
358                 /*NOTREACHED*/
359             }
360         }
361         /*
362          * Ditto for tcp ports.
363          */
364         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
365 #ifdef WORKING_IPV6
366             struct sockaddr_in6 sin;
367 #else
368             struct sockaddr_in sin;
369 #endif
370             int sock;
371             socklen_t_equiv n;
372
373             argv[i] += strlen("-tcp=");
374 #ifdef WORKING_IPV6
375             sock = socket(AF_INET6, SOCK_STREAM, 0);
376 #else
377             sock = socket(AF_INET, SOCK_STREAM, 0);
378 #endif
379             if (sock < 0) {
380                 error(_("can't create tcp socket: %s\n"), strerror(errno));
381                 /*NOTREACHED*/
382             }
383 #ifdef USE_REUSEADDR
384             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
385                 (void *)&on, (socklen_t_equiv)sizeof(on));
386             if (r < 0) {
387                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
388                           strerror(errno));
389             }
390 #endif
391 #ifdef WORKING_IPV6
392             sin.sin6_family = (sa_family_t)AF_INET6;
393             sin.sin6_addr = in6addr_any;
394             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
395 #else
396             sin.sin_family = (sa_family_t)AF_INET;
397             sin.sin_addr.s_addr = INADDR_ANY;
398             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
399 #endif
400             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
401                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
402                     strerror(errno));
403                 /*NOTREACHED*/
404             }
405             listen(sock, 10);
406             n = (socklen_t_equiv)sizeof(sin);
407             in = out = accept(sock, (struct sockaddr *)&sin, &n);
408         }
409         /*
410          * It must be a service name
411          */
412         else {
413             /* clear all services */
414             if(!have_services) {
415                 for (j = 0; j < (int)NSERVICES; j++)
416                     services[j].active = 0;
417             }
418             have_services = 1;
419
420             if(strcmp(argv[i],"amdump") == 0) {
421                 services[0].active = 1;
422                 services[1].active = 1;
423                 services[2].active = 1;
424                 services[3].active = 1;
425             }
426             else {
427                 for (j = 0; j < (int)NSERVICES; j++)
428                     if (strcmp(services[j].name, argv[i]) == 0)
429                         break;
430                 if (j == (int)NSERVICES) {
431                     dbprintf(_("%s: invalid service\n"), argv[i]);
432                     exit(1);
433                 }
434                 services[j].active = 1;
435             }
436         }
437     }
438
439     /*
440      * If no security type specified, use BSDTCP
441      */
442     if (secdrv == NULL) {
443         secdrv = security_getdriver("BSDTCP");
444         auth = "bsdtcp";
445         if (secdrv == NULL) {
446             error(_("no driver for default security type 'BSDTCP'\n"));
447             /*NOTREACHED*/
448         }
449     }
450
451     if(strcasecmp(auth, "rsh") == 0 ||
452        strcasecmp(auth, "ssh") == 0 ||
453        strcasecmp(auth, "local") == 0 ||
454        strcasecmp(auth, "bsdtcp") == 0) {
455         exit_on_qlength = 1;
456     }
457
458 #ifndef SINGLE_USERID
459     if (geteuid() == 0) {
460         if (strcasecmp(auth, "krb5") != 0) {
461             struct passwd *pwd;
462             /* lookup our local user name */
463             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
464                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
465             }
466
467             if (pwd->pw_uid != 0) {
468                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
469                       CLIENT_LOGIN, auth);
470             }
471         }
472     } else {
473         if (strcasecmp(auth, "krb5") == 0) {
474             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
475         }
476     }
477 #endif
478
479     /* initialize */
480
481     startclock();
482
483     dbprintf(_("version %s\n"), VERSION);
484     for (i = 0; version_info[i] != NULL; i++) {
485         dbprintf("    %s", version_info[i]);
486     }
487
488     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
489         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
490     }
491
492     /* krb5 require the euid to be 0 */
493     if (strcasecmp(auth, "krb5") == 0) {
494         seteuid((uid_t)0);
495     }
496
497     /*
498      * Schedule to call protocol_accept() when new security handles
499      * are created on stdin.
500      */
501     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
502
503     /*
504      * Schedule an event that will try to exit every 30 seconds if there
505      * are no requests outstanding.
506      */
507     exit_event = event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
508
509     /*
510      * Call event_loop() with an arg of 0, telling it to block until all
511      * events are completed.
512      */
513     event_loop(0);
514
515     close(in);
516     close(out);
517     dbclose();
518     return(0);
519 }
520
521 /*
522  * This runs periodically and checks to see if we have any active services
523  * still running.  If we don't, then we quit.
524  */
525 static void
526 exit_check(
527     void *      cookie)
528 {
529     int no_exit;
530
531     assert(cookie != NULL);
532     no_exit = *(int *)cookie;
533
534     /*
535      * If things are still running, then don't exit.
536      */
537     if (g_slist_length(serviceq) > 0)
538         return;
539
540     /*
541      * If the caller asked us to never exit, then we're done
542      */
543     if (no_exit)
544         return;
545
546     g_debug("timeout exit");
547     dbclose();
548     exit(0);
549 }
550
551 /*
552  * Handles new incoming protocol handles.  This is a callback for
553  * security_accept(), which gets called when new handles are detected.
554  */
555 static void
556 protocol_accept(
557     security_handle_t * handle,
558     pkt_t *             pkt)
559 {
560     pkt_t pkt_out;
561     GSList *iter;
562     struct active_service *as;
563     char *pktbody, *tok, *service, *arguments;
564     char *service_path = NULL;
565     GSList *errlist = NULL;
566     int i;
567     char *peer_name;
568
569     pkt_out.body = NULL;
570
571     /*
572      * If handle is NULL, then the connection is closed.
573      */
574     if (handle == NULL) {
575         if (exit_on_qlength && exit_event) {
576             /* remove the timeout, we will exit once the service terminate */
577             event_release(exit_event);
578             exit_event = NULL;
579         }
580         return;
581     }
582
583     /*
584      * If we have errors (not warnings) from the config file, let the remote system
585      * know immediately.  Unfortunately, we only get one ERROR line, so if there
586      * are multiple errors, we just show the first.
587      */
588     if (config_errors(&errlist) >= CFGERR_ERRORS) {
589         GSList *iter = errlist;
590         char *errmsg;
591         gboolean multiple_errors = FALSE;
592
593         if (iter) {
594             errmsg = (char *)iter->data;
595             if (iter->next)
596                 multiple_errors = TRUE;
597         } else {
598             errmsg = "(no error message)";
599         }
600
601         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
602             multiple_errors? _(" (additional errors not displayed)"):"");
603         do_sendpkt(handle, &pkt_out);
604         amfree(pkt_out.body);
605         security_close(handle);
606         return;
607     }
608
609     peer_name = security_get_authenticated_peer_name(handle);
610     g_debug("authenticated peer name is '%s'", peer_name);
611     amfree(peer_name);
612
613     /*
614      * If pkt is NULL, then there was a problem with the new connection.
615      */
616     if (pkt == NULL) {
617         dbprintf(_("accept error: %s\n"), security_geterror(handle));
618         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
619         do_sendpkt(handle, &pkt_out);
620         amfree(pkt_out.body);
621         security_close(handle);
622         return;
623     }
624
625     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
626         pkt_type2str(pkt->type), pkt->body);
627
628     /*
629      * If this is not a REQ packet, just forget about it.
630      */
631     if (pkt->type != P_REQ) {
632         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
633             pkt_type2str(pkt->type), pkt->body);
634         security_close(handle);
635         return;
636     }
637
638     pktbody = service = arguments = NULL;
639     as = NULL;
640
641     /*
642      * Parse out the service and arguments
643      */
644
645     pktbody = stralloc(pkt->body);
646
647     tok = strtok(pktbody, " ");
648     if (tok == NULL)
649         goto badreq;
650     if (strcmp(tok, "SERVICE") != 0)
651         goto badreq;
652
653     tok = strtok(NULL, " \n");
654     if (tok == NULL)
655         goto badreq;
656     service = stralloc(tok);
657
658     /* we call everything else 'arguments' */
659     tok = strtok(NULL, "");
660     if (tok == NULL)
661         goto badreq;
662     arguments = stralloc(tok);
663
664     /* see if it's one we allow */
665     for (i = 0; i < (int)NSERVICES; i++)
666         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
667             break;
668     if (i == (int)NSERVICES) {
669         dbprintf(_("%s: invalid service\n"), service);
670         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
671         goto send_pkt_out;
672     }
673
674     service_path = vstralloc(amlibexecdir, "/", service, NULL);
675     if (access(service_path, X_OK) < 0) {
676         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
677             pkt_init(&pkt_out, P_NAK,
678                      _("ERROR execute access to \"%s\" denied\n"),
679                      service_path);
680         goto send_pkt_out;
681     }
682
683     /* see if its already running */
684     for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
685         as = (struct active_service *)iter->data;
686             if (strcmp(as->cmd, service_path) == 0 &&
687                 strcmp(as->arguments, arguments) == 0) {
688                     dbprintf(_("%s %s: already running, acking req\n"),
689                         service, arguments);
690                     pkt_init_empty(&pkt_out, P_ACK);
691                     goto send_pkt_out_no_delete;
692             }
693     }
694
695     /*
696      * create a new service instance, and send the arguments down
697      * the request pipe.
698      */
699     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
700     as = service_new(handle, service_path, services[i].service, arguments);
701     if (writebuf(as, arguments, strlen(arguments)) < 0) {
702         const char *errmsg = strerror(errno);
703         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
704         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
705             service, errmsg);
706         goto send_pkt_out;
707     }
708     aclose(as->reqfd);
709
710     amfree(pktbody);
711     amfree(service);
712     amfree(service_path);
713     amfree(arguments);
714
715     /*
716      * Move to the sendack state, and start up the state
717      * machine.
718      */
719     as->state = s_sendack;
720     state_machine(as, A_START, NULL);
721     return;
722
723 badreq:
724     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
725     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
726         pkt_type2str(pkt->type), pkt->body);
727
728 send_pkt_out:
729     if(as)
730         service_delete(as);
731 send_pkt_out_no_delete:
732     amfree(pktbody);
733     amfree(service_path);
734     amfree(service);
735     amfree(arguments);
736     do_sendpkt(handle, &pkt_out);
737     security_close(handle);
738     amfree(pkt_out.body);
739 }
740
741 /*
742  * Handles incoming protocol packets.  Routes responses to the proper
743  * running service.
744  */
745 static void
746 state_machine(
747     struct active_service *     as,
748     action_t                    action,
749     pkt_t *                     pkt)
750 {
751     action_t retaction;
752     state_t curstate;
753     pkt_t nak;
754
755     amandad_debug(1, _("state_machine: %p entering\n"), as);
756     for (;;) {
757         curstate = as->state;
758         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
759                           state2str(curstate), action2str(action));
760         retaction = (*curstate)(as, action, pkt);
761         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
762                           as, state2str(curstate), action2str(retaction),
763                           state2str(as->state));
764
765         switch (retaction) {
766         /*
767          * State has queued up and is now blocking on input.
768          */
769         case A_PENDING:
770             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
771             return;
772
773         /*
774          * service has switched states.  Loop.
775          */
776         case A_CONTINUE:
777             break;
778
779         /*
780          * state has determined that the packet it received was bogus.
781          * Send a nak, and return.
782          */
783         case A_SENDNAK:
784             dbprintf(_("received unexpected %s packet\n"),
785                 pkt_type2str(pkt->type));
786             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
787             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
788                 pkt_type2str(pkt->type));
789             do_sendpkt(as->security_handle, &nak);
790             amfree(nak.body);
791             security_recvpkt(as->security_handle, protocol_recv, as, -1);
792             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
793             return;
794
795         /*
796          * Service is done.  Remove it and finish.
797          */
798         case A_FINISH:
799             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
800             service_delete(as);
801             return;
802
803         default:
804             assert(0);
805             break;
806         }
807     }
808     /*NOTREACHED*/
809 }
810
811 /*
812  * This state just sends an ack.  After that, we move to the repwait
813  * state to wait for REP data to arrive from the subprocess.
814  */
815 static action_t
816 s_sendack(
817     struct active_service *     as,
818     action_t                    action,
819     pkt_t *                     pkt)
820 {
821     pkt_t ack;
822
823     (void)action;       /* Quiet unused parameter warning */
824     (void)pkt;          /* Quiet unused parameter warning */
825
826     pkt_init_empty(&ack, P_ACK);
827     if (do_sendpkt(as->security_handle, &ack) < 0) {
828         dbprintf(_("error sending ACK: %s\n"),
829             security_geterror(as->security_handle));
830         amfree(ack.body);
831         return (A_FINISH);
832     }
833     amfree(ack.body);
834
835     /*
836      * move to the repwait state
837      * Setup a listener for data on the reply fd, but also
838      * listen for packets over the wire, as the server may
839      * poll us if we take a long time.
840      * Setup a timeout that will fire if it takes too long to
841      * receive rep data.
842      */
843     as->state = s_repwait;
844     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
845     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
846         timeout_repfd, as);
847     as->errbuf = NULL;
848     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
849     security_recvpkt(as->security_handle, protocol_recv, as, -1);
850     return (A_PENDING);
851 }
852
853 /*
854  * This is the repwait state.  We have responded to the initial REQ with
855  * an ACK, and we are now waiting for the process we spawned to pass us 
856  * data to send in a REP.
857  */
858 static action_t
859 s_repwait(
860     struct active_service *     as,
861     action_t                    action,
862     pkt_t *                     pkt)
863 {
864     ssize_t   n;
865     char     *repbuf_temp;
866     char     *what;
867     char     *msg;
868     int       code = 0;
869     int       pid;
870     amwait_t  retstat;
871
872     /*
873      * We normally shouldn't receive any packets while waiting
874      * for our REP data, but in some cases we do.
875      */
876     if (action == A_RECVPKT) {
877         assert(pkt != NULL);
878         /*
879          * Another req for something that's running.  Just send an ACK
880          * and go back and wait for more data.
881          */
882         if (pkt->type == P_REQ) {
883             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
884             amfree(as->rep_pkt.body);
885             pkt_init_empty(&as->rep_pkt, P_ACK);
886             do_sendpkt(as->security_handle, &as->rep_pkt);
887             security_recvpkt(as->security_handle, protocol_recv, as, -1);
888             return (A_PENDING);
889         }
890         /* something unexpected.  Nak it */
891         return (A_SENDNAK);
892     }
893
894     if (action == A_TIMEOUT) {
895         amfree(as->rep_pkt.body);
896         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
897         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
898         do_sendpkt(as->security_handle, &as->rep_pkt);
899         return (A_FINISH);
900     }
901
902     assert(action == A_RECVREP);
903     if(as->bufsize == 0) {
904         as->bufsize = NETWORK_BLOCK_BYTES;
905         as->repbuf = alloc(as->bufsize);
906     }
907
908     do {
909         n = read(as->repfd, as->repbuf + as->repbufsize,
910                  as->bufsize - as->repbufsize - 1);
911     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
912     if (n < 0) {
913         const char *errstr = strerror(errno);
914         dbprintf(_("read error on reply pipe: %s\n"), errstr);
915         amfree(as->rep_pkt.body);
916         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
917                  errstr);
918         do_sendpkt(as->security_handle, &as->rep_pkt);
919         return (A_FINISH);
920     }
921
922     /* If end of service, wait for process status */
923     if (n == 0) {
924         pid = waitpid(as->pid, &retstat, WNOHANG);
925         if (as->service  == SERVICE_NOOP ||
926             as->service  == SERVICE_SENDSIZE ||
927             as->service  == SERVICE_SELFCHECK) {
928             int t = 0;
929             while (t<5 && pid == 0) {
930                 sleep(1);
931                 t++;
932                 pid = waitpid(as->pid, &retstat, WNOHANG);
933             }
934         }
935
936         process_errfd(as);
937
938         if (pid == 0)
939             pid = waitpid(as->pid, &retstat, WNOHANG);
940
941         if (pid > 0) {
942             what = NULL;
943             if (! WIFEXITED(retstat)) {
944                 what = _("signal");
945                 code = WTERMSIG(retstat);
946             } else if (WEXITSTATUS(retstat) != 0) {
947                 what = _("code");
948                 code = WEXITSTATUS(retstat);
949             }
950             if (what) {
951                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
952                          (as->cmd)?as->cmd:_("??UNKONWN??"),
953                          (unsigned)as->pid,
954                          what, code);
955                 msg = vstrallocf(
956                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
957                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
958                      what, code);
959                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
960                         as->bufsize *= 2;
961                         repbuf_temp = alloc(as->bufsize);
962                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
963                         amfree(as->repbuf);
964                         as->repbuf = repbuf_temp;
965                 }
966                 strcpy(as->repbuf + as->repbufsize, msg);
967                 as->repbufsize += strlen(msg);
968                 amfree(msg);
969             }
970         }
971     }
972
973     /*
974      * If we got some data, go back and wait for more, or EOF.  Nul terminate
975      * the buffer first.
976      */
977     as->repbuf[n + as->repbufsize] = '\0';
978     if (n > 0) {
979         as->repbufsize += n;
980         if(as->repbufsize >= (as->bufsize - 1)) {
981             as->bufsize *= 2;
982             repbuf_temp = alloc(as->bufsize);
983             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
984             amfree(as->repbuf);
985             as->repbuf = repbuf_temp;
986         }
987         else if(as->send_partial_reply) {
988             amfree(as->rep_pkt.body);
989             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
990             do_sendpkt(as->security_handle, &as->rep_pkt);
991             amfree(as->rep_pkt.body);
992             pkt_init_empty(&as->rep_pkt, P_REP);
993         }
994  
995         return (A_PENDING);
996     }
997
998     /*
999      * If we got 0, then we hit EOF.  Process the data and release
1000      * the timeout.
1001      */
1002     assert(n == 0);
1003
1004     assert(as->ev_repfd != NULL);
1005     event_release(as->ev_repfd);
1006     as->ev_repfd = NULL;
1007
1008     assert(as->ev_reptimeout != NULL);
1009     event_release(as->ev_reptimeout);
1010     as->ev_reptimeout = NULL;
1011
1012     as->state = s_processrep;
1013     aclose(as->repfd);
1014     return (A_CONTINUE);
1015 }
1016
1017 /*
1018  * After we have read in all of the rep data, we process it and send
1019  * it out as a REP packet.
1020  */
1021 static action_t
1022 s_processrep(
1023     struct active_service *     as,
1024     action_t                    action,
1025     pkt_t *                     pkt)
1026 {
1027     char *tok, *repbuf;
1028
1029     (void)action;       /* Quiet unused parameter warning */
1030     (void)pkt;          /* Quiet unused parameter warning */
1031
1032     /*
1033      * Copy the rep lines into the outgoing packet.
1034      *
1035      * If this line is a CONNECT, translate it
1036      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1037      * Example:
1038      *
1039      *  CONNECT DATA 4 MESG 5 INDEX 6
1040      *
1041      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1042      * We need to map these to security streams and pass them back
1043      * to the amanda server.  If the handle is -1, then we don't map.
1044      */
1045     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1046         amandad_kencrypt = KENCRYPT_WILL_DO;
1047         repbuf = stralloc(as->repbuf + 9);
1048     } else {
1049         repbuf = stralloc(as->repbuf);
1050     }
1051     amfree(as->rep_pkt.body);
1052     pkt_init_empty(&as->rep_pkt, P_REP);
1053     tok = strtok(repbuf, " ");
1054     if (tok == NULL)
1055         goto error;
1056     if (strcmp(tok, "CONNECT") == 0) {
1057         char *line, *nextbuf;
1058
1059         /* Save the entire line */
1060         line = strtok(NULL, "\n");
1061         /* Save the buf following the line */
1062         nextbuf = strtok(NULL, "");
1063
1064         if (line == NULL || nextbuf == NULL)
1065             goto error;
1066
1067         pkt_cat(&as->rep_pkt, "CONNECT");
1068
1069         /* loop over the id/handle pairs */
1070         for (;;) {
1071             /* id */
1072             tok = strtok(line, " ");
1073             line = NULL;        /* keep working from line */
1074             if (tok == NULL)
1075                 break;
1076             pkt_cat(&as->rep_pkt, " %s", tok);
1077
1078             /* handle */
1079             tok = strtok(NULL, " \n");
1080             if (tok == NULL)
1081                 goto error;
1082             /* convert the handle into something the server can process */
1083             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1084         }
1085         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1086     } else {
1087 error:
1088         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1089     }
1090
1091     /*
1092      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1093      * state.
1094      */
1095     as->state = s_sendrep;
1096     as->repretry = getconf_int(CNF_REP_TRIES);
1097     amfree(repbuf);
1098     return (A_CONTINUE);
1099 }
1100
1101 /*
1102  * This is the state where we send the REP we just collected from our child.
1103  */
1104 static action_t
1105 s_sendrep(
1106     struct active_service *     as,
1107     action_t                    action,
1108     pkt_t *                     pkt)
1109 {
1110     (void)action;       /* Quiet unused parameter warning */
1111     (void)pkt;          /* Quiet unused parameter warning */
1112
1113     /*
1114      * Transmit it and move to the ack state.
1115      */
1116     do_sendpkt(as->security_handle, &as->rep_pkt);
1117     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1118     as->state = s_ackwait;
1119     return (A_PENDING);
1120 }
1121
1122 /*
1123  * This is the state in which we wait for the server to ACK the REP
1124  * we just sent it.
1125  */
1126 static action_t
1127 s_ackwait(
1128     struct active_service *     as,
1129     action_t                    action,
1130     pkt_t *                     pkt)
1131 {
1132     struct datafd_handle *dh;
1133     int npipes;
1134
1135     /*
1136      * If we got a timeout, try again, but eventually give up.
1137      */
1138     if (action == A_TIMEOUT) {
1139         if (--as->repretry > 0) {
1140             as->state = s_sendrep;
1141             return (A_CONTINUE);
1142         }
1143         dbprintf(_("timeout waiting for ACK for our REP\n"));
1144         return (A_FINISH);
1145     }
1146     amandad_debug(1, _("received ACK, now opening streams\n"));
1147
1148     assert(action == A_RECVPKT);
1149
1150     if (pkt->type == P_REQ) {
1151         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1152         as->state = s_sendrep;
1153         return (A_CONTINUE);
1154     }
1155
1156     if (pkt->type != P_ACK)
1157         return (A_SENDNAK);
1158
1159     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1160         amandad_kencrypt = KENCRYPT_YES;
1161     }
1162
1163     /*
1164      * Got the ack, now open the pipes
1165      */
1166     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1167         if (dh->netfd == NULL)
1168             continue;
1169         dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
1170         if (security_stream_accept(dh->netfd) < 0) {
1171             dbprintf(_("stream %td accept failed: %s\n"),
1172                 dh - &as->data[0], security_geterror(as->security_handle));
1173             security_stream_close(dh->netfd);
1174             dh->netfd = NULL;
1175             continue;
1176         }
1177
1178         /* setup an event for reads from it.  As a special case, don't start
1179          * listening on as->data[0] until we read some data on another fd, if
1180          * the service is sendbackup.  This ensures that we send a MESG or 
1181          * INDEX token before any DATA tokens, as dumper assumes. This is a
1182          * hack, if that wasn't already obvious! */
1183         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1184             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1185                                          process_readnetfd, dh);
1186         } else {
1187             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1188         }
1189
1190         security_stream_read(dh->netfd, process_writenetfd, dh);
1191
1192     }
1193
1194     /*
1195      * Pipes are open, so auth them.  Count them at the same time.
1196      */
1197     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1198         if (dh->netfd == NULL)
1199             continue;
1200         if (security_stream_auth(dh->netfd) < 0) {
1201             security_stream_close(dh->netfd);
1202             dh->netfd = NULL;
1203             event_release(dh->ev_read);
1204             event_release(dh->ev_write);
1205             dh->ev_read = NULL;
1206             dh->ev_write = NULL;
1207         } else {
1208             npipes++;
1209         }
1210     }
1211
1212     /*
1213      * If no pipes are open, then we're done.  Otherwise, just start running.
1214      * The event handlers on all of the pipes will take it from here.
1215      */
1216     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1217     if (npipes == 0)
1218         return (A_FINISH);
1219     else {
1220         security_close(as->security_handle);
1221         as->security_handle = NULL;
1222         return (A_PENDING);
1223     }
1224 }
1225
1226 /*
1227  * Called when a repfd has received data
1228  */
1229 static void
1230 repfd_recv(
1231     void *      cookie)
1232 {
1233     struct active_service *as = cookie;
1234
1235     assert(as != NULL);
1236     assert(as->ev_repfd != NULL);
1237
1238     state_machine(as, A_RECVREP, NULL);
1239 }
1240
1241 static void
1242 process_errfd(
1243     void *cookie)
1244 {
1245     struct active_service *as = cookie;
1246
1247     /* Process errfd before sending the REP packet */
1248     if (as->ev_errfd) {
1249         SELECT_ARG_TYPE readset;
1250         struct timeval  tv;
1251         int             nfound;
1252
1253         memset(&tv, 0, SIZEOF(tv));
1254         FD_ZERO(&readset);
1255         FD_SET(as->errfd, &readset);
1256         nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
1257         if (nfound && FD_ISSET(as->errfd, &readset)) {
1258             errfd_recv(as);
1259         }
1260     }
1261 }
1262
1263 /*
1264  * Called when a errfd has received data
1265  */
1266 static void
1267 errfd_recv(
1268     void *      cookie)
1269 {
1270     struct active_service *as = cookie;
1271     char  buf[32769];
1272     int   n;
1273     char *r;
1274
1275     assert(as != NULL);
1276     assert(as->ev_errfd != NULL);
1277
1278     n = read(as->errfd, &buf, 32768);
1279     /* merge buffer */
1280     if (n > 0) {
1281         /* Terminate it with '\0' */
1282         buf[n+1] = '\0';
1283
1284         if (as->errbuf) {
1285             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1286         } else {
1287             as->errbuf = stralloc(buf);
1288         }
1289     } else if (n == 0) {
1290         event_release(as->ev_errfd);
1291         as->ev_errfd = NULL;
1292     } else { /* n < 0 */
1293         event_release(as->ev_errfd);
1294         as->ev_errfd = NULL;
1295         g_snprintf(buf, 32768,
1296                    "error reading stderr or service: %s\n", strerror(errno));
1297     }
1298
1299     /* for each line terminate by '\n' */
1300     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1301         char *s;
1302
1303         *r = '\0';
1304         s = vstrallocf("ERROR service %s: %s\n",
1305                        services[as->service].name, as->errbuf);
1306
1307         /* Add to repbuf, error message will be in the REP packet if it
1308          * is not already sent
1309          */
1310         n = strlen(s);
1311         if (as->bufsize == 0) {
1312             as->bufsize = NETWORK_BLOCK_BYTES;
1313             as->repbuf = alloc(as->bufsize);
1314         }
1315         while (as->bufsize < as->repbufsize + n) {
1316             char *repbuf_temp;
1317             as->bufsize *= 2;
1318             repbuf_temp = alloc(as->bufsize);
1319             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1320             amfree(as->repbuf);
1321             as->repbuf = repbuf_temp;
1322         }
1323         memcpy(as->repbuf + as->repbufsize, s, n);
1324         as->repbufsize += n;
1325
1326         dbprintf("%s", s);
1327
1328         /* remove first line from buffer */
1329         r++;
1330         s = stralloc(r);
1331         amfree(as->errbuf);
1332         as->errbuf = s;
1333     }
1334 }
1335
1336 /*
1337  * Called when a repfd has timed out
1338  */
1339 static void
1340 timeout_repfd(
1341     void *      cookie)
1342 {
1343     struct active_service *as = cookie;
1344
1345     assert(as != NULL);
1346     assert(as->ev_reptimeout != NULL);
1347
1348     state_machine(as, A_TIMEOUT, NULL);
1349 }
1350
1351 /*
1352  * Called when a handle has received data
1353  */
1354 static void
1355 protocol_recv(
1356     void *              cookie,
1357     pkt_t *             pkt,
1358     security_status_t   status)
1359 {
1360     struct active_service *as = cookie;
1361
1362     assert(as != NULL);
1363
1364     switch (status) {
1365     case S_OK:
1366         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1367             pkt_type2str(pkt->type), pkt->body);
1368         state_machine(as, A_RECVPKT, pkt);
1369         break;
1370     case S_TIMEOUT:
1371         dbprintf(_("timeout\n"));
1372         state_machine(as, A_TIMEOUT, NULL);
1373         break;
1374     case S_ERROR:
1375         dbprintf(_("receive error: %s\n"),
1376             security_geterror(as->security_handle));
1377         break;
1378     }
1379 }
1380
1381 /*
1382  * This is a generic relay function that just reads data from one of
1383  * the process's pipes and passes it up the equivalent security_stream_t
1384  */
1385 static void
1386 process_readnetfd(
1387     void *      cookie)
1388 {
1389     pkt_t nak;
1390     struct datafd_handle *dh = cookie;
1391     struct active_service *as = dh->as;
1392     ssize_t n;
1393
1394     nak.body = NULL;
1395
1396     do {
1397         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1398     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1399
1400     /*
1401      * Process has died.
1402      */
1403     if (n < 0) {
1404         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1405             dh->fd_read, strerror(errno));
1406         goto sendnak;
1407     }
1408     /*
1409      * Process has closed the pipe.  Just remove this event handler.
1410      * If all pipes are closed, shut down this service.
1411      */
1412     if (n == 0) {
1413         event_release(dh->ev_read);
1414         dh->ev_read = NULL;
1415         if(dh->ev_write == NULL) {
1416             security_stream_close(dh->netfd);
1417             dh->netfd = NULL;
1418         }
1419         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1420             if (dh->netfd != NULL)
1421                 return;
1422         }
1423         service_delete(as);
1424         return;
1425     }
1426
1427     /* Handle the special case of recognizing "sendbackup info end"
1428      * from sendbackup's MESG fd */
1429     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1430         /* make a buffer containing the combined data from info_end_buf
1431          * and what we've read this time, and search it for info_end_strj
1432          * This includes a NULL byte for strstr's sanity. */
1433         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1434         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1435         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1436         combined_buf[INFO_END_LEN+n] = '\0';
1437
1438         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1439
1440         /* fill info_end_buf from the tail end of combined_buf */
1441         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1442         amfree(combined_buf);
1443
1444         /* if we did see info_end_str, start reading the data fd (fd 0) */
1445         if (as->seen_info_end) {
1446             struct datafd_handle *dh = &as->data[0];
1447             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1448             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1449                                          process_readnetfd, dh);
1450         } else {
1451             amandad_debug(1, "sendbackup header info still not complete\n");
1452         }
1453     }
1454
1455     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1456         /* stream has croaked */
1457         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1458             security_stream_id(dh->netfd),
1459             security_stream_geterror(dh->netfd));
1460         goto sendnak;
1461     }
1462     return;
1463
1464 sendnak:
1465     do_sendpkt(as->security_handle, &nak);
1466     service_delete(as);
1467     amfree(nak.body);
1468 }
1469
1470 /*
1471  * This is a generic relay function that just read data from one of
1472  * the security_stream_t and passes it up the equivalent process's pipes
1473  */
1474 static void
1475 process_writenetfd(
1476     void *      cookie,
1477     void *      buf,
1478     ssize_t     size)
1479 {
1480     struct datafd_handle *dh;
1481
1482     assert(cookie != NULL);
1483     dh = cookie;
1484
1485     if (dh->fd_write <= 0) {
1486         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1487     } else if (size > 0) {
1488         full_write(dh->fd_write, buf, (size_t)size);
1489     } else {
1490         aclose(dh->fd_write);
1491     }
1492 }
1493
1494
1495 /*
1496  * Convert a local stream handle (DATA_FD...) into something that
1497  * can be sent to the amanda server.
1498  *
1499  * Returns a number that should be sent to the server in the REP packet.
1500  */
1501 static int
1502 allocstream(
1503     struct active_service *     as,
1504     int                         handle)
1505 {
1506     struct datafd_handle *dh;
1507
1508     /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
1509      * it is NOT a file descriptor! */
1510
1511     /* if the handle is -1, then we don't bother */
1512     if (handle < 0)
1513         return (-1);
1514
1515     /* make sure the handle's kosher */
1516     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1517         return (-1);
1518
1519     /* get a pointer into our handle array */
1520     dh = &as->data[handle - DATA_FD_OFFSET];
1521
1522     /* make sure we're not already using the net handle */
1523     if (dh->netfd != NULL)
1524         return (-1);
1525
1526     /* allocate a stream from the security layer and return */
1527     dh->netfd = security_stream_server(as->security_handle);
1528     if (dh->netfd == NULL) {
1529         dbprintf(_("couldn't open stream to server: %s\n"),
1530             security_geterror(as->security_handle));
1531         return (-1);
1532     }
1533
1534     /*
1535      * convert the stream into a numeric id that can be sent to the
1536      * remote end.
1537      */
1538     return (security_stream_id(dh->netfd));
1539 }
1540
1541 /*
1542  * Create a new service instance
1543  */
1544 static struct active_service *
1545 service_new(
1546     security_handle_t * security_handle,
1547     const char *        cmd,
1548     service_t           service,
1549     const char *        arguments)
1550 {
1551     int i;
1552     int data_read[DATA_FD_COUNT + 2][2];
1553     int data_write[DATA_FD_COUNT + 2][2];
1554     struct active_service *as;
1555     pid_t pid;
1556     int newfd;
1557     char *peer_name;
1558     char *amanda_remote_host_env[2];
1559
1560     assert(security_handle != NULL);
1561     assert(cmd != NULL);
1562     assert(arguments != NULL);
1563
1564     /* a plethora of pipes */
1565     /* data_read[0]                : stdin
1566      * data_write[0]               : stdout
1567      * data_read[1], data_write[1] : first  stream
1568      * data_read[2], data_write[2] : second stream
1569      * data_read[3], data_write[3] : third stream
1570      * data_write[4]               : stderr
1571      */
1572     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1573         if (pipe(data_read[i]) < 0) {
1574             error(_("pipe: %s\n"), strerror(errno));
1575             /*NOTREACHED*/
1576         }
1577         if (pipe(data_write[i]) < 0) {
1578             error(_("pipe: %s\n"), strerror(errno));
1579             /*NOTREACHED*/
1580         }
1581     }
1582     if (pipe(data_write[STDERR_PIPE]) < 0) {
1583         error(_("pipe: %s\n"), strerror(errno));
1584         /*NOTREACHED*/
1585     }
1586
1587     switch(pid = fork()) {
1588     case -1:
1589         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1590         /*NOTREACHED*/
1591     default:
1592         /*
1593          * The parent.  Close the far ends of our pipes and return.
1594          */
1595         as = g_new0(struct active_service, 1);
1596         as->cmd = stralloc(cmd);
1597         as->arguments = stralloc(arguments);
1598         as->security_handle = security_handle;
1599         as->state = NULL;
1600         as->service = service;
1601         as->pid = pid;
1602         as->send_partial_reply = 0;
1603         as->seen_info_end = FALSE;
1604         /* fill in info_end_buf with non-null characters */
1605         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1606         if(service == SERVICE_SENDSIZE) {
1607             g_option_t *g_options;
1608             char *option_str, *p;
1609
1610             option_str = stralloc(as->arguments+8);
1611             p = strchr(option_str,'\n');
1612             if(p) *p = '\0';
1613
1614             g_options = parse_g_options(option_str, 1);
1615             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1616                 as->send_partial_reply = 1;
1617             }
1618             free_g_options(g_options);
1619             amfree(option_str);
1620         }
1621
1622         /* write to the request pipe */
1623         aclose(data_read[0][0]);
1624         as->reqfd = data_read[0][1];
1625
1626         /*
1627          * read from the reply pipe
1628          */
1629         as->repfd = data_write[0][0];
1630         aclose(data_write[0][1]);
1631         as->ev_repfd = NULL;
1632         as->repbuf = NULL;
1633         as->repbufsize = 0;
1634         as->bufsize = 0;
1635         as->repretry = 0;
1636         as->rep_pkt.body = NULL;
1637
1638         /*
1639          * read from the stderr pipe
1640          */
1641         as->errfd = data_write[STDERR_PIPE][0];
1642         aclose(data_write[STDERR_PIPE][1]);
1643         as->ev_errfd = NULL;
1644         as->errbuf = NULL;
1645
1646         /*
1647          * read from the rest of the general-use pipes
1648          * (netfds are opened as the client requests them)
1649          */
1650         for (i = 0; i < DATA_FD_COUNT; i++) {
1651             aclose(data_read[i + 1][1]);
1652             aclose(data_write[i + 1][0]);
1653             as->data[i].fd_read = data_read[i + 1][0];
1654             as->data[i].fd_write = data_write[i + 1][1];
1655             as->data[i].ev_read = NULL;
1656             as->data[i].ev_write = NULL;
1657             as->data[i].netfd = NULL;
1658             as->data[i].as = as;
1659         }
1660
1661         /* add it to the service queue */
1662         /* increment the active service count */
1663         serviceq = g_slist_append(serviceq, (gpointer)as);
1664
1665         return (as);
1666     case 0:
1667         /*
1668          * The child.  Put our pipes in their advertised locations
1669          * and start up.
1670          */
1671
1672         /* set up the AMANDA_AUTHENTICATED_PEER env var so child services
1673          * can use it to authenticate */
1674         peer_name = security_get_authenticated_peer_name(security_handle);
1675         amanda_remote_host_env[0] = NULL;
1676         amanda_remote_host_env[1] = NULL;
1677         if (*peer_name) {
1678             amanda_remote_host_env[0] =
1679                 g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
1680         }
1681
1682         /*
1683          * The data stream is stdin in the new process
1684          */
1685         if (dup2(data_read[0][0], 0) < 0) {
1686             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1687                 strerror(errno));
1688             /*NOTREACHED*/
1689         }
1690         aclose(data_read[0][0]);
1691         aclose(data_read[0][1]);
1692
1693         /*
1694          * The reply stream is stdout
1695          */
1696         if (dup2(data_write[0][1], 1) < 0) {
1697             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1698                 strerror(errno));
1699         }
1700         aclose(data_write[0][0]);
1701         aclose(data_write[0][1]);
1702
1703         for (i = 0; i < DATA_FD_COUNT; i++) {
1704             aclose(data_read[i + 1][0]);
1705             aclose(data_write[i + 1][1]);
1706         }
1707
1708         /*
1709          *  Make sure they are not open in the range DATA_FD_OFFSET to
1710          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1711          */
1712         for (i = 0; i < DATA_FD_COUNT; i++) {
1713             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1714                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1715                 newfd = dup(data_read[i + 1][1]);
1716                 if(newfd == -1)
1717                     error(_("Can't dup out off DATA_FD range"));
1718                 data_read[i + 1][1] = newfd;
1719             }
1720             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1721                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1722                 newfd = dup(data_write[i + 1][0]);
1723                 if(newfd == -1)
1724                     error(_("Can't dup out off DATA_FD range"));
1725                 data_write[i + 1][0] = newfd;
1726             }
1727         }
1728         while(data_write[4][0] >= DATA_FD_OFFSET &&
1729               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1730             newfd = dup(data_write[4][0]);
1731             if (newfd == -1)
1732                 error(_("Can't dup out off DATA_FD range"));
1733             data_write[4][0] = newfd;
1734         }
1735         while(data_write[4][1] >= DATA_FD_OFFSET &&
1736               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1737             newfd = dup(data_write[4][1]);
1738             if (newfd == -1)
1739                 error(_("Can't dup out off DATA_FD range"));
1740             data_write[4][1] = newfd;
1741         }
1742
1743         for (i = 0; i < DATA_FD_COUNT*2; i++)
1744             close(DATA_FD_OFFSET + i);
1745
1746         /*
1747          * The rest start at the offset defined in amandad.h, and continue
1748          * through the internal defined.
1749          */
1750         for (i = 0; i < DATA_FD_COUNT; i++) {
1751             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1752                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1753                     i + DATA_FD_OFFSET, strerror(errno));
1754             }
1755             aclose(data_read[i + 1][1]);
1756
1757             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1758                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1759                     i + DATA_FD_OFFSET, strerror(errno));
1760             }
1761             aclose(data_write[i + 1][0]);
1762         }
1763
1764         /* close all unneeded fd */
1765         close(STDERR_FILENO);
1766         dup2(data_write[STDERR_PIPE][1], 2);
1767         aclose(data_write[STDERR_PIPE][0]);
1768         aclose(data_write[STDERR_PIPE][1]);
1769         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1770
1771         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env_full(amanda_remote_host_env));
1772         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1773         /*NOTREACHED*/
1774     }
1775     return NULL;
1776 }
1777
1778 /*
1779  * Unallocate a service instance
1780  */
1781 static void
1782 service_delete(
1783     struct active_service *     as)
1784 {
1785     int i;
1786     int   count;
1787     pid_t pid;
1788     struct datafd_handle *dh;
1789
1790     amandad_debug(1, _("closing service: %s\n"),
1791                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1792
1793     assert(as != NULL);
1794
1795     assert(as->cmd != NULL);
1796     amfree(as->cmd);
1797
1798     assert(as->arguments != NULL);
1799     amfree(as->arguments);
1800
1801     if (as->reqfd != -1)
1802         aclose(as->reqfd);
1803     if (as->repfd != -1)
1804         aclose(as->repfd);
1805     if (as->errfd != -1) {
1806         process_errfd(as);
1807         aclose(as->errfd);
1808     }
1809
1810     if (as->ev_repfd != NULL)
1811         event_release(as->ev_repfd);
1812     if (as->ev_reptimeout != NULL)
1813         event_release(as->ev_reptimeout);
1814     if (as->ev_errfd != NULL)
1815         event_release(as->ev_errfd);
1816
1817     for (i = 0; i < DATA_FD_COUNT; i++) {
1818         dh = &as->data[i];
1819
1820         aclose(dh->fd_read);
1821         aclose(dh->fd_write);
1822
1823         if (dh->netfd != NULL)
1824             security_stream_close(dh->netfd);
1825
1826         if (dh->ev_read != NULL)
1827             event_release(dh->ev_read);
1828         if (dh->ev_write != NULL)
1829             event_release(dh->ev_write);
1830     }
1831
1832     if (as->security_handle != NULL)
1833         security_close(as->security_handle);
1834
1835     /* try to kill the process; if this fails, then it's already dead and
1836      * likely some of the other zombie cleanup ate its brains, so we don't
1837      * bother to waitpid for it */
1838     assert(as->pid > 0);
1839     pid = waitpid(as->pid, NULL, WNOHANG);
1840     if (pid != as->pid && kill(as->pid, SIGTERM) == 0) {
1841         pid = waitpid(as->pid, NULL, WNOHANG);
1842         count = 5;
1843         while (pid != as->pid && count > 0) {
1844             count--;
1845             sleep(1);
1846             pid = waitpid(as->pid, NULL, WNOHANG);
1847         }
1848         if (pid != as->pid) {
1849             g_debug("Process %d failed to exit", (int)as->pid);
1850         }
1851     }
1852
1853     serviceq = g_slist_remove(serviceq, (gpointer)as);
1854
1855     amfree(as->cmd);
1856     amfree(as->arguments);
1857     amfree(as->repbuf);
1858     amfree(as->rep_pkt.body);
1859     amfree(as);
1860
1861     if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
1862         dbclose();
1863         exit(0);
1864     }
1865 }
1866
1867 /*
1868  * Like 'fullwrite', but does the work in a child process so pipelines
1869  * do not hang.
1870  */
1871 static int
1872 writebuf(
1873     struct active_service *     as,
1874     const void *                bufp,
1875     size_t                      size)
1876 {
1877     pid_t pid;
1878     size_t    writesize;
1879
1880     switch (pid=fork()) {
1881     case -1:
1882         break;
1883
1884     default:
1885         waitpid(pid, NULL, WNOHANG);
1886         return 0;                       /* this is the parent */
1887
1888     case 0:                             /* this is the child */
1889         close(as->repfd);
1890         writesize = full_write(as->reqfd, bufp, size);
1891         exit(writesize != size);
1892         /* NOTREACHED */
1893     }
1894     return -1;
1895 }
1896
1897 static ssize_t
1898 do_sendpkt(
1899     security_handle_t * handle,
1900     pkt_t *             pkt)
1901 {
1902     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1903         pkt_type2str(pkt->type), pkt->body);
1904     if (handle)
1905         return security_sendpkt(handle, pkt);
1906     else
1907         return 1;
1908 }
1909
1910 /*
1911  * Convert a state into a string
1912  */
1913 static const char *
1914 state2str(
1915     state_t     state)
1916 {
1917     static const struct {
1918         state_t state;
1919         const char str[13];
1920     } states[] = {
1921 #define X(state)        { state, stringize(state) }
1922         X(s_sendack),
1923         X(s_repwait),
1924         X(s_processrep),
1925         X(s_sendrep),
1926         X(s_ackwait),
1927 #undef X
1928     };
1929     int i;
1930
1931     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1932         if (state == states[i].state)
1933             return (states[i].str);
1934     return (_("INVALID STATE"));
1935 }
1936
1937 /*
1938  * Convert an action into a string
1939  */
1940 static const char *
1941 action2str(
1942     action_t    action)
1943 {
1944     static const struct {
1945         action_t action;
1946         const char str[12];
1947     } actions[] = {
1948 #define X(action)       { action, stringize(action) }
1949         X(A_START),
1950         X(A_RECVPKT),
1951         X(A_RECVREP),
1952         X(A_PENDING),
1953         X(A_FINISH),
1954         X(A_CONTINUE),
1955         X(A_SENDNAK),
1956         X(A_TIMEOUT),
1957 #undef X
1958     };
1959     int i;
1960
1961     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1962         if (action == actions[i].action)
1963             return (actions[i].str);
1964     return (_("UNKNOWN ACTION"));
1965 }
1966
1967 static char *
1968 amandad_get_security_conf(
1969     char *      string,
1970     void *      arg)
1971 {
1972     (void)arg;      /* Quiet unused parameter warning */
1973
1974     if (!string || !*string)
1975         return(NULL);
1976
1977     if (strcmp(string, "kencrypt")==0) {
1978         if (amandad_kencrypt == KENCRYPT_YES)
1979             return ("yes");
1980         else
1981             return (NULL);
1982     }
1983     return(NULL);
1984 }
1985