Imported Upstream version 3.3.0
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "security.h"
43 #include "stream.h"
44 #include "util.h"
45 #include "conffile.h"
46
47 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
48 #define ACK_TIMEOUT     10              /* XXX should be configurable */
49 #define STDERR_PIPE (DATA_FD_COUNT + 1)
50
51 #define amandad_debug(i, ...) do {      \
52         if ((i) <= debug_amandad) {     \
53                 dbprintf(__VA_ARGS__);  \
54         }                               \
55 } while (0)
56
57 /*
58  * These are the actions for entering the state machine
59  */
60 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
61     A_SENDNAK, A_TIMEOUT } action_t;
62
63 /*
64  * This is a state in the state machine.  It is a function pointer to
65  * the function that actually implements the state.
66  */
67 struct active_service;
68 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
69
70 /* string that we scan for in sendbackup's MESG stream */
71 static const char info_end_str[] = "sendbackup: info end\n";
72 #define INFO_END_LEN (sizeof(info_end_str)-1)
73
74 /* 
75  * Here are the services that we allow.
76  * Must be in the same order as services[].
77  */
78 typedef enum {
79     SERVICE_NOOP,
80     SERVICE_SENDSIZE,
81     SERVICE_SENDBACKUP,
82     SERVICE_SELFCHECK,
83     SERVICE_AMINDEXD,
84     SERVICE_AMIDXTAPED,
85     SERVICE_AMDUMPD
86 } service_t;
87
88 static struct services {
89     char *name;
90     int  active;
91     service_t service;
92 } services[] = {
93    { "noop", 1, SERVICE_NOOP },
94    { "sendsize", 1, SERVICE_SENDSIZE },
95    { "sendbackup", 1, SERVICE_SENDBACKUP },
96    { "selfcheck", 1, SERVICE_SELFCHECK },
97    { "amindexd", 0, SERVICE_AMINDEXD },
98    { "amidxtaped", 0, SERVICE_AMIDXTAPED },
99    { "amdumpd", 0, SERVICE_AMDUMPD }
100 };
101 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
102
103 /*
104  * This structure describes an active running service.
105  *
106  * An active service is something running that we have received
107  * a request for.  This structure holds info on that service, including
108  * file descriptors for data, etc, as well as the security handle
109  * for communications with the amanda server.
110  */
111 struct active_service {
112     service_t service;                  /* service name */
113     char *cmd;                          /* name of command we ran */
114     char *arguments;                    /* arguments we sent it */
115     security_handle_t *security_handle; /* remote server */
116     state_t state;                      /* how far this has progressed */
117     pid_t pid;                          /* pid of subprocess */
118     int send_partial_reply;             /* send PREP packet */
119     int reqfd;                          /* pipe to write requests */
120     int repfd;                          /* pipe to read replies */
121     int errfd;                          /* pipe to read stderr */
122     event_handle_t *ev_repfd;           /* read event handle for repfd */
123     event_handle_t *ev_reptimeout;      /* timeout for rep data */
124     event_handle_t *ev_errfd;           /* read event handle for errfd */
125     pkt_t rep_pkt;                      /* rep packet we're sending out */
126     char *errbuf;                       /* buffer to read the err into */
127     char *repbuf;                       /* buffer to read the rep into */
128     size_t bufsize;                     /* length of repbuf */
129     size_t repbufsize;                  /* length of repbuf */
130     int repretry;                       /* times we'll retry sending the rep */
131     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
132     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
133
134     /*
135      * General user streams to the process, and their equivalent
136      * network streams.
137      */
138     struct datafd_handle {
139         int fd_read;                    /* pipe to child process */
140         int fd_write;                   /* pipe to child process */
141         event_handle_t *ev_read;        /* it's read event handle */
142         event_handle_t *ev_write;       /* it's write event handle */
143         security_stream_t *netfd;       /* stream to amanda server */
144         struct active_service *as;      /* pointer back to our enclosure */
145     } data[DATA_FD_COUNT];
146     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
147 };
148
149 /*
150  * Queue of outstanding requests that we are running.
151  */
152 GSList *serviceq = NULL;
153
154 static int wait_30s = 1;
155 static int exit_on_qlength = 1;
156 static char *auth = NULL;
157 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
158
159 int main(int argc, char **argv);
160
161 static int allocstream(struct active_service *, int);
162 static void exit_check(void *);
163 static void protocol_accept(security_handle_t *, pkt_t *);
164 static void state_machine(struct active_service *, action_t, pkt_t *);
165
166 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
167 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
168 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
169 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
170 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
171
172 static void repfd_recv(void *);
173 static void process_errfd(void *cookie);
174 static void errfd_recv(void *);
175 static void timeout_repfd(void *);
176 static void protocol_recv(void *, pkt_t *, security_status_t);
177 static void process_readnetfd(void *);
178 static void process_writenetfd(void *, void *, ssize_t);
179 static struct active_service *service_new(security_handle_t *,
180     const char *, service_t, const char *);
181 static void service_delete(struct active_service *);
182 static int writebuf(struct active_service *, const void *, size_t);
183 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
184 static char *amandad_get_security_conf (char *, void *);
185
186 static const char *state2str(state_t);
187 static const char *action2str(action_t);
188
189 int
190 main(
191     int         argc,
192     char **     argv)
193 {
194     int i, j;
195     int have_services;
196     int in, out;
197     const security_driver_t *secdrv;
198     int no_exit = 0;
199     char *pgm = "amandad";              /* in case argv[0] is not set */
200 #if defined(USE_REUSEADDR)
201     const int on = 1;
202     int r;
203 #endif
204
205     /*
206      * Configure program for internationalization:
207      *   1) Only set the message locale for now.
208      *   2) Set textdomain for all amanda related programs to "amanda"
209      *      We don't want to be forced to support dozens of message catalogs.
210      */  
211     setlocale(LC_MESSAGES, "C");
212     textdomain("amanda"); 
213
214     safe_fd(-1, 0);
215     safe_cd();
216
217     /*
218      * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
219      * the Sun version of tar in /usr/sun/sbin/tar is called instead.
220      *
221      * On other operating systems this will have no effect.
222      */
223
224 #ifdef HAVE_UNSETENV
225     unsetenv("SUN_PERSONALITY");
226 #endif
227
228     /*
229      * When called via inetd, it is not uncommon to forget to put the
230      * argv[0] value on the config line.  On some systems (e.g. Solaris)
231      * this causes argv and/or argv[0] to be NULL, so we have to be
232      * careful getting our name.
233      */
234     if ((argv == NULL) || (argv[0] == NULL)) {
235             pgm = "amandad";            /* in case argv[0] is not set */
236     } else {
237             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
238     }
239     set_pname(pgm);
240     dbopen(DBG_SUBDIR_AMANDAD);
241
242     if(argv == NULL) {
243         error(_("argv == NULL\n"));
244         /*NOTREACHED*/
245     }
246
247     /* Don't die when child closes pipe */
248     signal(SIGPIPE, SIG_IGN);
249
250     /* Parse the configuration; we'll handle errors later */
251     config_init(CONFIG_INIT_CLIENT, NULL);
252
253     if (geteuid() == 0) {
254         check_running_as(RUNNING_AS_ROOT);
255         initgroups(CLIENT_LOGIN, get_client_gid());
256         setgid(get_client_gid());
257         setegid(get_client_gid());
258         seteuid(get_client_uid());
259     } else {
260         check_running_as(RUNNING_AS_CLIENT_LOGIN);
261     }
262
263     add_amanda_log_handler(amanda_log_stderr);
264     add_amanda_log_handler(amanda_log_syslog);
265
266     /*
267      * ad-hoc argument parsing
268      *
269      * We accept        -auth=[authentication type]
270      *                  -no-exit
271      *                  -tcp=[port]
272      *                  -udp=[port]
273      * We also add a list of services that amandad can launch
274      */
275     secdrv = NULL;
276     in = 0; out = 1;            /* default to stdin/stdout */
277     have_services = 0;
278     for (i = 1; i < argc; i++) {
279         /*
280          * Get a driver for a security type specified after -auth=
281          */
282         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
283             argv[i] += strlen("-auth=");
284             secdrv = security_getdriver(argv[i]);
285             auth = argv[i];
286             if (secdrv == NULL) {
287                 error(_("no driver for security type '%s'\n"), argv[i]);
288                 /*NOTREACHED*/
289             }
290             if (strcmp(auth, "local") == 0 ||
291                 strcmp(auth, "rsh") == 0 ||
292                 strcmp(auth, "ssh") == 0) {
293                 int i;
294                 for (i=0; i < NSERVICES; i++) {
295                     services[i].active = 1;
296                 }
297             }
298             continue;
299         }
300
301         /*
302          * If -no-exit is specified, always run even after requests have
303          * been satisfied.
304          */
305         else if (strcmp(argv[i], "-no-exit") == 0) {
306             no_exit = 1;
307             continue;
308         }
309
310         /*
311          * Allow us to directly bind to a udp port for debugging.
312          * This may only apply to some security types.
313          */
314         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
315 #ifdef WORKING_IPV6
316             struct sockaddr_in6 sin;
317 #else
318             struct sockaddr_in sin;
319 #endif
320
321             argv[i] += strlen("-udp=");
322 #ifdef WORKING_IPV6
323             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
324 #else
325             in = out = socket(AF_INET, SOCK_DGRAM, 0);
326 #endif
327             if (in < 0) {
328                 error(_("can't create dgram socket: %s\n"), strerror(errno));
329                 /*NOTREACHED*/
330             }
331 #ifdef USE_REUSEADDR
332             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
333                 (void *)&on, (socklen_t_equiv)sizeof(on));
334             if (r < 0) {
335                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
336                           strerror(errno));
337             }
338 #endif
339
340 #ifdef WORKING_IPV6
341             sin.sin6_family = (sa_family_t)AF_INET6;
342             sin.sin6_addr = in6addr_any;
343             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
344 #else
345             sin.sin_family = (sa_family_t)AF_INET;
346             sin.sin_addr.s_addr = INADDR_ANY;
347             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
348 #endif
349             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
350                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
351                     strerror(errno));
352                 /*NOTREACHED*/
353             }
354         }
355         /*
356          * Ditto for tcp ports.
357          */
358         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
359 #ifdef WORKING_IPV6
360             struct sockaddr_in6 sin;
361 #else
362             struct sockaddr_in sin;
363 #endif
364             int sock;
365             socklen_t_equiv n;
366
367             argv[i] += strlen("-tcp=");
368 #ifdef WORKING_IPV6
369             sock = socket(AF_INET6, SOCK_STREAM, 0);
370 #else
371             sock = socket(AF_INET, SOCK_STREAM, 0);
372 #endif
373             if (sock < 0) {
374                 error(_("can't create tcp socket: %s\n"), strerror(errno));
375                 /*NOTREACHED*/
376             }
377 #ifdef USE_REUSEADDR
378             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
379                 (void *)&on, (socklen_t_equiv)sizeof(on));
380             if (r < 0) {
381                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
382                           strerror(errno));
383             }
384 #endif
385 #ifdef WORKING_IPV6
386             sin.sin6_family = (sa_family_t)AF_INET6;
387             sin.sin6_addr = in6addr_any;
388             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
389 #else
390             sin.sin_family = (sa_family_t)AF_INET;
391             sin.sin_addr.s_addr = INADDR_ANY;
392             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
393 #endif
394             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
395                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
396                     strerror(errno));
397                 /*NOTREACHED*/
398             }
399             listen(sock, 10);
400             n = (socklen_t_equiv)sizeof(sin);
401             in = out = accept(sock, (struct sockaddr *)&sin, &n);
402         }
403         /*
404          * It must be a service name
405          */
406         else {
407             /* clear all services */
408             if(!have_services) {
409                 for (j = 0; j < (int)NSERVICES; j++)
410                     services[j].active = 0;
411             }
412             have_services = 1;
413
414             if(strcmp(argv[i],"amdump") == 0) {
415                 services[0].active = 1;
416                 services[1].active = 1;
417                 services[2].active = 1;
418                 services[3].active = 1;
419             }
420             else {
421                 for (j = 0; j < (int)NSERVICES; j++)
422                     if (strcmp(services[j].name, argv[i]) == 0)
423                         break;
424                 if (j == (int)NSERVICES) {
425                     dbprintf(_("%s: invalid service\n"), argv[i]);
426                     exit(1);
427                 }
428                 services[j].active = 1;
429             }
430         }
431     }
432
433     /*
434      * If no security type specified, use BSDTCP
435      */
436     if (secdrv == NULL) {
437         secdrv = security_getdriver("BSDTCP");
438         auth = "bsdtcp";
439         if (secdrv == NULL) {
440             error(_("no driver for default security type 'BSDTCP'\n"));
441             /*NOTREACHED*/
442         }
443     }
444
445     if(strcasecmp(auth, "rsh") == 0 ||
446        strcasecmp(auth, "ssh") == 0 ||
447        strcasecmp(auth, "local") == 0 ||
448        strcasecmp(auth, "bsdtcp") == 0) {
449         wait_30s = 0;
450         exit_on_qlength = 1;
451     }
452
453 #ifndef SINGLE_USERID
454     if (geteuid() == 0) {
455         if (strcasecmp(auth, "krb5") != 0) {
456             struct passwd *pwd;
457             /* lookup our local user name */
458             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
459                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
460             }
461
462             if (pwd->pw_uid != 0) {
463                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
464                       CLIENT_LOGIN, auth);
465             }
466         }
467     } else {
468         if (strcasecmp(auth, "krb5") == 0) {
469             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
470         }
471     }
472 #endif
473
474     /* initialize */
475
476     startclock();
477
478     dbprintf(_("version %s\n"), VERSION);
479     for (i = 0; version_info[i] != NULL; i++) {
480         dbprintf("    %s", version_info[i]);
481     }
482
483     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
484         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
485     }
486
487     /* krb5 require the euid to be 0 */
488     if (strcasecmp(auth, "krb5") == 0) {
489         seteuid((uid_t)0);
490     }
491
492     /*
493      * Schedule to call protocol_accept() when new security handles
494      * are created on stdin.
495      */
496     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
497
498     /*
499      * Schedule an event that will try to exit every 30 seconds if there
500      * are no requests outstanding.
501      */
502     if(wait_30s)
503         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
504
505     /*
506      * Call event_loop() with an arg of 0, telling it to block until all
507      * events are completed.
508      */
509     event_loop(0);
510
511     close(in);
512     close(out);
513     dbclose();
514     return(0);
515 }
516
517 /*
518  * This runs periodically and checks to see if we have any active services
519  * still running.  If we don't, then we quit.
520  */
521 static void
522 exit_check(
523     void *      cookie)
524 {
525     int no_exit;
526
527     assert(cookie != NULL);
528     no_exit = *(int *)cookie;
529
530     /*
531      * If things are still running, then don't exit.
532      */
533     if (g_slist_length(serviceq) > 0)
534         return;
535
536     /*
537      * If the caller asked us to never exit, then we're done
538      */
539     if (no_exit)
540         return;
541
542     dbclose();
543     exit(0);
544 }
545
546 /*
547  * Handles new incoming protocol handles.  This is a callback for
548  * security_accept(), which gets called when new handles are detected.
549  */
550 static void
551 protocol_accept(
552     security_handle_t * handle,
553     pkt_t *             pkt)
554 {
555     pkt_t pkt_out;
556     GSList *iter;
557     struct active_service *as;
558     char *pktbody, *tok, *service, *arguments;
559     char *service_path = NULL;
560     GSList *errlist = NULL;
561     int i;
562
563     pkt_out.body = NULL;
564
565     /*
566      * If handle is NULL, then the connection is closed.
567      */
568     if(handle == NULL) {
569         return;
570     }
571
572     /*
573      * If we have errors (not warnings) from the config file, let the remote system
574      * know immediately.  Unfortunately, we only get one ERROR line, so if there
575      * are multiple errors, we just show the first.
576      */
577     if (config_errors(&errlist) >= CFGERR_ERRORS) {
578         GSList *iter = errlist;
579         char *errmsg;
580         gboolean multiple_errors = FALSE;
581
582         if (iter) {
583             errmsg = (char *)iter->data;
584             if (iter->next)
585                 multiple_errors = TRUE;
586         } else {
587             errmsg = "(no error message)";
588         }
589
590         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
591             multiple_errors? _(" (additional errors not displayed)"):"");
592         do_sendpkt(handle, &pkt_out);
593         amfree(pkt_out.body);
594         security_close(handle);
595         return;
596     }
597
598     g_debug("authenticated peer name is '%s'", security_get_authenticated_peer_name(handle));
599
600     /*
601      * If pkt is NULL, then there was a problem with the new connection.
602      */
603     if (pkt == NULL) {
604         dbprintf(_("accept error: %s\n"), security_geterror(handle));
605         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
606         do_sendpkt(handle, &pkt_out);
607         amfree(pkt_out.body);
608         security_close(handle);
609         return;
610     }
611
612     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
613         pkt_type2str(pkt->type), pkt->body);
614
615     /*
616      * If this is not a REQ packet, just forget about it.
617      */
618     if (pkt->type != P_REQ) {
619         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
620             pkt_type2str(pkt->type), pkt->body);
621         security_close(handle);
622         return;
623     }
624
625     pktbody = service = arguments = NULL;
626     as = NULL;
627
628     /*
629      * Parse out the service and arguments
630      */
631
632     pktbody = stralloc(pkt->body);
633
634     tok = strtok(pktbody, " ");
635     if (tok == NULL)
636         goto badreq;
637     if (strcmp(tok, "SERVICE") != 0)
638         goto badreq;
639
640     tok = strtok(NULL, " \n");
641     if (tok == NULL)
642         goto badreq;
643     service = stralloc(tok);
644
645     /* we call everything else 'arguments' */
646     tok = strtok(NULL, "");
647     if (tok == NULL)
648         goto badreq;
649     arguments = stralloc(tok);
650
651     /* see if it's one we allow */
652     for (i = 0; i < (int)NSERVICES; i++)
653         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
654             break;
655     if (i == (int)NSERVICES) {
656         dbprintf(_("%s: invalid service\n"), service);
657         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
658         goto send_pkt_out;
659     }
660
661     service_path = vstralloc(amlibexecdir, "/", service, NULL);
662     if (access(service_path, X_OK) < 0) {
663         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
664             pkt_init(&pkt_out, P_NAK,
665                      _("ERROR execute access to \"%s\" denied\n"),
666                      service_path);
667         goto send_pkt_out;
668     }
669
670     /* see if its already running */
671     for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
672         as = (struct active_service *)iter->data;
673             if (strcmp(as->cmd, service_path) == 0 &&
674                 strcmp(as->arguments, arguments) == 0) {
675                     dbprintf(_("%s %s: already running, acking req\n"),
676                         service, arguments);
677                     pkt_init_empty(&pkt_out, P_ACK);
678                     goto send_pkt_out_no_delete;
679             }
680     }
681
682     /*
683      * create a new service instance, and send the arguments down
684      * the request pipe.
685      */
686     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
687     as = service_new(handle, service_path, services[i].service, arguments);
688     if (writebuf(as, arguments, strlen(arguments)) < 0) {
689         const char *errmsg = strerror(errno);
690         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
691         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
692             service, errmsg);
693         goto send_pkt_out;
694     }
695     aclose(as->reqfd);
696
697     amfree(pktbody);
698     amfree(service);
699     amfree(service_path);
700     amfree(arguments);
701
702     /*
703      * Move to the sendack state, and start up the state
704      * machine.
705      */
706     as->state = s_sendack;
707     state_machine(as, A_START, NULL);
708     return;
709
710 badreq:
711     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
712     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
713         pkt_type2str(pkt->type), pkt->body);
714
715 send_pkt_out:
716     if(as)
717         service_delete(as);
718 send_pkt_out_no_delete:
719     amfree(pktbody);
720     amfree(service_path);
721     amfree(service);
722     amfree(arguments);
723     do_sendpkt(handle, &pkt_out);
724     security_close(handle);
725     amfree(pkt_out.body);
726 }
727
728 /*
729  * Handles incoming protocol packets.  Routes responses to the proper
730  * running service.
731  */
732 static void
733 state_machine(
734     struct active_service *     as,
735     action_t                    action,
736     pkt_t *                     pkt)
737 {
738     action_t retaction;
739     state_t curstate;
740     pkt_t nak;
741
742     amandad_debug(1, _("state_machine: %p entering\n"), as);
743     for (;;) {
744         curstate = as->state;
745         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
746                           state2str(curstate), action2str(action));
747         retaction = (*curstate)(as, action, pkt);
748         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
749                           as, state2str(curstate), action2str(retaction),
750                           state2str(as->state));
751
752         switch (retaction) {
753         /*
754          * State has queued up and is now blocking on input.
755          */
756         case A_PENDING:
757             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
758             return;
759
760         /*
761          * service has switched states.  Loop.
762          */
763         case A_CONTINUE:
764             break;
765
766         /*
767          * state has determined that the packet it received was bogus.
768          * Send a nak, and return.
769          */
770         case A_SENDNAK:
771             dbprintf(_("received unexpected %s packet\n"),
772                 pkt_type2str(pkt->type));
773             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
774             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
775                 pkt_type2str(pkt->type));
776             do_sendpkt(as->security_handle, &nak);
777             amfree(nak.body);
778             security_recvpkt(as->security_handle, protocol_recv, as, -1);
779             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
780             return;
781
782         /*
783          * Service is done.  Remove it and finish.
784          */
785         case A_FINISH:
786             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
787             service_delete(as);
788             return;
789
790         default:
791             assert(0);
792             break;
793         }
794     }
795     /*NOTREACHED*/
796 }
797
798 /*
799  * This state just sends an ack.  After that, we move to the repwait
800  * state to wait for REP data to arrive from the subprocess.
801  */
802 static action_t
803 s_sendack(
804     struct active_service *     as,
805     action_t                    action,
806     pkt_t *                     pkt)
807 {
808     pkt_t ack;
809
810     (void)action;       /* Quiet unused parameter warning */
811     (void)pkt;          /* Quiet unused parameter warning */
812
813     pkt_init_empty(&ack, P_ACK);
814     if (do_sendpkt(as->security_handle, &ack) < 0) {
815         dbprintf(_("error sending ACK: %s\n"),
816             security_geterror(as->security_handle));
817         amfree(ack.body);
818         return (A_FINISH);
819     }
820     amfree(ack.body);
821
822     /*
823      * move to the repwait state
824      * Setup a listener for data on the reply fd, but also
825      * listen for packets over the wire, as the server may
826      * poll us if we take a long time.
827      * Setup a timeout that will fire if it takes too long to
828      * receive rep data.
829      */
830     as->state = s_repwait;
831     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
832     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
833         timeout_repfd, as);
834     as->errbuf = NULL;
835     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
836     security_recvpkt(as->security_handle, protocol_recv, as, -1);
837     return (A_PENDING);
838 }
839
840 /*
841  * This is the repwait state.  We have responded to the initial REQ with
842  * an ACK, and we are now waiting for the process we spawned to pass us 
843  * data to send in a REP.
844  */
845 static action_t
846 s_repwait(
847     struct active_service *     as,
848     action_t                    action,
849     pkt_t *                     pkt)
850 {
851     ssize_t   n;
852     char     *repbuf_temp;
853     char     *what;
854     char     *msg;
855     int       code = 0;
856     int       pid;
857     amwait_t  retstat;
858
859     /*
860      * We normally shouldn't receive any packets while waiting
861      * for our REP data, but in some cases we do.
862      */
863     if (action == A_RECVPKT) {
864         assert(pkt != NULL);
865         /*
866          * Another req for something that's running.  Just send an ACK
867          * and go back and wait for more data.
868          */
869         if (pkt->type == P_REQ) {
870             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
871             amfree(as->rep_pkt.body);
872             pkt_init_empty(&as->rep_pkt, P_ACK);
873             do_sendpkt(as->security_handle, &as->rep_pkt);
874             security_recvpkt(as->security_handle, protocol_recv, as, -1);
875             return (A_PENDING);
876         }
877         /* something unexpected.  Nak it */
878         return (A_SENDNAK);
879     }
880
881     if (action == A_TIMEOUT) {
882         amfree(as->rep_pkt.body);
883         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
884         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
885         do_sendpkt(as->security_handle, &as->rep_pkt);
886         return (A_FINISH);
887     }
888
889     assert(action == A_RECVREP);
890     if(as->bufsize == 0) {
891         as->bufsize = NETWORK_BLOCK_BYTES;
892         as->repbuf = alloc(as->bufsize);
893     }
894
895     do {
896         n = read(as->repfd, as->repbuf + as->repbufsize,
897                  as->bufsize - as->repbufsize - 1);
898     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
899     if (n < 0) {
900         const char *errstr = strerror(errno);
901         dbprintf(_("read error on reply pipe: %s\n"), errstr);
902         amfree(as->rep_pkt.body);
903         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
904                  errstr);
905         do_sendpkt(as->security_handle, &as->rep_pkt);
906         return (A_FINISH);
907     }
908
909     /* If end of service, wait for process status */
910     if (n == 0) {
911         pid = waitpid(as->pid, &retstat, WNOHANG);
912         if (as->service  == SERVICE_NOOP ||
913             as->service  == SERVICE_SENDSIZE ||
914             as->service  == SERVICE_SELFCHECK) {
915             int t = 0;
916             while (t<5 && pid == 0) {
917                 sleep(1);
918                 t++;
919                 pid = waitpid(as->pid, &retstat, WNOHANG);
920             }
921         }
922
923         process_errfd(as);
924
925         if (pid == 0)
926             pid = waitpid(as->pid, &retstat, WNOHANG);
927
928         if (pid > 0) {
929             what = NULL;
930             if (! WIFEXITED(retstat)) {
931                 what = _("signal");
932                 code = WTERMSIG(retstat);
933             } else if (WEXITSTATUS(retstat) != 0) {
934                 what = _("code");
935                 code = WEXITSTATUS(retstat);
936             }
937             if (what) {
938                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
939                          (as->cmd)?as->cmd:_("??UNKONWN??"),
940                          (unsigned)as->pid,
941                          what, code);
942                 msg = vstrallocf(
943                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
944                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
945                      what, code);
946                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
947                         as->bufsize *= 2;
948                         repbuf_temp = alloc(as->bufsize);
949                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
950                         amfree(as->repbuf);
951                         as->repbuf = repbuf_temp;
952                 }
953                 strcpy(as->repbuf + as->repbufsize, msg);
954                 as->repbufsize += strlen(msg);
955                 amfree(msg);
956             }
957         }
958     }
959
960     /*
961      * If we got some data, go back and wait for more, or EOF.  Nul terminate
962      * the buffer first.
963      */
964     as->repbuf[n + as->repbufsize] = '\0';
965     if (n > 0) {
966         as->repbufsize += n;
967         if(as->repbufsize >= (as->bufsize - 1)) {
968             as->bufsize *= 2;
969             repbuf_temp = alloc(as->bufsize);
970             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
971             amfree(as->repbuf);
972             as->repbuf = repbuf_temp;
973         }
974         else if(as->send_partial_reply) {
975             amfree(as->rep_pkt.body);
976             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
977             do_sendpkt(as->security_handle, &as->rep_pkt);
978             amfree(as->rep_pkt.body);
979             pkt_init_empty(&as->rep_pkt, P_REP);
980         }
981  
982         return (A_PENDING);
983     }
984
985     /*
986      * If we got 0, then we hit EOF.  Process the data and release
987      * the timeout.
988      */
989     assert(n == 0);
990
991     assert(as->ev_repfd != NULL);
992     event_release(as->ev_repfd);
993     as->ev_repfd = NULL;
994
995     assert(as->ev_reptimeout != NULL);
996     event_release(as->ev_reptimeout);
997     as->ev_reptimeout = NULL;
998
999     as->state = s_processrep;
1000     aclose(as->repfd);
1001     return (A_CONTINUE);
1002 }
1003
1004 /*
1005  * After we have read in all of the rep data, we process it and send
1006  * it out as a REP packet.
1007  */
1008 static action_t
1009 s_processrep(
1010     struct active_service *     as,
1011     action_t                    action,
1012     pkt_t *                     pkt)
1013 {
1014     char *tok, *repbuf;
1015
1016     (void)action;       /* Quiet unused parameter warning */
1017     (void)pkt;          /* Quiet unused parameter warning */
1018
1019     /*
1020      * Copy the rep lines into the outgoing packet.
1021      *
1022      * If this line is a CONNECT, translate it
1023      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1024      * Example:
1025      *
1026      *  CONNECT DATA 4 MESG 5 INDEX 6
1027      *
1028      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1029      * We need to map these to security streams and pass them back
1030      * to the amanda server.  If the handle is -1, then we don't map.
1031      */
1032     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1033         amandad_kencrypt = KENCRYPT_WILL_DO;
1034         repbuf = stralloc(as->repbuf + 9);
1035     } else {
1036         repbuf = stralloc(as->repbuf);
1037     }
1038     amfree(as->rep_pkt.body);
1039     pkt_init_empty(&as->rep_pkt, P_REP);
1040     tok = strtok(repbuf, " ");
1041     if (tok == NULL)
1042         goto error;
1043     if (strcmp(tok, "CONNECT") == 0) {
1044         char *line, *nextbuf;
1045
1046         /* Save the entire line */
1047         line = strtok(NULL, "\n");
1048         /* Save the buf following the line */
1049         nextbuf = strtok(NULL, "");
1050
1051         if (line == NULL || nextbuf == NULL)
1052             goto error;
1053
1054         pkt_cat(&as->rep_pkt, "CONNECT");
1055
1056         /* loop over the id/handle pairs */
1057         for (;;) {
1058             /* id */
1059             tok = strtok(line, " ");
1060             line = NULL;        /* keep working from line */
1061             if (tok == NULL)
1062                 break;
1063             pkt_cat(&as->rep_pkt, " %s", tok);
1064
1065             /* handle */
1066             tok = strtok(NULL, " \n");
1067             if (tok == NULL)
1068                 goto error;
1069             /* convert the handle into something the server can process */
1070             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1071         }
1072         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1073     } else {
1074 error:
1075         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1076     }
1077
1078     /*
1079      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1080      * state.
1081      */
1082     as->state = s_sendrep;
1083     as->repretry = getconf_int(CNF_REP_TRIES);
1084     amfree(repbuf);
1085     return (A_CONTINUE);
1086 }
1087
1088 /*
1089  * This is the state where we send the REP we just collected from our child.
1090  */
1091 static action_t
1092 s_sendrep(
1093     struct active_service *     as,
1094     action_t                    action,
1095     pkt_t *                     pkt)
1096 {
1097     (void)action;       /* Quiet unused parameter warning */
1098     (void)pkt;          /* Quiet unused parameter warning */
1099
1100     /*
1101      * Transmit it and move to the ack state.
1102      */
1103     do_sendpkt(as->security_handle, &as->rep_pkt);
1104     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1105     as->state = s_ackwait;
1106     return (A_PENDING);
1107 }
1108
1109 /*
1110  * This is the state in which we wait for the server to ACK the REP
1111  * we just sent it.
1112  */
1113 static action_t
1114 s_ackwait(
1115     struct active_service *     as,
1116     action_t                    action,
1117     pkt_t *                     pkt)
1118 {
1119     struct datafd_handle *dh;
1120     int npipes;
1121
1122     /*
1123      * If we got a timeout, try again, but eventually give up.
1124      */
1125     if (action == A_TIMEOUT) {
1126         if (--as->repretry > 0) {
1127             as->state = s_sendrep;
1128             return (A_CONTINUE);
1129         }
1130         dbprintf(_("timeout waiting for ACK for our REP\n"));
1131         return (A_FINISH);
1132     }
1133     amandad_debug(1, _("received ACK, now opening streams\n"));
1134
1135     assert(action == A_RECVPKT);
1136
1137     if (pkt->type == P_REQ) {
1138         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1139         as->state = s_sendrep;
1140         return (A_CONTINUE);
1141     }
1142
1143     if (pkt->type != P_ACK)
1144         return (A_SENDNAK);
1145
1146     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1147         amandad_kencrypt = KENCRYPT_YES;
1148     }
1149
1150     /*
1151      * Got the ack, now open the pipes
1152      */
1153     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1154         if (dh->netfd == NULL)
1155             continue;
1156         dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
1157         if (security_stream_accept(dh->netfd) < 0) {
1158             dbprintf(_("stream %td accept failed: %s\n"),
1159                 dh - &as->data[0], security_geterror(as->security_handle));
1160             security_stream_close(dh->netfd);
1161             dh->netfd = NULL;
1162             continue;
1163         }
1164
1165         /* setup an event for reads from it.  As a special case, don't start
1166          * listening on as->data[0] until we read some data on another fd, if
1167          * the service is sendbackup.  This ensures that we send a MESG or 
1168          * INDEX token before any DATA tokens, as dumper assumes. This is a
1169          * hack, if that wasn't already obvious! */
1170         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1171             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1172                                          process_readnetfd, dh);
1173         } else {
1174             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1175         }
1176
1177         security_stream_read(dh->netfd, process_writenetfd, dh);
1178
1179     }
1180
1181     /*
1182      * Pipes are open, so auth them.  Count them at the same time.
1183      */
1184     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1185         if (dh->netfd == NULL)
1186             continue;
1187         if (security_stream_auth(dh->netfd) < 0) {
1188             security_stream_close(dh->netfd);
1189             dh->netfd = NULL;
1190             event_release(dh->ev_read);
1191             event_release(dh->ev_write);
1192             dh->ev_read = NULL;
1193             dh->ev_write = NULL;
1194         } else {
1195             npipes++;
1196         }
1197     }
1198
1199     /*
1200      * If no pipes are open, then we're done.  Otherwise, just start running.
1201      * The event handlers on all of the pipes will take it from here.
1202      */
1203     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1204     if (npipes == 0)
1205         return (A_FINISH);
1206     else {
1207         security_close(as->security_handle);
1208         as->security_handle = NULL;
1209         return (A_PENDING);
1210     }
1211 }
1212
1213 /*
1214  * Called when a repfd has received data
1215  */
1216 static void
1217 repfd_recv(
1218     void *      cookie)
1219 {
1220     struct active_service *as = cookie;
1221
1222     assert(as != NULL);
1223     assert(as->ev_repfd != NULL);
1224
1225     state_machine(as, A_RECVREP, NULL);
1226 }
1227
1228 static void
1229 process_errfd(
1230     void *cookie)
1231 {
1232     struct active_service *as = cookie;
1233
1234     /* Process errfd before sending the REP packet */
1235     if (as->ev_errfd) {
1236         SELECT_ARG_TYPE readset;
1237         struct timeval  tv;
1238         int             nfound;
1239
1240         memset(&tv, 0, SIZEOF(tv));
1241         FD_ZERO(&readset);
1242         FD_SET(as->errfd, &readset);
1243         nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
1244         if (nfound && FD_ISSET(as->errfd, &readset)) {
1245             errfd_recv(as);
1246         }
1247     }
1248 }
1249
1250 /*
1251  * Called when a errfd has received data
1252  */
1253 static void
1254 errfd_recv(
1255     void *      cookie)
1256 {
1257     struct active_service *as = cookie;
1258     char  buf[32769];
1259     int   n;
1260     char *r;
1261
1262     assert(as != NULL);
1263     assert(as->ev_errfd != NULL);
1264
1265     n = read(as->errfd, &buf, 32768);
1266     /* merge buffer */
1267     if (n > 0) {
1268         /* Terminate it with '\0' */
1269         buf[n+1] = '\0';
1270
1271         if (as->errbuf) {
1272             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1273         } else {
1274             as->errbuf = stralloc(buf);
1275         }
1276     } else if (n == 0) {
1277         event_release(as->ev_errfd);
1278         as->ev_errfd = NULL;
1279     } else { /* n < 0 */
1280         event_release(as->ev_errfd);
1281         as->ev_errfd = NULL;
1282         g_snprintf(buf, 32768,
1283                    "error reading stderr or service: %s\n", strerror(errno));
1284     }
1285
1286     /* for each line terminate by '\n' */
1287     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1288         char *s;
1289
1290         *r = '\0';
1291         s = vstrallocf("ERROR service %s: %s\n",
1292                        services[as->service].name, as->errbuf);
1293
1294         /* Add to repbuf, error message will be in the REP packet if it
1295          * is not already sent
1296          */
1297         n = strlen(s);
1298         if (as->bufsize == 0) {
1299             as->bufsize = NETWORK_BLOCK_BYTES;
1300             as->repbuf = alloc(as->bufsize);
1301         }
1302         while (as->bufsize < as->repbufsize + n) {
1303             char *repbuf_temp;
1304             as->bufsize *= 2;
1305             repbuf_temp = alloc(as->bufsize);
1306             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1307             amfree(as->repbuf);
1308             as->repbuf = repbuf_temp;
1309         }
1310         memcpy(as->repbuf + as->repbufsize, s, n);
1311         as->repbufsize += n;
1312
1313         dbprintf("%s", s);
1314
1315         /* remove first line from buffer */
1316         r++;
1317         s = stralloc(r);
1318         amfree(as->errbuf);
1319         as->errbuf = s;
1320     }
1321 }
1322
1323 /*
1324  * Called when a repfd has timed out
1325  */
1326 static void
1327 timeout_repfd(
1328     void *      cookie)
1329 {
1330     struct active_service *as = cookie;
1331
1332     assert(as != NULL);
1333     assert(as->ev_reptimeout != NULL);
1334
1335     state_machine(as, A_TIMEOUT, NULL);
1336 }
1337
1338 /*
1339  * Called when a handle has received data
1340  */
1341 static void
1342 protocol_recv(
1343     void *              cookie,
1344     pkt_t *             pkt,
1345     security_status_t   status)
1346 {
1347     struct active_service *as = cookie;
1348
1349     assert(as != NULL);
1350
1351     switch (status) {
1352     case S_OK:
1353         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1354             pkt_type2str(pkt->type), pkt->body);
1355         state_machine(as, A_RECVPKT, pkt);
1356         break;
1357     case S_TIMEOUT:
1358         dbprintf(_("timeout\n"));
1359         state_machine(as, A_TIMEOUT, NULL);
1360         break;
1361     case S_ERROR:
1362         dbprintf(_("receive error: %s\n"),
1363             security_geterror(as->security_handle));
1364         break;
1365     }
1366 }
1367
1368 /*
1369  * This is a generic relay function that just reads data from one of
1370  * the process's pipes and passes it up the equivalent security_stream_t
1371  */
1372 static void
1373 process_readnetfd(
1374     void *      cookie)
1375 {
1376     pkt_t nak;
1377     struct datafd_handle *dh = cookie;
1378     struct active_service *as = dh->as;
1379     ssize_t n;
1380
1381     nak.body = NULL;
1382
1383     do {
1384         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1385     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1386
1387     /*
1388      * Process has died.
1389      */
1390     if (n < 0) {
1391         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1392             dh->fd_read, strerror(errno));
1393         goto sendnak;
1394     }
1395     /*
1396      * Process has closed the pipe.  Just remove this event handler.
1397      * If all pipes are closed, shut down this service.
1398      */
1399     if (n == 0) {
1400         event_release(dh->ev_read);
1401         dh->ev_read = NULL;
1402         if(dh->ev_write == NULL) {
1403             security_stream_close(dh->netfd);
1404             dh->netfd = NULL;
1405         }
1406         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1407             if (dh->netfd != NULL)
1408                 return;
1409         }
1410         service_delete(as);
1411         return;
1412     }
1413
1414     /* Handle the special case of recognizing "sendbackup info end"
1415      * from sendbackup's MESG fd */
1416     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1417         /* make a buffer containing the combined data from info_end_buf
1418          * and what we've read this time, and search it for info_end_strj
1419          * This includes a NULL byte for strstr's sanity. */
1420         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1421         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1422         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1423         combined_buf[INFO_END_LEN+n] = '\0';
1424
1425         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1426
1427         /* fill info_end_buf from the tail end of combined_buf */
1428         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1429         amfree(combined_buf);
1430
1431         /* if we did see info_end_str, start reading the data fd (fd 0) */
1432         if (as->seen_info_end) {
1433             struct datafd_handle *dh = &as->data[0];
1434             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1435             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1436                                          process_readnetfd, dh);
1437         } else {
1438             amandad_debug(1, "sendbackup header info still not complete\n");
1439         }
1440     }
1441
1442     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1443         /* stream has croaked */
1444         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1445             security_stream_id(dh->netfd),
1446             security_stream_geterror(dh->netfd));
1447         goto sendnak;
1448     }
1449     return;
1450
1451 sendnak:
1452     do_sendpkt(as->security_handle, &nak);
1453     service_delete(as);
1454     amfree(nak.body);
1455 }
1456
1457 /*
1458  * This is a generic relay function that just read data from one of
1459  * the security_stream_t and passes it up the equivalent process's pipes
1460  */
1461 static void
1462 process_writenetfd(
1463     void *      cookie,
1464     void *      buf,
1465     ssize_t     size)
1466 {
1467     struct datafd_handle *dh;
1468
1469     assert(cookie != NULL);
1470     dh = cookie;
1471
1472     if (dh->fd_write <= 0) {
1473         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1474     } else if (size > 0) {
1475         full_write(dh->fd_write, buf, (size_t)size);
1476         security_stream_read(dh->netfd, process_writenetfd, dh);
1477     }
1478     else {
1479         aclose(dh->fd_write);
1480     }
1481 }
1482
1483
1484 /*
1485  * Convert a local stream handle (DATA_FD...) into something that
1486  * can be sent to the amanda server.
1487  *
1488  * Returns a number that should be sent to the server in the REP packet.
1489  */
1490 static int
1491 allocstream(
1492     struct active_service *     as,
1493     int                         handle)
1494 {
1495     struct datafd_handle *dh;
1496
1497     /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
1498      * it is NOT a file descriptor! */
1499
1500     /* if the handle is -1, then we don't bother */
1501     if (handle < 0)
1502         return (-1);
1503
1504     /* make sure the handle's kosher */
1505     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1506         return (-1);
1507
1508     /* get a pointer into our handle array */
1509     dh = &as->data[handle - DATA_FD_OFFSET];
1510
1511     /* make sure we're not already using the net handle */
1512     if (dh->netfd != NULL)
1513         return (-1);
1514
1515     /* allocate a stream from the security layer and return */
1516     dh->netfd = security_stream_server(as->security_handle);
1517     if (dh->netfd == NULL) {
1518         dbprintf(_("couldn't open stream to server: %s\n"),
1519             security_geterror(as->security_handle));
1520         return (-1);
1521     }
1522
1523     /*
1524      * convert the stream into a numeric id that can be sent to the
1525      * remote end.
1526      */
1527     return (security_stream_id(dh->netfd));
1528 }
1529
1530 /*
1531  * Create a new service instance
1532  */
1533 static struct active_service *
1534 service_new(
1535     security_handle_t * security_handle,
1536     const char *        cmd,
1537     service_t           service,
1538     const char *        arguments)
1539 {
1540     int i;
1541     int data_read[DATA_FD_COUNT + 2][2];
1542     int data_write[DATA_FD_COUNT + 2][2];
1543     struct active_service *as;
1544     pid_t pid;
1545     int newfd;
1546     char *peer_name;
1547     char *amanda_remote_host_env[2];
1548
1549     assert(security_handle != NULL);
1550     assert(cmd != NULL);
1551     assert(arguments != NULL);
1552
1553     /* a plethora of pipes */
1554     /* data_read[0]                : stdin
1555      * data_write[0]               : stdout
1556      * data_read[1], data_write[1] : first  stream
1557      * data_read[2], data_write[2] : second stream
1558      * data_read[3], data_write[3] : third stream
1559      * data_write[4]               : stderr
1560      */
1561     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1562         if (pipe(data_read[i]) < 0) {
1563             error(_("pipe: %s\n"), strerror(errno));
1564             /*NOTREACHED*/
1565         }
1566         if (pipe(data_write[i]) < 0) {
1567             error(_("pipe: %s\n"), strerror(errno));
1568             /*NOTREACHED*/
1569         }
1570     }
1571     if (pipe(data_write[STDERR_PIPE]) < 0) {
1572         error(_("pipe: %s\n"), strerror(errno));
1573         /*NOTREACHED*/
1574     }
1575
1576     switch(pid = fork()) {
1577     case -1:
1578         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1579         /*NOTREACHED*/
1580     default:
1581         /*
1582          * The parent.  Close the far ends of our pipes and return.
1583          */
1584         as = g_new0(struct active_service, 1);
1585         as->cmd = stralloc(cmd);
1586         as->arguments = stralloc(arguments);
1587         as->security_handle = security_handle;
1588         as->state = NULL;
1589         as->service = service;
1590         as->pid = pid;
1591         as->send_partial_reply = 0;
1592         as->seen_info_end = FALSE;
1593         /* fill in info_end_buf with non-null characters */
1594         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1595         if(service == SERVICE_SENDSIZE) {
1596             g_option_t *g_options;
1597             char *option_str, *p;
1598
1599             option_str = stralloc(as->arguments+8);
1600             p = strchr(option_str,'\n');
1601             if(p) *p = '\0';
1602
1603             g_options = parse_g_options(option_str, 1);
1604             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1605                 as->send_partial_reply = 1;
1606             }
1607             free_g_options(g_options);
1608             amfree(option_str);
1609         }
1610
1611         /* write to the request pipe */
1612         aclose(data_read[0][0]);
1613         as->reqfd = data_read[0][1];
1614
1615         /*
1616          * read from the reply pipe
1617          */
1618         as->repfd = data_write[0][0];
1619         aclose(data_write[0][1]);
1620         as->ev_repfd = NULL;
1621         as->repbuf = NULL;
1622         as->repbufsize = 0;
1623         as->bufsize = 0;
1624         as->repretry = 0;
1625         as->rep_pkt.body = NULL;
1626
1627         /*
1628          * read from the stderr pipe
1629          */
1630         as->errfd = data_write[STDERR_PIPE][0];
1631         aclose(data_write[STDERR_PIPE][1]);
1632         as->ev_errfd = NULL;
1633         as->errbuf = NULL;
1634
1635         /*
1636          * read from the rest of the general-use pipes
1637          * (netfds are opened as the client requests them)
1638          */
1639         for (i = 0; i < DATA_FD_COUNT; i++) {
1640             aclose(data_read[i + 1][1]);
1641             aclose(data_write[i + 1][0]);
1642             as->data[i].fd_read = data_read[i + 1][0];
1643             as->data[i].fd_write = data_write[i + 1][1];
1644             as->data[i].ev_read = NULL;
1645             as->data[i].ev_write = NULL;
1646             as->data[i].netfd = NULL;
1647             as->data[i].as = as;
1648         }
1649
1650         /* add it to the service queue */
1651         /* increment the active service count */
1652         serviceq = g_slist_append(serviceq, (gpointer)as);
1653
1654         return (as);
1655     case 0:
1656         /*
1657          * The child.  Put our pipes in their advertised locations
1658          * and start up.
1659          */
1660
1661         /* set up the AMANDA_AUTHENTICATED_PEER env var so child services
1662          * can use it to authenticate */
1663         peer_name = security_get_authenticated_peer_name(security_handle);
1664         amanda_remote_host_env[0] = NULL;
1665         amanda_remote_host_env[1] = NULL;
1666         if (*peer_name) {
1667             amanda_remote_host_env[0] =
1668                 g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
1669         }
1670
1671         /*
1672          * The data stream is stdin in the new process
1673          */
1674         if (dup2(data_read[0][0], 0) < 0) {
1675             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1676                 strerror(errno));
1677             /*NOTREACHED*/
1678         }
1679         aclose(data_read[0][0]);
1680         aclose(data_read[0][1]);
1681
1682         /*
1683          * The reply stream is stdout
1684          */
1685         if (dup2(data_write[0][1], 1) < 0) {
1686             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1687                 strerror(errno));
1688         }
1689         aclose(data_write[0][0]);
1690         aclose(data_write[0][1]);
1691
1692         for (i = 0; i < DATA_FD_COUNT; i++) {
1693             aclose(data_read[i + 1][0]);
1694             aclose(data_write[i + 1][1]);
1695         }
1696
1697         /*
1698          *  Make sure they are not open in the range DATA_FD_OFFSET to
1699          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1700          */
1701         for (i = 0; i < DATA_FD_COUNT; i++) {
1702             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1703                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1704                 newfd = dup(data_read[i + 1][1]);
1705                 if(newfd == -1)
1706                     error(_("Can't dup out off DATA_FD range"));
1707                 data_read[i + 1][1] = newfd;
1708             }
1709             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1710                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1711                 newfd = dup(data_write[i + 1][0]);
1712                 if(newfd == -1)
1713                     error(_("Can't dup out off DATA_FD range"));
1714                 data_write[i + 1][0] = newfd;
1715             }
1716         }
1717         while(data_write[4][0] >= DATA_FD_OFFSET &&
1718               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1719             newfd = dup(data_write[4][0]);
1720             if (newfd == -1)
1721                 error(_("Can't dup out off DATA_FD range"));
1722             data_write[4][0] = newfd;
1723         }
1724         while(data_write[4][1] >= DATA_FD_OFFSET &&
1725               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1726             newfd = dup(data_write[4][1]);
1727             if (newfd == -1)
1728                 error(_("Can't dup out off DATA_FD range"));
1729             data_write[4][1] = newfd;
1730         }
1731
1732         for (i = 0; i < DATA_FD_COUNT*2; i++)
1733             close(DATA_FD_OFFSET + i);
1734
1735         /*
1736          * The rest start at the offset defined in amandad.h, and continue
1737          * through the internal defined.
1738          */
1739         for (i = 0; i < DATA_FD_COUNT; i++) {
1740             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1741                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1742                     i + DATA_FD_OFFSET, strerror(errno));
1743             }
1744             aclose(data_read[i + 1][1]);
1745
1746             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1747                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1748                     i + DATA_FD_OFFSET, strerror(errno));
1749             }
1750             aclose(data_write[i + 1][0]);
1751         }
1752
1753         /* close all unneeded fd */
1754         close(STDERR_FILENO);
1755         dup2(data_write[STDERR_PIPE][1], 2);
1756         aclose(data_write[STDERR_PIPE][0]);
1757         aclose(data_write[STDERR_PIPE][1]);
1758         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1759
1760         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env_full(amanda_remote_host_env));
1761         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1762         /*NOTREACHED*/
1763     }
1764     return NULL;
1765 }
1766
1767 /*
1768  * Unallocate a service instance
1769  */
1770 static void
1771 service_delete(
1772     struct active_service *     as)
1773 {
1774     int i;
1775     int   count;
1776     pid_t pid;
1777     struct datafd_handle *dh;
1778
1779     amandad_debug(1, _("closing service: %s\n"),
1780                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1781
1782     assert(as != NULL);
1783
1784     assert(as->cmd != NULL);
1785     amfree(as->cmd);
1786
1787     assert(as->arguments != NULL);
1788     amfree(as->arguments);
1789
1790     if (as->reqfd != -1)
1791         aclose(as->reqfd);
1792     if (as->repfd != -1)
1793         aclose(as->repfd);
1794     if (as->errfd != -1) {
1795         process_errfd(as);
1796         aclose(as->errfd);
1797     }
1798
1799     if (as->ev_repfd != NULL)
1800         event_release(as->ev_repfd);
1801     if (as->ev_reptimeout != NULL)
1802         event_release(as->ev_reptimeout);
1803     if (as->ev_errfd != NULL)
1804         event_release(as->ev_errfd);
1805
1806     for (i = 0; i < DATA_FD_COUNT; i++) {
1807         dh = &as->data[i];
1808
1809         aclose(dh->fd_read);
1810         aclose(dh->fd_write);
1811
1812         if (dh->netfd != NULL)
1813             security_stream_close(dh->netfd);
1814
1815         if (dh->ev_read != NULL)
1816             event_release(dh->ev_read);
1817         if (dh->ev_write != NULL)
1818             event_release(dh->ev_write);
1819     }
1820
1821     if (as->security_handle != NULL)
1822         security_close(as->security_handle);
1823
1824     /* try to kill the process; if this fails, then it's already dead and
1825      * likely some of the other zombie cleanup ate its brains, so we don't
1826      * bother to waitpid for it */
1827     assert(as->pid > 0);
1828     pid = waitpid(as->pid, NULL, WNOHANG);
1829     if (pid != as->pid && kill(as->pid, SIGTERM) == 0) {
1830         pid = waitpid(as->pid, NULL, WNOHANG);
1831         count = 5;
1832         while (pid != as->pid && count > 0) {
1833             count--;
1834             sleep(1);
1835             pid = waitpid(as->pid, NULL, WNOHANG);
1836         }
1837         if (pid != as->pid) {
1838             g_debug("Process %d failed to exit", (int)as->pid);
1839         }
1840     }
1841
1842     serviceq = g_slist_remove(serviceq, (gpointer)as);
1843
1844     amfree(as->cmd);
1845     amfree(as->arguments);
1846     amfree(as->repbuf);
1847     amfree(as->rep_pkt.body);
1848     amfree(as);
1849
1850     if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
1851         dbclose();
1852         exit(0);
1853     }
1854 }
1855
1856 /*
1857  * Like 'fullwrite', but does the work in a child process so pipelines
1858  * do not hang.
1859  */
1860 static int
1861 writebuf(
1862     struct active_service *     as,
1863     const void *                bufp,
1864     size_t                      size)
1865 {
1866     pid_t pid;
1867     size_t    writesize;
1868
1869     switch (pid=fork()) {
1870     case -1:
1871         break;
1872
1873     default:
1874         waitpid(pid, NULL, WNOHANG);
1875         return 0;                       /* this is the parent */
1876
1877     case 0:                             /* this is the child */
1878         close(as->repfd);
1879         writesize = full_write(as->reqfd, bufp, size);
1880         exit(writesize != size);
1881         /* NOTREACHED */
1882     }
1883     return -1;
1884 }
1885
1886 static ssize_t
1887 do_sendpkt(
1888     security_handle_t * handle,
1889     pkt_t *             pkt)
1890 {
1891     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1892         pkt_type2str(pkt->type), pkt->body);
1893     if (handle)
1894         return security_sendpkt(handle, pkt);
1895     else
1896         return 1;
1897 }
1898
1899 /*
1900  * Convert a state into a string
1901  */
1902 static const char *
1903 state2str(
1904     state_t     state)
1905 {
1906     static const struct {
1907         state_t state;
1908         const char str[13];
1909     } states[] = {
1910 #define X(state)        { state, stringize(state) }
1911         X(s_sendack),
1912         X(s_repwait),
1913         X(s_processrep),
1914         X(s_sendrep),
1915         X(s_ackwait),
1916 #undef X
1917     };
1918     int i;
1919
1920     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1921         if (state == states[i].state)
1922             return (states[i].str);
1923     return (_("INVALID STATE"));
1924 }
1925
1926 /*
1927  * Convert an action into a string
1928  */
1929 static const char *
1930 action2str(
1931     action_t    action)
1932 {
1933     static const struct {
1934         action_t action;
1935         const char str[12];
1936     } actions[] = {
1937 #define X(action)       { action, stringize(action) }
1938         X(A_START),
1939         X(A_RECVPKT),
1940         X(A_RECVREP),
1941         X(A_PENDING),
1942         X(A_FINISH),
1943         X(A_CONTINUE),
1944         X(A_SENDNAK),
1945         X(A_TIMEOUT),
1946 #undef X
1947     };
1948     int i;
1949
1950     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1951         if (action == actions[i].action)
1952             return (actions[i].str);
1953     return (_("UNKNOWN ACTION"));
1954 }
1955
1956 static char *
1957 amandad_get_security_conf(
1958     char *      string,
1959     void *      arg)
1960 {
1961     (void)arg;      /* Quiet unused parameter warning */
1962
1963     if (!string || !*string)
1964         return(NULL);
1965
1966     if (strcmp(string, "kencrypt")==0) {
1967         if (amandad_kencrypt == KENCRYPT_YES)
1968             return ("yes");
1969         else
1970             return (NULL);
1971     }
1972     return(NULL);
1973 }
1974