Imported Upstream version 3.3.2
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "security.h"
43 #include "stream.h"
44 #include "util.h"
45 #include "conffile.h"
46
47 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
48 #define ACK_TIMEOUT     10              /* XXX should be configurable */
49 #define STDERR_PIPE (DATA_FD_COUNT + 1)
50
51 #define amandad_debug(i, ...) do {      \
52         if ((i) <= debug_amandad) {     \
53                 dbprintf(__VA_ARGS__);  \
54         }                               \
55 } while (0)
56
57 /*
58  * These are the actions for entering the state machine
59  */
60 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
61     A_SENDNAK, A_TIMEOUT } action_t;
62
63 /*
64  * This is a state in the state machine.  It is a function pointer to
65  * the function that actually implements the state.
66  */
67 struct active_service;
68 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
69
70 /* string that we scan for in sendbackup's MESG stream */
71 static const char info_end_str[] = "sendbackup: info end\n";
72 #define INFO_END_LEN (sizeof(info_end_str)-1)
73
74 /* 
75  * Here are the services that we allow.
76  * Must be in the same order as services[].
77  */
78 typedef enum {
79     SERVICE_NOOP,
80     SERVICE_SENDSIZE,
81     SERVICE_SENDBACKUP,
82     SERVICE_SELFCHECK,
83     SERVICE_AMINDEXD,
84     SERVICE_AMIDXTAPED,
85     SERVICE_AMDUMPD
86 } service_t;
87
88 static struct services {
89     char *name;
90     int  active;
91     service_t service;
92 } services[] = {
93    { "noop", 1, SERVICE_NOOP },
94    { "sendsize", 1, SERVICE_SENDSIZE },
95    { "sendbackup", 1, SERVICE_SENDBACKUP },
96    { "selfcheck", 1, SERVICE_SELFCHECK },
97    { "amindexd", 0, SERVICE_AMINDEXD },
98    { "amidxtaped", 0, SERVICE_AMIDXTAPED },
99    { "amdumpd", 0, SERVICE_AMDUMPD }
100 };
101 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
102
103 /*
104  * This structure describes an active running service.
105  *
106  * An active service is something running that we have received
107  * a request for.  This structure holds info on that service, including
108  * file descriptors for data, etc, as well as the security handle
109  * for communications with the amanda server.
110  */
111 struct active_service {
112     service_t service;                  /* service name */
113     char *cmd;                          /* name of command we ran */
114     char *arguments;                    /* arguments we sent it */
115     security_handle_t *security_handle; /* remote server */
116     state_t state;                      /* how far this has progressed */
117     pid_t pid;                          /* pid of subprocess */
118     int send_partial_reply;             /* send PREP packet */
119     int reqfd;                          /* pipe to write requests */
120     int repfd;                          /* pipe to read replies */
121     int errfd;                          /* pipe to read stderr */
122     event_handle_t *ev_repfd;           /* read event handle for repfd */
123     event_handle_t *ev_reptimeout;      /* timeout for rep data */
124     event_handle_t *ev_errfd;           /* read event handle for errfd */
125     pkt_t rep_pkt;                      /* rep packet we're sending out */
126     char *errbuf;                       /* buffer to read the err into */
127     char *repbuf;                       /* buffer to read the rep into */
128     size_t bufsize;                     /* length of repbuf */
129     size_t repbufsize;                  /* length of repbuf */
130     int repretry;                       /* times we'll retry sending the rep */
131     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
132     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
133
134     /*
135      * General user streams to the process, and their equivalent
136      * network streams.
137      */
138     struct datafd_handle {
139         int fd_read;                    /* pipe to child process */
140         int fd_write;                   /* pipe to child process */
141         event_handle_t *ev_read;        /* it's read event handle */
142         event_handle_t *ev_write;       /* it's write event handle */
143         security_stream_t *netfd;       /* stream to amanda server */
144         struct active_service *as;      /* pointer back to our enclosure */
145     } data[DATA_FD_COUNT];
146     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
147 };
148
149 /*
150  * Queue of outstanding requests that we are running.
151  */
152 GSList *serviceq = NULL;
153
154 static int wait_30s = 1;
155 static int exit_on_qlength = 1;
156 static char *auth = NULL;
157 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
158
159 int main(int argc, char **argv);
160
161 static int allocstream(struct active_service *, int);
162 static void exit_check(void *);
163 static void protocol_accept(security_handle_t *, pkt_t *);
164 static void state_machine(struct active_service *, action_t, pkt_t *);
165
166 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
167 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
168 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
169 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
170 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
171
172 static void repfd_recv(void *);
173 static void process_errfd(void *cookie);
174 static void errfd_recv(void *);
175 static void timeout_repfd(void *);
176 static void protocol_recv(void *, pkt_t *, security_status_t);
177 static void process_readnetfd(void *);
178 static void process_writenetfd(void *, void *, ssize_t);
179 static struct active_service *service_new(security_handle_t *,
180     const char *, service_t, const char *);
181 static void service_delete(struct active_service *);
182 static int writebuf(struct active_service *, const void *, size_t);
183 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
184 static char *amandad_get_security_conf (char *, void *);
185
186 static const char *state2str(state_t);
187 static const char *action2str(action_t);
188
189 int
190 main(
191     int         argc,
192     char **     argv)
193 {
194     int i, j;
195     int have_services;
196     int in, out;
197     const security_driver_t *secdrv;
198     int no_exit = 0;
199     char *pgm = "amandad";              /* in case argv[0] is not set */
200 #if defined(USE_REUSEADDR)
201     const int on = 1;
202     int r;
203 #endif
204
205     /*
206      * Configure program for internationalization:
207      *   1) Only set the message locale for now.
208      *   2) Set textdomain for all amanda related programs to "amanda"
209      *      We don't want to be forced to support dozens of message catalogs.
210      */  
211     setlocale(LC_MESSAGES, "C");
212     textdomain("amanda"); 
213
214     safe_fd(-1, 0);
215     safe_cd();
216
217     /*
218      * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
219      * the Sun version of tar in /usr/sun/sbin/tar is called instead.
220      *
221      * On other operating systems this will have no effect.
222      */
223
224 #ifdef HAVE_UNSETENV
225     unsetenv("SUN_PERSONALITY");
226 #endif
227
228     /*
229      * When called via inetd, it is not uncommon to forget to put the
230      * argv[0] value on the config line.  On some systems (e.g. Solaris)
231      * this causes argv and/or argv[0] to be NULL, so we have to be
232      * careful getting our name.
233      */
234     if ((argv == NULL) || (argv[0] == NULL)) {
235             pgm = "amandad";            /* in case argv[0] is not set */
236     } else {
237             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
238     }
239     set_pname(pgm);
240     dbopen(DBG_SUBDIR_AMANDAD);
241
242     if(argv == NULL) {
243         error(_("argv == NULL\n"));
244         /*NOTREACHED*/
245     }
246
247     if (argc > 1 && argv && argv[1] && g_str_equal(argv[1], "--version")) {
248         printf("amandad-%s\n", VERSION);
249         return (0);
250     }
251
252     /* Don't die when child closes pipe */
253     signal(SIGPIPE, SIG_IGN);
254
255     /* Parse the configuration; we'll handle errors later */
256     config_init(CONFIG_INIT_CLIENT, NULL);
257
258     if (geteuid() == 0) {
259         check_running_as(RUNNING_AS_ROOT);
260         initgroups(CLIENT_LOGIN, get_client_gid());
261         setgid(get_client_gid());
262         setegid(get_client_gid());
263         seteuid(get_client_uid());
264     } else {
265         check_running_as(RUNNING_AS_CLIENT_LOGIN);
266     }
267
268     add_amanda_log_handler(amanda_log_stderr);
269     add_amanda_log_handler(amanda_log_syslog);
270
271     /*
272      * ad-hoc argument parsing
273      *
274      * We accept        -auth=[authentication type]
275      *                  -no-exit
276      *                  -tcp=[port]
277      *                  -udp=[port]
278      * We also add a list of services that amandad can launch
279      */
280     secdrv = NULL;
281     in = 0; out = 1;            /* default to stdin/stdout */
282     have_services = 0;
283     for (i = 1; i < argc; i++) {
284         /*
285          * Get a driver for a security type specified after -auth=
286          */
287         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
288             argv[i] += strlen("-auth=");
289             secdrv = security_getdriver(argv[i]);
290             auth = argv[i];
291             if (secdrv == NULL) {
292                 error(_("no driver for security type '%s'\n"), argv[i]);
293                 /*NOTREACHED*/
294             }
295             if (strcmp(auth, "local") == 0 ||
296                 strcmp(auth, "rsh") == 0 ||
297                 strcmp(auth, "ssh") == 0) {
298                 int i;
299                 for (i=0; i < NSERVICES; i++) {
300                     services[i].active = 1;
301                 }
302             }
303             continue;
304         }
305
306         /*
307          * If -no-exit is specified, always run even after requests have
308          * been satisfied.
309          */
310         else if (strcmp(argv[i], "-no-exit") == 0) {
311             no_exit = 1;
312             continue;
313         }
314
315         /*
316          * Allow us to directly bind to a udp port for debugging.
317          * This may only apply to some security types.
318          */
319         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
320 #ifdef WORKING_IPV6
321             struct sockaddr_in6 sin;
322 #else
323             struct sockaddr_in sin;
324 #endif
325
326             argv[i] += strlen("-udp=");
327 #ifdef WORKING_IPV6
328             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
329 #else
330             in = out = socket(AF_INET, SOCK_DGRAM, 0);
331 #endif
332             if (in < 0) {
333                 error(_("can't create dgram socket: %s\n"), strerror(errno));
334                 /*NOTREACHED*/
335             }
336 #ifdef USE_REUSEADDR
337             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
338                 (void *)&on, (socklen_t_equiv)sizeof(on));
339             if (r < 0) {
340                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
341                           strerror(errno));
342             }
343 #endif
344
345 #ifdef WORKING_IPV6
346             sin.sin6_family = (sa_family_t)AF_INET6;
347             sin.sin6_addr = in6addr_any;
348             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
349 #else
350             sin.sin_family = (sa_family_t)AF_INET;
351             sin.sin_addr.s_addr = INADDR_ANY;
352             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
353 #endif
354             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
355                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
356                     strerror(errno));
357                 /*NOTREACHED*/
358             }
359         }
360         /*
361          * Ditto for tcp ports.
362          */
363         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
364 #ifdef WORKING_IPV6
365             struct sockaddr_in6 sin;
366 #else
367             struct sockaddr_in sin;
368 #endif
369             int sock;
370             socklen_t_equiv n;
371
372             argv[i] += strlen("-tcp=");
373 #ifdef WORKING_IPV6
374             sock = socket(AF_INET6, SOCK_STREAM, 0);
375 #else
376             sock = socket(AF_INET, SOCK_STREAM, 0);
377 #endif
378             if (sock < 0) {
379                 error(_("can't create tcp socket: %s\n"), strerror(errno));
380                 /*NOTREACHED*/
381             }
382 #ifdef USE_REUSEADDR
383             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
384                 (void *)&on, (socklen_t_equiv)sizeof(on));
385             if (r < 0) {
386                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
387                           strerror(errno));
388             }
389 #endif
390 #ifdef WORKING_IPV6
391             sin.sin6_family = (sa_family_t)AF_INET6;
392             sin.sin6_addr = in6addr_any;
393             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
394 #else
395             sin.sin_family = (sa_family_t)AF_INET;
396             sin.sin_addr.s_addr = INADDR_ANY;
397             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
398 #endif
399             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
400                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
401                     strerror(errno));
402                 /*NOTREACHED*/
403             }
404             listen(sock, 10);
405             n = (socklen_t_equiv)sizeof(sin);
406             in = out = accept(sock, (struct sockaddr *)&sin, &n);
407         }
408         /*
409          * It must be a service name
410          */
411         else {
412             /* clear all services */
413             if(!have_services) {
414                 for (j = 0; j < (int)NSERVICES; j++)
415                     services[j].active = 0;
416             }
417             have_services = 1;
418
419             if(strcmp(argv[i],"amdump") == 0) {
420                 services[0].active = 1;
421                 services[1].active = 1;
422                 services[2].active = 1;
423                 services[3].active = 1;
424             }
425             else {
426                 for (j = 0; j < (int)NSERVICES; j++)
427                     if (strcmp(services[j].name, argv[i]) == 0)
428                         break;
429                 if (j == (int)NSERVICES) {
430                     dbprintf(_("%s: invalid service\n"), argv[i]);
431                     exit(1);
432                 }
433                 services[j].active = 1;
434             }
435         }
436     }
437
438     /*
439      * If no security type specified, use BSDTCP
440      */
441     if (secdrv == NULL) {
442         secdrv = security_getdriver("BSDTCP");
443         auth = "bsdtcp";
444         if (secdrv == NULL) {
445             error(_("no driver for default security type 'BSDTCP'\n"));
446             /*NOTREACHED*/
447         }
448     }
449
450     if(strcasecmp(auth, "rsh") == 0 ||
451        strcasecmp(auth, "ssh") == 0 ||
452        strcasecmp(auth, "local") == 0 ||
453        strcasecmp(auth, "bsdtcp") == 0) {
454         wait_30s = 0;
455         exit_on_qlength = 1;
456     }
457
458 #ifndef SINGLE_USERID
459     if (geteuid() == 0) {
460         if (strcasecmp(auth, "krb5") != 0) {
461             struct passwd *pwd;
462             /* lookup our local user name */
463             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
464                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
465             }
466
467             if (pwd->pw_uid != 0) {
468                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
469                       CLIENT_LOGIN, auth);
470             }
471         }
472     } else {
473         if (strcasecmp(auth, "krb5") == 0) {
474             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
475         }
476     }
477 #endif
478
479     /* initialize */
480
481     startclock();
482
483     dbprintf(_("version %s\n"), VERSION);
484     for (i = 0; version_info[i] != NULL; i++) {
485         dbprintf("    %s", version_info[i]);
486     }
487
488     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
489         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
490     }
491
492     /* krb5 require the euid to be 0 */
493     if (strcasecmp(auth, "krb5") == 0) {
494         seteuid((uid_t)0);
495     }
496
497     /*
498      * Schedule to call protocol_accept() when new security handles
499      * are created on stdin.
500      */
501     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
502
503     /*
504      * Schedule an event that will try to exit every 30 seconds if there
505      * are no requests outstanding.
506      */
507     if(wait_30s)
508         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
509
510     /*
511      * Call event_loop() with an arg of 0, telling it to block until all
512      * events are completed.
513      */
514     event_loop(0);
515
516     close(in);
517     close(out);
518     dbclose();
519     return(0);
520 }
521
522 /*
523  * This runs periodically and checks to see if we have any active services
524  * still running.  If we don't, then we quit.
525  */
526 static void
527 exit_check(
528     void *      cookie)
529 {
530     int no_exit;
531
532     assert(cookie != NULL);
533     no_exit = *(int *)cookie;
534
535     /*
536      * If things are still running, then don't exit.
537      */
538     if (g_slist_length(serviceq) > 0)
539         return;
540
541     /*
542      * If the caller asked us to never exit, then we're done
543      */
544     if (no_exit)
545         return;
546
547     dbclose();
548     exit(0);
549 }
550
551 /*
552  * Handles new incoming protocol handles.  This is a callback for
553  * security_accept(), which gets called when new handles are detected.
554  */
555 static void
556 protocol_accept(
557     security_handle_t * handle,
558     pkt_t *             pkt)
559 {
560     pkt_t pkt_out;
561     GSList *iter;
562     struct active_service *as;
563     char *pktbody, *tok, *service, *arguments;
564     char *service_path = NULL;
565     GSList *errlist = NULL;
566     int i;
567     char *peer_name;
568
569     pkt_out.body = NULL;
570
571     /*
572      * If handle is NULL, then the connection is closed.
573      */
574     if(handle == NULL) {
575         return;
576     }
577
578     /*
579      * If we have errors (not warnings) from the config file, let the remote system
580      * know immediately.  Unfortunately, we only get one ERROR line, so if there
581      * are multiple errors, we just show the first.
582      */
583     if (config_errors(&errlist) >= CFGERR_ERRORS) {
584         GSList *iter = errlist;
585         char *errmsg;
586         gboolean multiple_errors = FALSE;
587
588         if (iter) {
589             errmsg = (char *)iter->data;
590             if (iter->next)
591                 multiple_errors = TRUE;
592         } else {
593             errmsg = "(no error message)";
594         }
595
596         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
597             multiple_errors? _(" (additional errors not displayed)"):"");
598         do_sendpkt(handle, &pkt_out);
599         amfree(pkt_out.body);
600         security_close(handle);
601         return;
602     }
603
604     peer_name = security_get_authenticated_peer_name(handle);
605     g_debug("authenticated peer name is '%s'", peer_name);
606     amfree(peer_name);
607
608     /*
609      * If pkt is NULL, then there was a problem with the new connection.
610      */
611     if (pkt == NULL) {
612         dbprintf(_("accept error: %s\n"), security_geterror(handle));
613         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
614         do_sendpkt(handle, &pkt_out);
615         amfree(pkt_out.body);
616         security_close(handle);
617         return;
618     }
619
620     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
621         pkt_type2str(pkt->type), pkt->body);
622
623     /*
624      * If this is not a REQ packet, just forget about it.
625      */
626     if (pkt->type != P_REQ) {
627         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
628             pkt_type2str(pkt->type), pkt->body);
629         security_close(handle);
630         return;
631     }
632
633     pktbody = service = arguments = NULL;
634     as = NULL;
635
636     /*
637      * Parse out the service and arguments
638      */
639
640     pktbody = stralloc(pkt->body);
641
642     tok = strtok(pktbody, " ");
643     if (tok == NULL)
644         goto badreq;
645     if (strcmp(tok, "SERVICE") != 0)
646         goto badreq;
647
648     tok = strtok(NULL, " \n");
649     if (tok == NULL)
650         goto badreq;
651     service = stralloc(tok);
652
653     /* we call everything else 'arguments' */
654     tok = strtok(NULL, "");
655     if (tok == NULL)
656         goto badreq;
657     arguments = stralloc(tok);
658
659     /* see if it's one we allow */
660     for (i = 0; i < (int)NSERVICES; i++)
661         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
662             break;
663     if (i == (int)NSERVICES) {
664         dbprintf(_("%s: invalid service\n"), service);
665         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
666         goto send_pkt_out;
667     }
668
669     service_path = vstralloc(amlibexecdir, "/", service, NULL);
670     if (access(service_path, X_OK) < 0) {
671         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
672             pkt_init(&pkt_out, P_NAK,
673                      _("ERROR execute access to \"%s\" denied\n"),
674                      service_path);
675         goto send_pkt_out;
676     }
677
678     /* see if its already running */
679     for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
680         as = (struct active_service *)iter->data;
681             if (strcmp(as->cmd, service_path) == 0 &&
682                 strcmp(as->arguments, arguments) == 0) {
683                     dbprintf(_("%s %s: already running, acking req\n"),
684                         service, arguments);
685                     pkt_init_empty(&pkt_out, P_ACK);
686                     goto send_pkt_out_no_delete;
687             }
688     }
689
690     /*
691      * create a new service instance, and send the arguments down
692      * the request pipe.
693      */
694     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
695     as = service_new(handle, service_path, services[i].service, arguments);
696     if (writebuf(as, arguments, strlen(arguments)) < 0) {
697         const char *errmsg = strerror(errno);
698         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
699         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
700             service, errmsg);
701         goto send_pkt_out;
702     }
703     aclose(as->reqfd);
704
705     amfree(pktbody);
706     amfree(service);
707     amfree(service_path);
708     amfree(arguments);
709
710     /*
711      * Move to the sendack state, and start up the state
712      * machine.
713      */
714     as->state = s_sendack;
715     state_machine(as, A_START, NULL);
716     return;
717
718 badreq:
719     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
720     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
721         pkt_type2str(pkt->type), pkt->body);
722
723 send_pkt_out:
724     if(as)
725         service_delete(as);
726 send_pkt_out_no_delete:
727     amfree(pktbody);
728     amfree(service_path);
729     amfree(service);
730     amfree(arguments);
731     do_sendpkt(handle, &pkt_out);
732     security_close(handle);
733     amfree(pkt_out.body);
734 }
735
736 /*
737  * Handles incoming protocol packets.  Routes responses to the proper
738  * running service.
739  */
740 static void
741 state_machine(
742     struct active_service *     as,
743     action_t                    action,
744     pkt_t *                     pkt)
745 {
746     action_t retaction;
747     state_t curstate;
748     pkt_t nak;
749
750     amandad_debug(1, _("state_machine: %p entering\n"), as);
751     for (;;) {
752         curstate = as->state;
753         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
754                           state2str(curstate), action2str(action));
755         retaction = (*curstate)(as, action, pkt);
756         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
757                           as, state2str(curstate), action2str(retaction),
758                           state2str(as->state));
759
760         switch (retaction) {
761         /*
762          * State has queued up and is now blocking on input.
763          */
764         case A_PENDING:
765             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
766             return;
767
768         /*
769          * service has switched states.  Loop.
770          */
771         case A_CONTINUE:
772             break;
773
774         /*
775          * state has determined that the packet it received was bogus.
776          * Send a nak, and return.
777          */
778         case A_SENDNAK:
779             dbprintf(_("received unexpected %s packet\n"),
780                 pkt_type2str(pkt->type));
781             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
782             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
783                 pkt_type2str(pkt->type));
784             do_sendpkt(as->security_handle, &nak);
785             amfree(nak.body);
786             security_recvpkt(as->security_handle, protocol_recv, as, -1);
787             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
788             return;
789
790         /*
791          * Service is done.  Remove it and finish.
792          */
793         case A_FINISH:
794             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
795             service_delete(as);
796             return;
797
798         default:
799             assert(0);
800             break;
801         }
802     }
803     /*NOTREACHED*/
804 }
805
806 /*
807  * This state just sends an ack.  After that, we move to the repwait
808  * state to wait for REP data to arrive from the subprocess.
809  */
810 static action_t
811 s_sendack(
812     struct active_service *     as,
813     action_t                    action,
814     pkt_t *                     pkt)
815 {
816     pkt_t ack;
817
818     (void)action;       /* Quiet unused parameter warning */
819     (void)pkt;          /* Quiet unused parameter warning */
820
821     pkt_init_empty(&ack, P_ACK);
822     if (do_sendpkt(as->security_handle, &ack) < 0) {
823         dbprintf(_("error sending ACK: %s\n"),
824             security_geterror(as->security_handle));
825         amfree(ack.body);
826         return (A_FINISH);
827     }
828     amfree(ack.body);
829
830     /*
831      * move to the repwait state
832      * Setup a listener for data on the reply fd, but also
833      * listen for packets over the wire, as the server may
834      * poll us if we take a long time.
835      * Setup a timeout that will fire if it takes too long to
836      * receive rep data.
837      */
838     as->state = s_repwait;
839     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
840     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
841         timeout_repfd, as);
842     as->errbuf = NULL;
843     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
844     security_recvpkt(as->security_handle, protocol_recv, as, -1);
845     return (A_PENDING);
846 }
847
848 /*
849  * This is the repwait state.  We have responded to the initial REQ with
850  * an ACK, and we are now waiting for the process we spawned to pass us 
851  * data to send in a REP.
852  */
853 static action_t
854 s_repwait(
855     struct active_service *     as,
856     action_t                    action,
857     pkt_t *                     pkt)
858 {
859     ssize_t   n;
860     char     *repbuf_temp;
861     char     *what;
862     char     *msg;
863     int       code = 0;
864     int       pid;
865     amwait_t  retstat;
866
867     /*
868      * We normally shouldn't receive any packets while waiting
869      * for our REP data, but in some cases we do.
870      */
871     if (action == A_RECVPKT) {
872         assert(pkt != NULL);
873         /*
874          * Another req for something that's running.  Just send an ACK
875          * and go back and wait for more data.
876          */
877         if (pkt->type == P_REQ) {
878             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
879             amfree(as->rep_pkt.body);
880             pkt_init_empty(&as->rep_pkt, P_ACK);
881             do_sendpkt(as->security_handle, &as->rep_pkt);
882             security_recvpkt(as->security_handle, protocol_recv, as, -1);
883             return (A_PENDING);
884         }
885         /* something unexpected.  Nak it */
886         return (A_SENDNAK);
887     }
888
889     if (action == A_TIMEOUT) {
890         amfree(as->rep_pkt.body);
891         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
892         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
893         do_sendpkt(as->security_handle, &as->rep_pkt);
894         return (A_FINISH);
895     }
896
897     assert(action == A_RECVREP);
898     if(as->bufsize == 0) {
899         as->bufsize = NETWORK_BLOCK_BYTES;
900         as->repbuf = alloc(as->bufsize);
901     }
902
903     do {
904         n = read(as->repfd, as->repbuf + as->repbufsize,
905                  as->bufsize - as->repbufsize - 1);
906     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
907     if (n < 0) {
908         const char *errstr = strerror(errno);
909         dbprintf(_("read error on reply pipe: %s\n"), errstr);
910         amfree(as->rep_pkt.body);
911         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
912                  errstr);
913         do_sendpkt(as->security_handle, &as->rep_pkt);
914         return (A_FINISH);
915     }
916
917     /* If end of service, wait for process status */
918     if (n == 0) {
919         pid = waitpid(as->pid, &retstat, WNOHANG);
920         if (as->service  == SERVICE_NOOP ||
921             as->service  == SERVICE_SENDSIZE ||
922             as->service  == SERVICE_SELFCHECK) {
923             int t = 0;
924             while (t<5 && pid == 0) {
925                 sleep(1);
926                 t++;
927                 pid = waitpid(as->pid, &retstat, WNOHANG);
928             }
929         }
930
931         process_errfd(as);
932
933         if (pid == 0)
934             pid = waitpid(as->pid, &retstat, WNOHANG);
935
936         if (pid > 0) {
937             what = NULL;
938             if (! WIFEXITED(retstat)) {
939                 what = _("signal");
940                 code = WTERMSIG(retstat);
941             } else if (WEXITSTATUS(retstat) != 0) {
942                 what = _("code");
943                 code = WEXITSTATUS(retstat);
944             }
945             if (what) {
946                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
947                          (as->cmd)?as->cmd:_("??UNKONWN??"),
948                          (unsigned)as->pid,
949                          what, code);
950                 msg = vstrallocf(
951                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
952                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
953                      what, code);
954                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
955                         as->bufsize *= 2;
956                         repbuf_temp = alloc(as->bufsize);
957                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
958                         amfree(as->repbuf);
959                         as->repbuf = repbuf_temp;
960                 }
961                 strcpy(as->repbuf + as->repbufsize, msg);
962                 as->repbufsize += strlen(msg);
963                 amfree(msg);
964             }
965         }
966     }
967
968     /*
969      * If we got some data, go back and wait for more, or EOF.  Nul terminate
970      * the buffer first.
971      */
972     as->repbuf[n + as->repbufsize] = '\0';
973     if (n > 0) {
974         as->repbufsize += n;
975         if(as->repbufsize >= (as->bufsize - 1)) {
976             as->bufsize *= 2;
977             repbuf_temp = alloc(as->bufsize);
978             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
979             amfree(as->repbuf);
980             as->repbuf = repbuf_temp;
981         }
982         else if(as->send_partial_reply) {
983             amfree(as->rep_pkt.body);
984             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
985             do_sendpkt(as->security_handle, &as->rep_pkt);
986             amfree(as->rep_pkt.body);
987             pkt_init_empty(&as->rep_pkt, P_REP);
988         }
989  
990         return (A_PENDING);
991     }
992
993     /*
994      * If we got 0, then we hit EOF.  Process the data and release
995      * the timeout.
996      */
997     assert(n == 0);
998
999     assert(as->ev_repfd != NULL);
1000     event_release(as->ev_repfd);
1001     as->ev_repfd = NULL;
1002
1003     assert(as->ev_reptimeout != NULL);
1004     event_release(as->ev_reptimeout);
1005     as->ev_reptimeout = NULL;
1006
1007     as->state = s_processrep;
1008     aclose(as->repfd);
1009     return (A_CONTINUE);
1010 }
1011
1012 /*
1013  * After we have read in all of the rep data, we process it and send
1014  * it out as a REP packet.
1015  */
1016 static action_t
1017 s_processrep(
1018     struct active_service *     as,
1019     action_t                    action,
1020     pkt_t *                     pkt)
1021 {
1022     char *tok, *repbuf;
1023
1024     (void)action;       /* Quiet unused parameter warning */
1025     (void)pkt;          /* Quiet unused parameter warning */
1026
1027     /*
1028      * Copy the rep lines into the outgoing packet.
1029      *
1030      * If this line is a CONNECT, translate it
1031      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1032      * Example:
1033      *
1034      *  CONNECT DATA 4 MESG 5 INDEX 6
1035      *
1036      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1037      * We need to map these to security streams and pass them back
1038      * to the amanda server.  If the handle is -1, then we don't map.
1039      */
1040     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1041         amandad_kencrypt = KENCRYPT_WILL_DO;
1042         repbuf = stralloc(as->repbuf + 9);
1043     } else {
1044         repbuf = stralloc(as->repbuf);
1045     }
1046     amfree(as->rep_pkt.body);
1047     pkt_init_empty(&as->rep_pkt, P_REP);
1048     tok = strtok(repbuf, " ");
1049     if (tok == NULL)
1050         goto error;
1051     if (strcmp(tok, "CONNECT") == 0) {
1052         char *line, *nextbuf;
1053
1054         /* Save the entire line */
1055         line = strtok(NULL, "\n");
1056         /* Save the buf following the line */
1057         nextbuf = strtok(NULL, "");
1058
1059         if (line == NULL || nextbuf == NULL)
1060             goto error;
1061
1062         pkt_cat(&as->rep_pkt, "CONNECT");
1063
1064         /* loop over the id/handle pairs */
1065         for (;;) {
1066             /* id */
1067             tok = strtok(line, " ");
1068             line = NULL;        /* keep working from line */
1069             if (tok == NULL)
1070                 break;
1071             pkt_cat(&as->rep_pkt, " %s", tok);
1072
1073             /* handle */
1074             tok = strtok(NULL, " \n");
1075             if (tok == NULL)
1076                 goto error;
1077             /* convert the handle into something the server can process */
1078             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1079         }
1080         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1081     } else {
1082 error:
1083         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1084     }
1085
1086     /*
1087      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1088      * state.
1089      */
1090     as->state = s_sendrep;
1091     as->repretry = getconf_int(CNF_REP_TRIES);
1092     amfree(repbuf);
1093     return (A_CONTINUE);
1094 }
1095
1096 /*
1097  * This is the state where we send the REP we just collected from our child.
1098  */
1099 static action_t
1100 s_sendrep(
1101     struct active_service *     as,
1102     action_t                    action,
1103     pkt_t *                     pkt)
1104 {
1105     (void)action;       /* Quiet unused parameter warning */
1106     (void)pkt;          /* Quiet unused parameter warning */
1107
1108     /*
1109      * Transmit it and move to the ack state.
1110      */
1111     do_sendpkt(as->security_handle, &as->rep_pkt);
1112     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1113     as->state = s_ackwait;
1114     return (A_PENDING);
1115 }
1116
1117 /*
1118  * This is the state in which we wait for the server to ACK the REP
1119  * we just sent it.
1120  */
1121 static action_t
1122 s_ackwait(
1123     struct active_service *     as,
1124     action_t                    action,
1125     pkt_t *                     pkt)
1126 {
1127     struct datafd_handle *dh;
1128     int npipes;
1129
1130     /*
1131      * If we got a timeout, try again, but eventually give up.
1132      */
1133     if (action == A_TIMEOUT) {
1134         if (--as->repretry > 0) {
1135             as->state = s_sendrep;
1136             return (A_CONTINUE);
1137         }
1138         dbprintf(_("timeout waiting for ACK for our REP\n"));
1139         return (A_FINISH);
1140     }
1141     amandad_debug(1, _("received ACK, now opening streams\n"));
1142
1143     assert(action == A_RECVPKT);
1144
1145     if (pkt->type == P_REQ) {
1146         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1147         as->state = s_sendrep;
1148         return (A_CONTINUE);
1149     }
1150
1151     if (pkt->type != P_ACK)
1152         return (A_SENDNAK);
1153
1154     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1155         amandad_kencrypt = KENCRYPT_YES;
1156     }
1157
1158     /*
1159      * Got the ack, now open the pipes
1160      */
1161     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1162         if (dh->netfd == NULL)
1163             continue;
1164         dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
1165         if (security_stream_accept(dh->netfd) < 0) {
1166             dbprintf(_("stream %td accept failed: %s\n"),
1167                 dh - &as->data[0], security_geterror(as->security_handle));
1168             security_stream_close(dh->netfd);
1169             dh->netfd = NULL;
1170             continue;
1171         }
1172
1173         /* setup an event for reads from it.  As a special case, don't start
1174          * listening on as->data[0] until we read some data on another fd, if
1175          * the service is sendbackup.  This ensures that we send a MESG or 
1176          * INDEX token before any DATA tokens, as dumper assumes. This is a
1177          * hack, if that wasn't already obvious! */
1178         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1179             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1180                                          process_readnetfd, dh);
1181         } else {
1182             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1183         }
1184
1185         security_stream_read(dh->netfd, process_writenetfd, dh);
1186
1187     }
1188
1189     /*
1190      * Pipes are open, so auth them.  Count them at the same time.
1191      */
1192     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1193         if (dh->netfd == NULL)
1194             continue;
1195         if (security_stream_auth(dh->netfd) < 0) {
1196             security_stream_close(dh->netfd);
1197             dh->netfd = NULL;
1198             event_release(dh->ev_read);
1199             event_release(dh->ev_write);
1200             dh->ev_read = NULL;
1201             dh->ev_write = NULL;
1202         } else {
1203             npipes++;
1204         }
1205     }
1206
1207     /*
1208      * If no pipes are open, then we're done.  Otherwise, just start running.
1209      * The event handlers on all of the pipes will take it from here.
1210      */
1211     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1212     if (npipes == 0)
1213         return (A_FINISH);
1214     else {
1215         security_close(as->security_handle);
1216         as->security_handle = NULL;
1217         return (A_PENDING);
1218     }
1219 }
1220
1221 /*
1222  * Called when a repfd has received data
1223  */
1224 static void
1225 repfd_recv(
1226     void *      cookie)
1227 {
1228     struct active_service *as = cookie;
1229
1230     assert(as != NULL);
1231     assert(as->ev_repfd != NULL);
1232
1233     state_machine(as, A_RECVREP, NULL);
1234 }
1235
1236 static void
1237 process_errfd(
1238     void *cookie)
1239 {
1240     struct active_service *as = cookie;
1241
1242     /* Process errfd before sending the REP packet */
1243     if (as->ev_errfd) {
1244         SELECT_ARG_TYPE readset;
1245         struct timeval  tv;
1246         int             nfound;
1247
1248         memset(&tv, 0, SIZEOF(tv));
1249         FD_ZERO(&readset);
1250         FD_SET(as->errfd, &readset);
1251         nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
1252         if (nfound && FD_ISSET(as->errfd, &readset)) {
1253             errfd_recv(as);
1254         }
1255     }
1256 }
1257
1258 /*
1259  * Called when a errfd has received data
1260  */
1261 static void
1262 errfd_recv(
1263     void *      cookie)
1264 {
1265     struct active_service *as = cookie;
1266     char  buf[32769];
1267     int   n;
1268     char *r;
1269
1270     assert(as != NULL);
1271     assert(as->ev_errfd != NULL);
1272
1273     n = read(as->errfd, &buf, 32768);
1274     /* merge buffer */
1275     if (n > 0) {
1276         /* Terminate it with '\0' */
1277         buf[n+1] = '\0';
1278
1279         if (as->errbuf) {
1280             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1281         } else {
1282             as->errbuf = stralloc(buf);
1283         }
1284     } else if (n == 0) {
1285         event_release(as->ev_errfd);
1286         as->ev_errfd = NULL;
1287     } else { /* n < 0 */
1288         event_release(as->ev_errfd);
1289         as->ev_errfd = NULL;
1290         g_snprintf(buf, 32768,
1291                    "error reading stderr or service: %s\n", strerror(errno));
1292     }
1293
1294     /* for each line terminate by '\n' */
1295     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1296         char *s;
1297
1298         *r = '\0';
1299         s = vstrallocf("ERROR service %s: %s\n",
1300                        services[as->service].name, as->errbuf);
1301
1302         /* Add to repbuf, error message will be in the REP packet if it
1303          * is not already sent
1304          */
1305         n = strlen(s);
1306         if (as->bufsize == 0) {
1307             as->bufsize = NETWORK_BLOCK_BYTES;
1308             as->repbuf = alloc(as->bufsize);
1309         }
1310         while (as->bufsize < as->repbufsize + n) {
1311             char *repbuf_temp;
1312             as->bufsize *= 2;
1313             repbuf_temp = alloc(as->bufsize);
1314             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1315             amfree(as->repbuf);
1316             as->repbuf = repbuf_temp;
1317         }
1318         memcpy(as->repbuf + as->repbufsize, s, n);
1319         as->repbufsize += n;
1320
1321         dbprintf("%s", s);
1322
1323         /* remove first line from buffer */
1324         r++;
1325         s = stralloc(r);
1326         amfree(as->errbuf);
1327         as->errbuf = s;
1328     }
1329 }
1330
1331 /*
1332  * Called when a repfd has timed out
1333  */
1334 static void
1335 timeout_repfd(
1336     void *      cookie)
1337 {
1338     struct active_service *as = cookie;
1339
1340     assert(as != NULL);
1341     assert(as->ev_reptimeout != NULL);
1342
1343     state_machine(as, A_TIMEOUT, NULL);
1344 }
1345
1346 /*
1347  * Called when a handle has received data
1348  */
1349 static void
1350 protocol_recv(
1351     void *              cookie,
1352     pkt_t *             pkt,
1353     security_status_t   status)
1354 {
1355     struct active_service *as = cookie;
1356
1357     assert(as != NULL);
1358
1359     switch (status) {
1360     case S_OK:
1361         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1362             pkt_type2str(pkt->type), pkt->body);
1363         state_machine(as, A_RECVPKT, pkt);
1364         break;
1365     case S_TIMEOUT:
1366         dbprintf(_("timeout\n"));
1367         state_machine(as, A_TIMEOUT, NULL);
1368         break;
1369     case S_ERROR:
1370         dbprintf(_("receive error: %s\n"),
1371             security_geterror(as->security_handle));
1372         break;
1373     }
1374 }
1375
1376 /*
1377  * This is a generic relay function that just reads data from one of
1378  * the process's pipes and passes it up the equivalent security_stream_t
1379  */
1380 static void
1381 process_readnetfd(
1382     void *      cookie)
1383 {
1384     pkt_t nak;
1385     struct datafd_handle *dh = cookie;
1386     struct active_service *as = dh->as;
1387     ssize_t n;
1388
1389     nak.body = NULL;
1390
1391     do {
1392         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1393     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1394
1395     /*
1396      * Process has died.
1397      */
1398     if (n < 0) {
1399         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1400             dh->fd_read, strerror(errno));
1401         goto sendnak;
1402     }
1403     /*
1404      * Process has closed the pipe.  Just remove this event handler.
1405      * If all pipes are closed, shut down this service.
1406      */
1407     if (n == 0) {
1408         event_release(dh->ev_read);
1409         dh->ev_read = NULL;
1410         if(dh->ev_write == NULL) {
1411             security_stream_close(dh->netfd);
1412             dh->netfd = NULL;
1413         }
1414         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1415             if (dh->netfd != NULL)
1416                 return;
1417         }
1418         service_delete(as);
1419         return;
1420     }
1421
1422     /* Handle the special case of recognizing "sendbackup info end"
1423      * from sendbackup's MESG fd */
1424     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1425         /* make a buffer containing the combined data from info_end_buf
1426          * and what we've read this time, and search it for info_end_strj
1427          * This includes a NULL byte for strstr's sanity. */
1428         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1429         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1430         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1431         combined_buf[INFO_END_LEN+n] = '\0';
1432
1433         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1434
1435         /* fill info_end_buf from the tail end of combined_buf */
1436         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1437         amfree(combined_buf);
1438
1439         /* if we did see info_end_str, start reading the data fd (fd 0) */
1440         if (as->seen_info_end) {
1441             struct datafd_handle *dh = &as->data[0];
1442             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1443             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1444                                          process_readnetfd, dh);
1445         } else {
1446             amandad_debug(1, "sendbackup header info still not complete\n");
1447         }
1448     }
1449
1450     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1451         /* stream has croaked */
1452         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1453             security_stream_id(dh->netfd),
1454             security_stream_geterror(dh->netfd));
1455         goto sendnak;
1456     }
1457     return;
1458
1459 sendnak:
1460     do_sendpkt(as->security_handle, &nak);
1461     service_delete(as);
1462     amfree(nak.body);
1463 }
1464
1465 /*
1466  * This is a generic relay function that just read data from one of
1467  * the security_stream_t and passes it up the equivalent process's pipes
1468  */
1469 static void
1470 process_writenetfd(
1471     void *      cookie,
1472     void *      buf,
1473     ssize_t     size)
1474 {
1475     struct datafd_handle *dh;
1476
1477     assert(cookie != NULL);
1478     dh = cookie;
1479
1480     if (dh->fd_write <= 0) {
1481         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1482     } else if (size > 0) {
1483         full_write(dh->fd_write, buf, (size_t)size);
1484         security_stream_read(dh->netfd, process_writenetfd, dh);
1485     }
1486     else {
1487         aclose(dh->fd_write);
1488     }
1489 }
1490
1491
1492 /*
1493  * Convert a local stream handle (DATA_FD...) into something that
1494  * can be sent to the amanda server.
1495  *
1496  * Returns a number that should be sent to the server in the REP packet.
1497  */
1498 static int
1499 allocstream(
1500     struct active_service *     as,
1501     int                         handle)
1502 {
1503     struct datafd_handle *dh;
1504
1505     /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
1506      * it is NOT a file descriptor! */
1507
1508     /* if the handle is -1, then we don't bother */
1509     if (handle < 0)
1510         return (-1);
1511
1512     /* make sure the handle's kosher */
1513     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1514         return (-1);
1515
1516     /* get a pointer into our handle array */
1517     dh = &as->data[handle - DATA_FD_OFFSET];
1518
1519     /* make sure we're not already using the net handle */
1520     if (dh->netfd != NULL)
1521         return (-1);
1522
1523     /* allocate a stream from the security layer and return */
1524     dh->netfd = security_stream_server(as->security_handle);
1525     if (dh->netfd == NULL) {
1526         dbprintf(_("couldn't open stream to server: %s\n"),
1527             security_geterror(as->security_handle));
1528         return (-1);
1529     }
1530
1531     /*
1532      * convert the stream into a numeric id that can be sent to the
1533      * remote end.
1534      */
1535     return (security_stream_id(dh->netfd));
1536 }
1537
1538 /*
1539  * Create a new service instance
1540  */
1541 static struct active_service *
1542 service_new(
1543     security_handle_t * security_handle,
1544     const char *        cmd,
1545     service_t           service,
1546     const char *        arguments)
1547 {
1548     int i;
1549     int data_read[DATA_FD_COUNT + 2][2];
1550     int data_write[DATA_FD_COUNT + 2][2];
1551     struct active_service *as;
1552     pid_t pid;
1553     int newfd;
1554     char *peer_name;
1555     char *amanda_remote_host_env[2];
1556
1557     assert(security_handle != NULL);
1558     assert(cmd != NULL);
1559     assert(arguments != NULL);
1560
1561     /* a plethora of pipes */
1562     /* data_read[0]                : stdin
1563      * data_write[0]               : stdout
1564      * data_read[1], data_write[1] : first  stream
1565      * data_read[2], data_write[2] : second stream
1566      * data_read[3], data_write[3] : third stream
1567      * data_write[4]               : stderr
1568      */
1569     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1570         if (pipe(data_read[i]) < 0) {
1571             error(_("pipe: %s\n"), strerror(errno));
1572             /*NOTREACHED*/
1573         }
1574         if (pipe(data_write[i]) < 0) {
1575             error(_("pipe: %s\n"), strerror(errno));
1576             /*NOTREACHED*/
1577         }
1578     }
1579     if (pipe(data_write[STDERR_PIPE]) < 0) {
1580         error(_("pipe: %s\n"), strerror(errno));
1581         /*NOTREACHED*/
1582     }
1583
1584     switch(pid = fork()) {
1585     case -1:
1586         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1587         /*NOTREACHED*/
1588     default:
1589         /*
1590          * The parent.  Close the far ends of our pipes and return.
1591          */
1592         as = g_new0(struct active_service, 1);
1593         as->cmd = stralloc(cmd);
1594         as->arguments = stralloc(arguments);
1595         as->security_handle = security_handle;
1596         as->state = NULL;
1597         as->service = service;
1598         as->pid = pid;
1599         as->send_partial_reply = 0;
1600         as->seen_info_end = FALSE;
1601         /* fill in info_end_buf with non-null characters */
1602         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1603         if(service == SERVICE_SENDSIZE) {
1604             g_option_t *g_options;
1605             char *option_str, *p;
1606
1607             option_str = stralloc(as->arguments+8);
1608             p = strchr(option_str,'\n');
1609             if(p) *p = '\0';
1610
1611             g_options = parse_g_options(option_str, 1);
1612             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1613                 as->send_partial_reply = 1;
1614             }
1615             free_g_options(g_options);
1616             amfree(option_str);
1617         }
1618
1619         /* write to the request pipe */
1620         aclose(data_read[0][0]);
1621         as->reqfd = data_read[0][1];
1622
1623         /*
1624          * read from the reply pipe
1625          */
1626         as->repfd = data_write[0][0];
1627         aclose(data_write[0][1]);
1628         as->ev_repfd = NULL;
1629         as->repbuf = NULL;
1630         as->repbufsize = 0;
1631         as->bufsize = 0;
1632         as->repretry = 0;
1633         as->rep_pkt.body = NULL;
1634
1635         /*
1636          * read from the stderr pipe
1637          */
1638         as->errfd = data_write[STDERR_PIPE][0];
1639         aclose(data_write[STDERR_PIPE][1]);
1640         as->ev_errfd = NULL;
1641         as->errbuf = NULL;
1642
1643         /*
1644          * read from the rest of the general-use pipes
1645          * (netfds are opened as the client requests them)
1646          */
1647         for (i = 0; i < DATA_FD_COUNT; i++) {
1648             aclose(data_read[i + 1][1]);
1649             aclose(data_write[i + 1][0]);
1650             as->data[i].fd_read = data_read[i + 1][0];
1651             as->data[i].fd_write = data_write[i + 1][1];
1652             as->data[i].ev_read = NULL;
1653             as->data[i].ev_write = NULL;
1654             as->data[i].netfd = NULL;
1655             as->data[i].as = as;
1656         }
1657
1658         /* add it to the service queue */
1659         /* increment the active service count */
1660         serviceq = g_slist_append(serviceq, (gpointer)as);
1661
1662         return (as);
1663     case 0:
1664         /*
1665          * The child.  Put our pipes in their advertised locations
1666          * and start up.
1667          */
1668
1669         /* set up the AMANDA_AUTHENTICATED_PEER env var so child services
1670          * can use it to authenticate */
1671         peer_name = security_get_authenticated_peer_name(security_handle);
1672         amanda_remote_host_env[0] = NULL;
1673         amanda_remote_host_env[1] = NULL;
1674         if (*peer_name) {
1675             amanda_remote_host_env[0] =
1676                 g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
1677         }
1678
1679         /*
1680          * The data stream is stdin in the new process
1681          */
1682         if (dup2(data_read[0][0], 0) < 0) {
1683             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1684                 strerror(errno));
1685             /*NOTREACHED*/
1686         }
1687         aclose(data_read[0][0]);
1688         aclose(data_read[0][1]);
1689
1690         /*
1691          * The reply stream is stdout
1692          */
1693         if (dup2(data_write[0][1], 1) < 0) {
1694             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1695                 strerror(errno));
1696         }
1697         aclose(data_write[0][0]);
1698         aclose(data_write[0][1]);
1699
1700         for (i = 0; i < DATA_FD_COUNT; i++) {
1701             aclose(data_read[i + 1][0]);
1702             aclose(data_write[i + 1][1]);
1703         }
1704
1705         /*
1706          *  Make sure they are not open in the range DATA_FD_OFFSET to
1707          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1708          */
1709         for (i = 0; i < DATA_FD_COUNT; i++) {
1710             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1711                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1712                 newfd = dup(data_read[i + 1][1]);
1713                 if(newfd == -1)
1714                     error(_("Can't dup out off DATA_FD range"));
1715                 data_read[i + 1][1] = newfd;
1716             }
1717             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1718                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1719                 newfd = dup(data_write[i + 1][0]);
1720                 if(newfd == -1)
1721                     error(_("Can't dup out off DATA_FD range"));
1722                 data_write[i + 1][0] = newfd;
1723             }
1724         }
1725         while(data_write[4][0] >= DATA_FD_OFFSET &&
1726               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1727             newfd = dup(data_write[4][0]);
1728             if (newfd == -1)
1729                 error(_("Can't dup out off DATA_FD range"));
1730             data_write[4][0] = newfd;
1731         }
1732         while(data_write[4][1] >= DATA_FD_OFFSET &&
1733               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1734             newfd = dup(data_write[4][1]);
1735             if (newfd == -1)
1736                 error(_("Can't dup out off DATA_FD range"));
1737             data_write[4][1] = newfd;
1738         }
1739
1740         for (i = 0; i < DATA_FD_COUNT*2; i++)
1741             close(DATA_FD_OFFSET + i);
1742
1743         /*
1744          * The rest start at the offset defined in amandad.h, and continue
1745          * through the internal defined.
1746          */
1747         for (i = 0; i < DATA_FD_COUNT; i++) {
1748             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1749                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1750                     i + DATA_FD_OFFSET, strerror(errno));
1751             }
1752             aclose(data_read[i + 1][1]);
1753
1754             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1755                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1756                     i + DATA_FD_OFFSET, strerror(errno));
1757             }
1758             aclose(data_write[i + 1][0]);
1759         }
1760
1761         /* close all unneeded fd */
1762         close(STDERR_FILENO);
1763         dup2(data_write[STDERR_PIPE][1], 2);
1764         aclose(data_write[STDERR_PIPE][0]);
1765         aclose(data_write[STDERR_PIPE][1]);
1766         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1767
1768         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env_full(amanda_remote_host_env));
1769         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1770         /*NOTREACHED*/
1771     }
1772     return NULL;
1773 }
1774
1775 /*
1776  * Unallocate a service instance
1777  */
1778 static void
1779 service_delete(
1780     struct active_service *     as)
1781 {
1782     int i;
1783     int   count;
1784     pid_t pid;
1785     struct datafd_handle *dh;
1786
1787     amandad_debug(1, _("closing service: %s\n"),
1788                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1789
1790     assert(as != NULL);
1791
1792     assert(as->cmd != NULL);
1793     amfree(as->cmd);
1794
1795     assert(as->arguments != NULL);
1796     amfree(as->arguments);
1797
1798     if (as->reqfd != -1)
1799         aclose(as->reqfd);
1800     if (as->repfd != -1)
1801         aclose(as->repfd);
1802     if (as->errfd != -1) {
1803         process_errfd(as);
1804         aclose(as->errfd);
1805     }
1806
1807     if (as->ev_repfd != NULL)
1808         event_release(as->ev_repfd);
1809     if (as->ev_reptimeout != NULL)
1810         event_release(as->ev_reptimeout);
1811     if (as->ev_errfd != NULL)
1812         event_release(as->ev_errfd);
1813
1814     for (i = 0; i < DATA_FD_COUNT; i++) {
1815         dh = &as->data[i];
1816
1817         aclose(dh->fd_read);
1818         aclose(dh->fd_write);
1819
1820         if (dh->netfd != NULL)
1821             security_stream_close(dh->netfd);
1822
1823         if (dh->ev_read != NULL)
1824             event_release(dh->ev_read);
1825         if (dh->ev_write != NULL)
1826             event_release(dh->ev_write);
1827     }
1828
1829     if (as->security_handle != NULL)
1830         security_close(as->security_handle);
1831
1832     /* try to kill the process; if this fails, then it's already dead and
1833      * likely some of the other zombie cleanup ate its brains, so we don't
1834      * bother to waitpid for it */
1835     assert(as->pid > 0);
1836     pid = waitpid(as->pid, NULL, WNOHANG);
1837     if (pid != as->pid && kill(as->pid, SIGTERM) == 0) {
1838         pid = waitpid(as->pid, NULL, WNOHANG);
1839         count = 5;
1840         while (pid != as->pid && count > 0) {
1841             count--;
1842             sleep(1);
1843             pid = waitpid(as->pid, NULL, WNOHANG);
1844         }
1845         if (pid != as->pid) {
1846             g_debug("Process %d failed to exit", (int)as->pid);
1847         }
1848     }
1849
1850     serviceq = g_slist_remove(serviceq, (gpointer)as);
1851
1852     amfree(as->cmd);
1853     amfree(as->arguments);
1854     amfree(as->repbuf);
1855     amfree(as->rep_pkt.body);
1856     amfree(as);
1857
1858     if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
1859         dbclose();
1860         exit(0);
1861     }
1862 }
1863
1864 /*
1865  * Like 'fullwrite', but does the work in a child process so pipelines
1866  * do not hang.
1867  */
1868 static int
1869 writebuf(
1870     struct active_service *     as,
1871     const void *                bufp,
1872     size_t                      size)
1873 {
1874     pid_t pid;
1875     size_t    writesize;
1876
1877     switch (pid=fork()) {
1878     case -1:
1879         break;
1880
1881     default:
1882         waitpid(pid, NULL, WNOHANG);
1883         return 0;                       /* this is the parent */
1884
1885     case 0:                             /* this is the child */
1886         close(as->repfd);
1887         writesize = full_write(as->reqfd, bufp, size);
1888         exit(writesize != size);
1889         /* NOTREACHED */
1890     }
1891     return -1;
1892 }
1893
1894 static ssize_t
1895 do_sendpkt(
1896     security_handle_t * handle,
1897     pkt_t *             pkt)
1898 {
1899     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1900         pkt_type2str(pkt->type), pkt->body);
1901     if (handle)
1902         return security_sendpkt(handle, pkt);
1903     else
1904         return 1;
1905 }
1906
1907 /*
1908  * Convert a state into a string
1909  */
1910 static const char *
1911 state2str(
1912     state_t     state)
1913 {
1914     static const struct {
1915         state_t state;
1916         const char str[13];
1917     } states[] = {
1918 #define X(state)        { state, stringize(state) }
1919         X(s_sendack),
1920         X(s_repwait),
1921         X(s_processrep),
1922         X(s_sendrep),
1923         X(s_ackwait),
1924 #undef X
1925     };
1926     int i;
1927
1928     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1929         if (state == states[i].state)
1930             return (states[i].str);
1931     return (_("INVALID STATE"));
1932 }
1933
1934 /*
1935  * Convert an action into a string
1936  */
1937 static const char *
1938 action2str(
1939     action_t    action)
1940 {
1941     static const struct {
1942         action_t action;
1943         const char str[12];
1944     } actions[] = {
1945 #define X(action)       { action, stringize(action) }
1946         X(A_START),
1947         X(A_RECVPKT),
1948         X(A_RECVREP),
1949         X(A_PENDING),
1950         X(A_FINISH),
1951         X(A_CONTINUE),
1952         X(A_SENDNAK),
1953         X(A_TIMEOUT),
1954 #undef X
1955     };
1956     int i;
1957
1958     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1959         if (action == actions[i].action)
1960             return (actions[i].str);
1961     return (_("UNKNOWN ACTION"));
1962 }
1963
1964 static char *
1965 amandad_get_security_conf(
1966     char *      string,
1967     void *      arg)
1968 {
1969     (void)arg;      /* Quiet unused parameter warning */
1970
1971     if (!string || !*string)
1972         return(NULL);
1973
1974     if (strcmp(string, "kencrypt")==0) {
1975         if (amandad_kencrypt == KENCRYPT_YES)
1976             return ("yes");
1977         else
1978             return (NULL);
1979     }
1980     return(NULL);
1981 }
1982