Imported Upstream version 3.3.1
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "security.h"
43 #include "stream.h"
44 #include "util.h"
45 #include "conffile.h"
46
47 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
48 #define ACK_TIMEOUT     10              /* XXX should be configurable */
49 #define STDERR_PIPE (DATA_FD_COUNT + 1)
50
51 #define amandad_debug(i, ...) do {      \
52         if ((i) <= debug_amandad) {     \
53                 dbprintf(__VA_ARGS__);  \
54         }                               \
55 } while (0)
56
57 /*
58  * These are the actions for entering the state machine
59  */
60 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
61     A_SENDNAK, A_TIMEOUT } action_t;
62
63 /*
64  * This is a state in the state machine.  It is a function pointer to
65  * the function that actually implements the state.
66  */
67 struct active_service;
68 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
69
70 /* string that we scan for in sendbackup's MESG stream */
71 static const char info_end_str[] = "sendbackup: info end\n";
72 #define INFO_END_LEN (sizeof(info_end_str)-1)
73
74 /* 
75  * Here are the services that we allow.
76  * Must be in the same order as services[].
77  */
78 typedef enum {
79     SERVICE_NOOP,
80     SERVICE_SENDSIZE,
81     SERVICE_SENDBACKUP,
82     SERVICE_SELFCHECK,
83     SERVICE_AMINDEXD,
84     SERVICE_AMIDXTAPED,
85     SERVICE_AMDUMPD
86 } service_t;
87
88 static struct services {
89     char *name;
90     int  active;
91     service_t service;
92 } services[] = {
93    { "noop", 1, SERVICE_NOOP },
94    { "sendsize", 1, SERVICE_SENDSIZE },
95    { "sendbackup", 1, SERVICE_SENDBACKUP },
96    { "selfcheck", 1, SERVICE_SELFCHECK },
97    { "amindexd", 0, SERVICE_AMINDEXD },
98    { "amidxtaped", 0, SERVICE_AMIDXTAPED },
99    { "amdumpd", 0, SERVICE_AMDUMPD }
100 };
101 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
102
103 /*
104  * This structure describes an active running service.
105  *
106  * An active service is something running that we have received
107  * a request for.  This structure holds info on that service, including
108  * file descriptors for data, etc, as well as the security handle
109  * for communications with the amanda server.
110  */
111 struct active_service {
112     service_t service;                  /* service name */
113     char *cmd;                          /* name of command we ran */
114     char *arguments;                    /* arguments we sent it */
115     security_handle_t *security_handle; /* remote server */
116     state_t state;                      /* how far this has progressed */
117     pid_t pid;                          /* pid of subprocess */
118     int send_partial_reply;             /* send PREP packet */
119     int reqfd;                          /* pipe to write requests */
120     int repfd;                          /* pipe to read replies */
121     int errfd;                          /* pipe to read stderr */
122     event_handle_t *ev_repfd;           /* read event handle for repfd */
123     event_handle_t *ev_reptimeout;      /* timeout for rep data */
124     event_handle_t *ev_errfd;           /* read event handle for errfd */
125     pkt_t rep_pkt;                      /* rep packet we're sending out */
126     char *errbuf;                       /* buffer to read the err into */
127     char *repbuf;                       /* buffer to read the rep into */
128     size_t bufsize;                     /* length of repbuf */
129     size_t repbufsize;                  /* length of repbuf */
130     int repretry;                       /* times we'll retry sending the rep */
131     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
132     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
133
134     /*
135      * General user streams to the process, and their equivalent
136      * network streams.
137      */
138     struct datafd_handle {
139         int fd_read;                    /* pipe to child process */
140         int fd_write;                   /* pipe to child process */
141         event_handle_t *ev_read;        /* it's read event handle */
142         event_handle_t *ev_write;       /* it's write event handle */
143         security_stream_t *netfd;       /* stream to amanda server */
144         struct active_service *as;      /* pointer back to our enclosure */
145     } data[DATA_FD_COUNT];
146     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
147 };
148
149 /*
150  * Queue of outstanding requests that we are running.
151  */
152 GSList *serviceq = NULL;
153
154 static int wait_30s = 1;
155 static int exit_on_qlength = 1;
156 static char *auth = NULL;
157 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
158
159 int main(int argc, char **argv);
160
161 static int allocstream(struct active_service *, int);
162 static void exit_check(void *);
163 static void protocol_accept(security_handle_t *, pkt_t *);
164 static void state_machine(struct active_service *, action_t, pkt_t *);
165
166 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
167 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
168 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
169 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
170 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
171
172 static void repfd_recv(void *);
173 static void process_errfd(void *cookie);
174 static void errfd_recv(void *);
175 static void timeout_repfd(void *);
176 static void protocol_recv(void *, pkt_t *, security_status_t);
177 static void process_readnetfd(void *);
178 static void process_writenetfd(void *, void *, ssize_t);
179 static struct active_service *service_new(security_handle_t *,
180     const char *, service_t, const char *);
181 static void service_delete(struct active_service *);
182 static int writebuf(struct active_service *, const void *, size_t);
183 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
184 static char *amandad_get_security_conf (char *, void *);
185
186 static const char *state2str(state_t);
187 static const char *action2str(action_t);
188
189 int
190 main(
191     int         argc,
192     char **     argv)
193 {
194     int i, j;
195     int have_services;
196     int in, out;
197     const security_driver_t *secdrv;
198     int no_exit = 0;
199     char *pgm = "amandad";              /* in case argv[0] is not set */
200 #if defined(USE_REUSEADDR)
201     const int on = 1;
202     int r;
203 #endif
204
205     /*
206      * Configure program for internationalization:
207      *   1) Only set the message locale for now.
208      *   2) Set textdomain for all amanda related programs to "amanda"
209      *      We don't want to be forced to support dozens of message catalogs.
210      */  
211     setlocale(LC_MESSAGES, "C");
212     textdomain("amanda"); 
213
214     safe_fd(-1, 0);
215     safe_cd();
216
217     /*
218      * Nexenta needs the SUN_PERSONALITY env variable to be unset, otherwise
219      * the Sun version of tar in /usr/sun/sbin/tar is called instead.
220      *
221      * On other operating systems this will have no effect.
222      */
223
224 #ifdef HAVE_UNSETENV
225     unsetenv("SUN_PERSONALITY");
226 #endif
227
228     /*
229      * When called via inetd, it is not uncommon to forget to put the
230      * argv[0] value on the config line.  On some systems (e.g. Solaris)
231      * this causes argv and/or argv[0] to be NULL, so we have to be
232      * careful getting our name.
233      */
234     if ((argv == NULL) || (argv[0] == NULL)) {
235             pgm = "amandad";            /* in case argv[0] is not set */
236     } else {
237             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
238     }
239     set_pname(pgm);
240     dbopen(DBG_SUBDIR_AMANDAD);
241
242     if(argv == NULL) {
243         error(_("argv == NULL\n"));
244         /*NOTREACHED*/
245     }
246
247     if (argc > 1 && argv && argv[1] && g_str_equal(argv[1], "--version")) {
248         printf("amandad-%s\n", VERSION);
249         return (0);
250     }
251
252     /* Don't die when child closes pipe */
253     signal(SIGPIPE, SIG_IGN);
254
255     /* Parse the configuration; we'll handle errors later */
256     config_init(CONFIG_INIT_CLIENT, NULL);
257
258     if (geteuid() == 0) {
259         check_running_as(RUNNING_AS_ROOT);
260         initgroups(CLIENT_LOGIN, get_client_gid());
261         setgid(get_client_gid());
262         setegid(get_client_gid());
263         seteuid(get_client_uid());
264     } else {
265         check_running_as(RUNNING_AS_CLIENT_LOGIN);
266     }
267
268     add_amanda_log_handler(amanda_log_stderr);
269     add_amanda_log_handler(amanda_log_syslog);
270
271     /*
272      * ad-hoc argument parsing
273      *
274      * We accept        -auth=[authentication type]
275      *                  -no-exit
276      *                  -tcp=[port]
277      *                  -udp=[port]
278      * We also add a list of services that amandad can launch
279      */
280     secdrv = NULL;
281     in = 0; out = 1;            /* default to stdin/stdout */
282     have_services = 0;
283     for (i = 1; i < argc; i++) {
284         /*
285          * Get a driver for a security type specified after -auth=
286          */
287         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
288             argv[i] += strlen("-auth=");
289             secdrv = security_getdriver(argv[i]);
290             auth = argv[i];
291             if (secdrv == NULL) {
292                 error(_("no driver for security type '%s'\n"), argv[i]);
293                 /*NOTREACHED*/
294             }
295             if (strcmp(auth, "local") == 0 ||
296                 strcmp(auth, "rsh") == 0 ||
297                 strcmp(auth, "ssh") == 0) {
298                 int i;
299                 for (i=0; i < NSERVICES; i++) {
300                     services[i].active = 1;
301                 }
302             }
303             continue;
304         }
305
306         /*
307          * If -no-exit is specified, always run even after requests have
308          * been satisfied.
309          */
310         else if (strcmp(argv[i], "-no-exit") == 0) {
311             no_exit = 1;
312             continue;
313         }
314
315         /*
316          * Allow us to directly bind to a udp port for debugging.
317          * This may only apply to some security types.
318          */
319         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
320 #ifdef WORKING_IPV6
321             struct sockaddr_in6 sin;
322 #else
323             struct sockaddr_in sin;
324 #endif
325
326             argv[i] += strlen("-udp=");
327 #ifdef WORKING_IPV6
328             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
329 #else
330             in = out = socket(AF_INET, SOCK_DGRAM, 0);
331 #endif
332             if (in < 0) {
333                 error(_("can't create dgram socket: %s\n"), strerror(errno));
334                 /*NOTREACHED*/
335             }
336 #ifdef USE_REUSEADDR
337             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
338                 (void *)&on, (socklen_t_equiv)sizeof(on));
339             if (r < 0) {
340                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
341                           strerror(errno));
342             }
343 #endif
344
345 #ifdef WORKING_IPV6
346             sin.sin6_family = (sa_family_t)AF_INET6;
347             sin.sin6_addr = in6addr_any;
348             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
349 #else
350             sin.sin_family = (sa_family_t)AF_INET;
351             sin.sin_addr.s_addr = INADDR_ANY;
352             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
353 #endif
354             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
355                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
356                     strerror(errno));
357                 /*NOTREACHED*/
358             }
359         }
360         /*
361          * Ditto for tcp ports.
362          */
363         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
364 #ifdef WORKING_IPV6
365             struct sockaddr_in6 sin;
366 #else
367             struct sockaddr_in sin;
368 #endif
369             int sock;
370             socklen_t_equiv n;
371
372             argv[i] += strlen("-tcp=");
373 #ifdef WORKING_IPV6
374             sock = socket(AF_INET6, SOCK_STREAM, 0);
375 #else
376             sock = socket(AF_INET, SOCK_STREAM, 0);
377 #endif
378             if (sock < 0) {
379                 error(_("can't create tcp socket: %s\n"), strerror(errno));
380                 /*NOTREACHED*/
381             }
382 #ifdef USE_REUSEADDR
383             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
384                 (void *)&on, (socklen_t_equiv)sizeof(on));
385             if (r < 0) {
386                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
387                           strerror(errno));
388             }
389 #endif
390 #ifdef WORKING_IPV6
391             sin.sin6_family = (sa_family_t)AF_INET6;
392             sin.sin6_addr = in6addr_any;
393             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
394 #else
395             sin.sin_family = (sa_family_t)AF_INET;
396             sin.sin_addr.s_addr = INADDR_ANY;
397             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
398 #endif
399             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
400                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
401                     strerror(errno));
402                 /*NOTREACHED*/
403             }
404             listen(sock, 10);
405             n = (socklen_t_equiv)sizeof(sin);
406             in = out = accept(sock, (struct sockaddr *)&sin, &n);
407         }
408         /*
409          * It must be a service name
410          */
411         else {
412             /* clear all services */
413             if(!have_services) {
414                 for (j = 0; j < (int)NSERVICES; j++)
415                     services[j].active = 0;
416             }
417             have_services = 1;
418
419             if(strcmp(argv[i],"amdump") == 0) {
420                 services[0].active = 1;
421                 services[1].active = 1;
422                 services[2].active = 1;
423                 services[3].active = 1;
424             }
425             else {
426                 for (j = 0; j < (int)NSERVICES; j++)
427                     if (strcmp(services[j].name, argv[i]) == 0)
428                         break;
429                 if (j == (int)NSERVICES) {
430                     dbprintf(_("%s: invalid service\n"), argv[i]);
431                     exit(1);
432                 }
433                 services[j].active = 1;
434             }
435         }
436     }
437
438     /*
439      * If no security type specified, use BSDTCP
440      */
441     if (secdrv == NULL) {
442         secdrv = security_getdriver("BSDTCP");
443         auth = "bsdtcp";
444         if (secdrv == NULL) {
445             error(_("no driver for default security type 'BSDTCP'\n"));
446             /*NOTREACHED*/
447         }
448     }
449
450     if(strcasecmp(auth, "rsh") == 0 ||
451        strcasecmp(auth, "ssh") == 0 ||
452        strcasecmp(auth, "local") == 0 ||
453        strcasecmp(auth, "bsdtcp") == 0) {
454         wait_30s = 0;
455         exit_on_qlength = 1;
456     }
457
458 #ifndef SINGLE_USERID
459     if (geteuid() == 0) {
460         if (strcasecmp(auth, "krb5") != 0) {
461             struct passwd *pwd;
462             /* lookup our local user name */
463             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
464                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
465             }
466
467             if (pwd->pw_uid != 0) {
468                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
469                       CLIENT_LOGIN, auth);
470             }
471         }
472     } else {
473         if (strcasecmp(auth, "krb5") == 0) {
474             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
475         }
476     }
477 #endif
478
479     /* initialize */
480
481     startclock();
482
483     dbprintf(_("version %s\n"), VERSION);
484     for (i = 0; version_info[i] != NULL; i++) {
485         dbprintf("    %s", version_info[i]);
486     }
487
488     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
489         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
490     }
491
492     /* krb5 require the euid to be 0 */
493     if (strcasecmp(auth, "krb5") == 0) {
494         seteuid((uid_t)0);
495     }
496
497     /*
498      * Schedule to call protocol_accept() when new security handles
499      * are created on stdin.
500      */
501     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
502
503     /*
504      * Schedule an event that will try to exit every 30 seconds if there
505      * are no requests outstanding.
506      */
507     if(wait_30s)
508         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
509
510     /*
511      * Call event_loop() with an arg of 0, telling it to block until all
512      * events are completed.
513      */
514     event_loop(0);
515
516     close(in);
517     close(out);
518     dbclose();
519     return(0);
520 }
521
522 /*
523  * This runs periodically and checks to see if we have any active services
524  * still running.  If we don't, then we quit.
525  */
526 static void
527 exit_check(
528     void *      cookie)
529 {
530     int no_exit;
531
532     assert(cookie != NULL);
533     no_exit = *(int *)cookie;
534
535     /*
536      * If things are still running, then don't exit.
537      */
538     if (g_slist_length(serviceq) > 0)
539         return;
540
541     /*
542      * If the caller asked us to never exit, then we're done
543      */
544     if (no_exit)
545         return;
546
547     dbclose();
548     exit(0);
549 }
550
551 /*
552  * Handles new incoming protocol handles.  This is a callback for
553  * security_accept(), which gets called when new handles are detected.
554  */
555 static void
556 protocol_accept(
557     security_handle_t * handle,
558     pkt_t *             pkt)
559 {
560     pkt_t pkt_out;
561     GSList *iter;
562     struct active_service *as;
563     char *pktbody, *tok, *service, *arguments;
564     char *service_path = NULL;
565     GSList *errlist = NULL;
566     int i;
567
568     pkt_out.body = NULL;
569
570     /*
571      * If handle is NULL, then the connection is closed.
572      */
573     if(handle == NULL) {
574         return;
575     }
576
577     /*
578      * If we have errors (not warnings) from the config file, let the remote system
579      * know immediately.  Unfortunately, we only get one ERROR line, so if there
580      * are multiple errors, we just show the first.
581      */
582     if (config_errors(&errlist) >= CFGERR_ERRORS) {
583         GSList *iter = errlist;
584         char *errmsg;
585         gboolean multiple_errors = FALSE;
586
587         if (iter) {
588             errmsg = (char *)iter->data;
589             if (iter->next)
590                 multiple_errors = TRUE;
591         } else {
592             errmsg = "(no error message)";
593         }
594
595         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
596             multiple_errors? _(" (additional errors not displayed)"):"");
597         do_sendpkt(handle, &pkt_out);
598         amfree(pkt_out.body);
599         security_close(handle);
600         return;
601     }
602
603     g_debug("authenticated peer name is '%s'", security_get_authenticated_peer_name(handle));
604
605     /*
606      * If pkt is NULL, then there was a problem with the new connection.
607      */
608     if (pkt == NULL) {
609         dbprintf(_("accept error: %s\n"), security_geterror(handle));
610         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
611         do_sendpkt(handle, &pkt_out);
612         amfree(pkt_out.body);
613         security_close(handle);
614         return;
615     }
616
617     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
618         pkt_type2str(pkt->type), pkt->body);
619
620     /*
621      * If this is not a REQ packet, just forget about it.
622      */
623     if (pkt->type != P_REQ) {
624         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
625             pkt_type2str(pkt->type), pkt->body);
626         security_close(handle);
627         return;
628     }
629
630     pktbody = service = arguments = NULL;
631     as = NULL;
632
633     /*
634      * Parse out the service and arguments
635      */
636
637     pktbody = stralloc(pkt->body);
638
639     tok = strtok(pktbody, " ");
640     if (tok == NULL)
641         goto badreq;
642     if (strcmp(tok, "SERVICE") != 0)
643         goto badreq;
644
645     tok = strtok(NULL, " \n");
646     if (tok == NULL)
647         goto badreq;
648     service = stralloc(tok);
649
650     /* we call everything else 'arguments' */
651     tok = strtok(NULL, "");
652     if (tok == NULL)
653         goto badreq;
654     arguments = stralloc(tok);
655
656     /* see if it's one we allow */
657     for (i = 0; i < (int)NSERVICES; i++)
658         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
659             break;
660     if (i == (int)NSERVICES) {
661         dbprintf(_("%s: invalid service\n"), service);
662         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
663         goto send_pkt_out;
664     }
665
666     service_path = vstralloc(amlibexecdir, "/", service, NULL);
667     if (access(service_path, X_OK) < 0) {
668         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
669             pkt_init(&pkt_out, P_NAK,
670                      _("ERROR execute access to \"%s\" denied\n"),
671                      service_path);
672         goto send_pkt_out;
673     }
674
675     /* see if its already running */
676     for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
677         as = (struct active_service *)iter->data;
678             if (strcmp(as->cmd, service_path) == 0 &&
679                 strcmp(as->arguments, arguments) == 0) {
680                     dbprintf(_("%s %s: already running, acking req\n"),
681                         service, arguments);
682                     pkt_init_empty(&pkt_out, P_ACK);
683                     goto send_pkt_out_no_delete;
684             }
685     }
686
687     /*
688      * create a new service instance, and send the arguments down
689      * the request pipe.
690      */
691     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
692     as = service_new(handle, service_path, services[i].service, arguments);
693     if (writebuf(as, arguments, strlen(arguments)) < 0) {
694         const char *errmsg = strerror(errno);
695         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
696         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
697             service, errmsg);
698         goto send_pkt_out;
699     }
700     aclose(as->reqfd);
701
702     amfree(pktbody);
703     amfree(service);
704     amfree(service_path);
705     amfree(arguments);
706
707     /*
708      * Move to the sendack state, and start up the state
709      * machine.
710      */
711     as->state = s_sendack;
712     state_machine(as, A_START, NULL);
713     return;
714
715 badreq:
716     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
717     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
718         pkt_type2str(pkt->type), pkt->body);
719
720 send_pkt_out:
721     if(as)
722         service_delete(as);
723 send_pkt_out_no_delete:
724     amfree(pktbody);
725     amfree(service_path);
726     amfree(service);
727     amfree(arguments);
728     do_sendpkt(handle, &pkt_out);
729     security_close(handle);
730     amfree(pkt_out.body);
731 }
732
733 /*
734  * Handles incoming protocol packets.  Routes responses to the proper
735  * running service.
736  */
737 static void
738 state_machine(
739     struct active_service *     as,
740     action_t                    action,
741     pkt_t *                     pkt)
742 {
743     action_t retaction;
744     state_t curstate;
745     pkt_t nak;
746
747     amandad_debug(1, _("state_machine: %p entering\n"), as);
748     for (;;) {
749         curstate = as->state;
750         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
751                           state2str(curstate), action2str(action));
752         retaction = (*curstate)(as, action, pkt);
753         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
754                           as, state2str(curstate), action2str(retaction),
755                           state2str(as->state));
756
757         switch (retaction) {
758         /*
759          * State has queued up and is now blocking on input.
760          */
761         case A_PENDING:
762             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
763             return;
764
765         /*
766          * service has switched states.  Loop.
767          */
768         case A_CONTINUE:
769             break;
770
771         /*
772          * state has determined that the packet it received was bogus.
773          * Send a nak, and return.
774          */
775         case A_SENDNAK:
776             dbprintf(_("received unexpected %s packet\n"),
777                 pkt_type2str(pkt->type));
778             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
779             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
780                 pkt_type2str(pkt->type));
781             do_sendpkt(as->security_handle, &nak);
782             amfree(nak.body);
783             security_recvpkt(as->security_handle, protocol_recv, as, -1);
784             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
785             return;
786
787         /*
788          * Service is done.  Remove it and finish.
789          */
790         case A_FINISH:
791             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
792             service_delete(as);
793             return;
794
795         default:
796             assert(0);
797             break;
798         }
799     }
800     /*NOTREACHED*/
801 }
802
803 /*
804  * This state just sends an ack.  After that, we move to the repwait
805  * state to wait for REP data to arrive from the subprocess.
806  */
807 static action_t
808 s_sendack(
809     struct active_service *     as,
810     action_t                    action,
811     pkt_t *                     pkt)
812 {
813     pkt_t ack;
814
815     (void)action;       /* Quiet unused parameter warning */
816     (void)pkt;          /* Quiet unused parameter warning */
817
818     pkt_init_empty(&ack, P_ACK);
819     if (do_sendpkt(as->security_handle, &ack) < 0) {
820         dbprintf(_("error sending ACK: %s\n"),
821             security_geterror(as->security_handle));
822         amfree(ack.body);
823         return (A_FINISH);
824     }
825     amfree(ack.body);
826
827     /*
828      * move to the repwait state
829      * Setup a listener for data on the reply fd, but also
830      * listen for packets over the wire, as the server may
831      * poll us if we take a long time.
832      * Setup a timeout that will fire if it takes too long to
833      * receive rep data.
834      */
835     as->state = s_repwait;
836     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
837     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
838         timeout_repfd, as);
839     as->errbuf = NULL;
840     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
841     security_recvpkt(as->security_handle, protocol_recv, as, -1);
842     return (A_PENDING);
843 }
844
845 /*
846  * This is the repwait state.  We have responded to the initial REQ with
847  * an ACK, and we are now waiting for the process we spawned to pass us 
848  * data to send in a REP.
849  */
850 static action_t
851 s_repwait(
852     struct active_service *     as,
853     action_t                    action,
854     pkt_t *                     pkt)
855 {
856     ssize_t   n;
857     char     *repbuf_temp;
858     char     *what;
859     char     *msg;
860     int       code = 0;
861     int       pid;
862     amwait_t  retstat;
863
864     /*
865      * We normally shouldn't receive any packets while waiting
866      * for our REP data, but in some cases we do.
867      */
868     if (action == A_RECVPKT) {
869         assert(pkt != NULL);
870         /*
871          * Another req for something that's running.  Just send an ACK
872          * and go back and wait for more data.
873          */
874         if (pkt->type == P_REQ) {
875             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
876             amfree(as->rep_pkt.body);
877             pkt_init_empty(&as->rep_pkt, P_ACK);
878             do_sendpkt(as->security_handle, &as->rep_pkt);
879             security_recvpkt(as->security_handle, protocol_recv, as, -1);
880             return (A_PENDING);
881         }
882         /* something unexpected.  Nak it */
883         return (A_SENDNAK);
884     }
885
886     if (action == A_TIMEOUT) {
887         amfree(as->rep_pkt.body);
888         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
889         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
890         do_sendpkt(as->security_handle, &as->rep_pkt);
891         return (A_FINISH);
892     }
893
894     assert(action == A_RECVREP);
895     if(as->bufsize == 0) {
896         as->bufsize = NETWORK_BLOCK_BYTES;
897         as->repbuf = alloc(as->bufsize);
898     }
899
900     do {
901         n = read(as->repfd, as->repbuf + as->repbufsize,
902                  as->bufsize - as->repbufsize - 1);
903     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
904     if (n < 0) {
905         const char *errstr = strerror(errno);
906         dbprintf(_("read error on reply pipe: %s\n"), errstr);
907         amfree(as->rep_pkt.body);
908         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
909                  errstr);
910         do_sendpkt(as->security_handle, &as->rep_pkt);
911         return (A_FINISH);
912     }
913
914     /* If end of service, wait for process status */
915     if (n == 0) {
916         pid = waitpid(as->pid, &retstat, WNOHANG);
917         if (as->service  == SERVICE_NOOP ||
918             as->service  == SERVICE_SENDSIZE ||
919             as->service  == SERVICE_SELFCHECK) {
920             int t = 0;
921             while (t<5 && pid == 0) {
922                 sleep(1);
923                 t++;
924                 pid = waitpid(as->pid, &retstat, WNOHANG);
925             }
926         }
927
928         process_errfd(as);
929
930         if (pid == 0)
931             pid = waitpid(as->pid, &retstat, WNOHANG);
932
933         if (pid > 0) {
934             what = NULL;
935             if (! WIFEXITED(retstat)) {
936                 what = _("signal");
937                 code = WTERMSIG(retstat);
938             } else if (WEXITSTATUS(retstat) != 0) {
939                 what = _("code");
940                 code = WEXITSTATUS(retstat);
941             }
942             if (what) {
943                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
944                          (as->cmd)?as->cmd:_("??UNKONWN??"),
945                          (unsigned)as->pid,
946                          what, code);
947                 msg = vstrallocf(
948                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
949                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
950                      what, code);
951                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
952                         as->bufsize *= 2;
953                         repbuf_temp = alloc(as->bufsize);
954                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
955                         amfree(as->repbuf);
956                         as->repbuf = repbuf_temp;
957                 }
958                 strcpy(as->repbuf + as->repbufsize, msg);
959                 as->repbufsize += strlen(msg);
960                 amfree(msg);
961             }
962         }
963     }
964
965     /*
966      * If we got some data, go back and wait for more, or EOF.  Nul terminate
967      * the buffer first.
968      */
969     as->repbuf[n + as->repbufsize] = '\0';
970     if (n > 0) {
971         as->repbufsize += n;
972         if(as->repbufsize >= (as->bufsize - 1)) {
973             as->bufsize *= 2;
974             repbuf_temp = alloc(as->bufsize);
975             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
976             amfree(as->repbuf);
977             as->repbuf = repbuf_temp;
978         }
979         else if(as->send_partial_reply) {
980             amfree(as->rep_pkt.body);
981             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
982             do_sendpkt(as->security_handle, &as->rep_pkt);
983             amfree(as->rep_pkt.body);
984             pkt_init_empty(&as->rep_pkt, P_REP);
985         }
986  
987         return (A_PENDING);
988     }
989
990     /*
991      * If we got 0, then we hit EOF.  Process the data and release
992      * the timeout.
993      */
994     assert(n == 0);
995
996     assert(as->ev_repfd != NULL);
997     event_release(as->ev_repfd);
998     as->ev_repfd = NULL;
999
1000     assert(as->ev_reptimeout != NULL);
1001     event_release(as->ev_reptimeout);
1002     as->ev_reptimeout = NULL;
1003
1004     as->state = s_processrep;
1005     aclose(as->repfd);
1006     return (A_CONTINUE);
1007 }
1008
1009 /*
1010  * After we have read in all of the rep data, we process it and send
1011  * it out as a REP packet.
1012  */
1013 static action_t
1014 s_processrep(
1015     struct active_service *     as,
1016     action_t                    action,
1017     pkt_t *                     pkt)
1018 {
1019     char *tok, *repbuf;
1020
1021     (void)action;       /* Quiet unused parameter warning */
1022     (void)pkt;          /* Quiet unused parameter warning */
1023
1024     /*
1025      * Copy the rep lines into the outgoing packet.
1026      *
1027      * If this line is a CONNECT, translate it
1028      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1029      * Example:
1030      *
1031      *  CONNECT DATA 4 MESG 5 INDEX 6
1032      *
1033      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1034      * We need to map these to security streams and pass them back
1035      * to the amanda server.  If the handle is -1, then we don't map.
1036      */
1037     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1038         amandad_kencrypt = KENCRYPT_WILL_DO;
1039         repbuf = stralloc(as->repbuf + 9);
1040     } else {
1041         repbuf = stralloc(as->repbuf);
1042     }
1043     amfree(as->rep_pkt.body);
1044     pkt_init_empty(&as->rep_pkt, P_REP);
1045     tok = strtok(repbuf, " ");
1046     if (tok == NULL)
1047         goto error;
1048     if (strcmp(tok, "CONNECT") == 0) {
1049         char *line, *nextbuf;
1050
1051         /* Save the entire line */
1052         line = strtok(NULL, "\n");
1053         /* Save the buf following the line */
1054         nextbuf = strtok(NULL, "");
1055
1056         if (line == NULL || nextbuf == NULL)
1057             goto error;
1058
1059         pkt_cat(&as->rep_pkt, "CONNECT");
1060
1061         /* loop over the id/handle pairs */
1062         for (;;) {
1063             /* id */
1064             tok = strtok(line, " ");
1065             line = NULL;        /* keep working from line */
1066             if (tok == NULL)
1067                 break;
1068             pkt_cat(&as->rep_pkt, " %s", tok);
1069
1070             /* handle */
1071             tok = strtok(NULL, " \n");
1072             if (tok == NULL)
1073                 goto error;
1074             /* convert the handle into something the server can process */
1075             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1076         }
1077         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1078     } else {
1079 error:
1080         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1081     }
1082
1083     /*
1084      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1085      * state.
1086      */
1087     as->state = s_sendrep;
1088     as->repretry = getconf_int(CNF_REP_TRIES);
1089     amfree(repbuf);
1090     return (A_CONTINUE);
1091 }
1092
1093 /*
1094  * This is the state where we send the REP we just collected from our child.
1095  */
1096 static action_t
1097 s_sendrep(
1098     struct active_service *     as,
1099     action_t                    action,
1100     pkt_t *                     pkt)
1101 {
1102     (void)action;       /* Quiet unused parameter warning */
1103     (void)pkt;          /* Quiet unused parameter warning */
1104
1105     /*
1106      * Transmit it and move to the ack state.
1107      */
1108     do_sendpkt(as->security_handle, &as->rep_pkt);
1109     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1110     as->state = s_ackwait;
1111     return (A_PENDING);
1112 }
1113
1114 /*
1115  * This is the state in which we wait for the server to ACK the REP
1116  * we just sent it.
1117  */
1118 static action_t
1119 s_ackwait(
1120     struct active_service *     as,
1121     action_t                    action,
1122     pkt_t *                     pkt)
1123 {
1124     struct datafd_handle *dh;
1125     int npipes;
1126
1127     /*
1128      * If we got a timeout, try again, but eventually give up.
1129      */
1130     if (action == A_TIMEOUT) {
1131         if (--as->repretry > 0) {
1132             as->state = s_sendrep;
1133             return (A_CONTINUE);
1134         }
1135         dbprintf(_("timeout waiting for ACK for our REP\n"));
1136         return (A_FINISH);
1137     }
1138     amandad_debug(1, _("received ACK, now opening streams\n"));
1139
1140     assert(action == A_RECVPKT);
1141
1142     if (pkt->type == P_REQ) {
1143         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1144         as->state = s_sendrep;
1145         return (A_CONTINUE);
1146     }
1147
1148     if (pkt->type != P_ACK)
1149         return (A_SENDNAK);
1150
1151     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1152         amandad_kencrypt = KENCRYPT_YES;
1153     }
1154
1155     /*
1156      * Got the ack, now open the pipes
1157      */
1158     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1159         if (dh->netfd == NULL)
1160             continue;
1161         dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
1162         if (security_stream_accept(dh->netfd) < 0) {
1163             dbprintf(_("stream %td accept failed: %s\n"),
1164                 dh - &as->data[0], security_geterror(as->security_handle));
1165             security_stream_close(dh->netfd);
1166             dh->netfd = NULL;
1167             continue;
1168         }
1169
1170         /* setup an event for reads from it.  As a special case, don't start
1171          * listening on as->data[0] until we read some data on another fd, if
1172          * the service is sendbackup.  This ensures that we send a MESG or 
1173          * INDEX token before any DATA tokens, as dumper assumes. This is a
1174          * hack, if that wasn't already obvious! */
1175         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1176             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1177                                          process_readnetfd, dh);
1178         } else {
1179             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1180         }
1181
1182         security_stream_read(dh->netfd, process_writenetfd, dh);
1183
1184     }
1185
1186     /*
1187      * Pipes are open, so auth them.  Count them at the same time.
1188      */
1189     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1190         if (dh->netfd == NULL)
1191             continue;
1192         if (security_stream_auth(dh->netfd) < 0) {
1193             security_stream_close(dh->netfd);
1194             dh->netfd = NULL;
1195             event_release(dh->ev_read);
1196             event_release(dh->ev_write);
1197             dh->ev_read = NULL;
1198             dh->ev_write = NULL;
1199         } else {
1200             npipes++;
1201         }
1202     }
1203
1204     /*
1205      * If no pipes are open, then we're done.  Otherwise, just start running.
1206      * The event handlers on all of the pipes will take it from here.
1207      */
1208     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1209     if (npipes == 0)
1210         return (A_FINISH);
1211     else {
1212         security_close(as->security_handle);
1213         as->security_handle = NULL;
1214         return (A_PENDING);
1215     }
1216 }
1217
1218 /*
1219  * Called when a repfd has received data
1220  */
1221 static void
1222 repfd_recv(
1223     void *      cookie)
1224 {
1225     struct active_service *as = cookie;
1226
1227     assert(as != NULL);
1228     assert(as->ev_repfd != NULL);
1229
1230     state_machine(as, A_RECVREP, NULL);
1231 }
1232
1233 static void
1234 process_errfd(
1235     void *cookie)
1236 {
1237     struct active_service *as = cookie;
1238
1239     /* Process errfd before sending the REP packet */
1240     if (as->ev_errfd) {
1241         SELECT_ARG_TYPE readset;
1242         struct timeval  tv;
1243         int             nfound;
1244
1245         memset(&tv, 0, SIZEOF(tv));
1246         FD_ZERO(&readset);
1247         FD_SET(as->errfd, &readset);
1248         nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
1249         if (nfound && FD_ISSET(as->errfd, &readset)) {
1250             errfd_recv(as);
1251         }
1252     }
1253 }
1254
1255 /*
1256  * Called when a errfd has received data
1257  */
1258 static void
1259 errfd_recv(
1260     void *      cookie)
1261 {
1262     struct active_service *as = cookie;
1263     char  buf[32769];
1264     int   n;
1265     char *r;
1266
1267     assert(as != NULL);
1268     assert(as->ev_errfd != NULL);
1269
1270     n = read(as->errfd, &buf, 32768);
1271     /* merge buffer */
1272     if (n > 0) {
1273         /* Terminate it with '\0' */
1274         buf[n+1] = '\0';
1275
1276         if (as->errbuf) {
1277             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1278         } else {
1279             as->errbuf = stralloc(buf);
1280         }
1281     } else if (n == 0) {
1282         event_release(as->ev_errfd);
1283         as->ev_errfd = NULL;
1284     } else { /* n < 0 */
1285         event_release(as->ev_errfd);
1286         as->ev_errfd = NULL;
1287         g_snprintf(buf, 32768,
1288                    "error reading stderr or service: %s\n", strerror(errno));
1289     }
1290
1291     /* for each line terminate by '\n' */
1292     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1293         char *s;
1294
1295         *r = '\0';
1296         s = vstrallocf("ERROR service %s: %s\n",
1297                        services[as->service].name, as->errbuf);
1298
1299         /* Add to repbuf, error message will be in the REP packet if it
1300          * is not already sent
1301          */
1302         n = strlen(s);
1303         if (as->bufsize == 0) {
1304             as->bufsize = NETWORK_BLOCK_BYTES;
1305             as->repbuf = alloc(as->bufsize);
1306         }
1307         while (as->bufsize < as->repbufsize + n) {
1308             char *repbuf_temp;
1309             as->bufsize *= 2;
1310             repbuf_temp = alloc(as->bufsize);
1311             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1312             amfree(as->repbuf);
1313             as->repbuf = repbuf_temp;
1314         }
1315         memcpy(as->repbuf + as->repbufsize, s, n);
1316         as->repbufsize += n;
1317
1318         dbprintf("%s", s);
1319
1320         /* remove first line from buffer */
1321         r++;
1322         s = stralloc(r);
1323         amfree(as->errbuf);
1324         as->errbuf = s;
1325     }
1326 }
1327
1328 /*
1329  * Called when a repfd has timed out
1330  */
1331 static void
1332 timeout_repfd(
1333     void *      cookie)
1334 {
1335     struct active_service *as = cookie;
1336
1337     assert(as != NULL);
1338     assert(as->ev_reptimeout != NULL);
1339
1340     state_machine(as, A_TIMEOUT, NULL);
1341 }
1342
1343 /*
1344  * Called when a handle has received data
1345  */
1346 static void
1347 protocol_recv(
1348     void *              cookie,
1349     pkt_t *             pkt,
1350     security_status_t   status)
1351 {
1352     struct active_service *as = cookie;
1353
1354     assert(as != NULL);
1355
1356     switch (status) {
1357     case S_OK:
1358         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1359             pkt_type2str(pkt->type), pkt->body);
1360         state_machine(as, A_RECVPKT, pkt);
1361         break;
1362     case S_TIMEOUT:
1363         dbprintf(_("timeout\n"));
1364         state_machine(as, A_TIMEOUT, NULL);
1365         break;
1366     case S_ERROR:
1367         dbprintf(_("receive error: %s\n"),
1368             security_geterror(as->security_handle));
1369         break;
1370     }
1371 }
1372
1373 /*
1374  * This is a generic relay function that just reads data from one of
1375  * the process's pipes and passes it up the equivalent security_stream_t
1376  */
1377 static void
1378 process_readnetfd(
1379     void *      cookie)
1380 {
1381     pkt_t nak;
1382     struct datafd_handle *dh = cookie;
1383     struct active_service *as = dh->as;
1384     ssize_t n;
1385
1386     nak.body = NULL;
1387
1388     do {
1389         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1390     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1391
1392     /*
1393      * Process has died.
1394      */
1395     if (n < 0) {
1396         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1397             dh->fd_read, strerror(errno));
1398         goto sendnak;
1399     }
1400     /*
1401      * Process has closed the pipe.  Just remove this event handler.
1402      * If all pipes are closed, shut down this service.
1403      */
1404     if (n == 0) {
1405         event_release(dh->ev_read);
1406         dh->ev_read = NULL;
1407         if(dh->ev_write == NULL) {
1408             security_stream_close(dh->netfd);
1409             dh->netfd = NULL;
1410         }
1411         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1412             if (dh->netfd != NULL)
1413                 return;
1414         }
1415         service_delete(as);
1416         return;
1417     }
1418
1419     /* Handle the special case of recognizing "sendbackup info end"
1420      * from sendbackup's MESG fd */
1421     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1422         /* make a buffer containing the combined data from info_end_buf
1423          * and what we've read this time, and search it for info_end_strj
1424          * This includes a NULL byte for strstr's sanity. */
1425         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1426         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1427         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1428         combined_buf[INFO_END_LEN+n] = '\0';
1429
1430         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1431
1432         /* fill info_end_buf from the tail end of combined_buf */
1433         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1434         amfree(combined_buf);
1435
1436         /* if we did see info_end_str, start reading the data fd (fd 0) */
1437         if (as->seen_info_end) {
1438             struct datafd_handle *dh = &as->data[0];
1439             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1440             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1441                                          process_readnetfd, dh);
1442         } else {
1443             amandad_debug(1, "sendbackup header info still not complete\n");
1444         }
1445     }
1446
1447     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1448         /* stream has croaked */
1449         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1450             security_stream_id(dh->netfd),
1451             security_stream_geterror(dh->netfd));
1452         goto sendnak;
1453     }
1454     return;
1455
1456 sendnak:
1457     do_sendpkt(as->security_handle, &nak);
1458     service_delete(as);
1459     amfree(nak.body);
1460 }
1461
1462 /*
1463  * This is a generic relay function that just read data from one of
1464  * the security_stream_t and passes it up the equivalent process's pipes
1465  */
1466 static void
1467 process_writenetfd(
1468     void *      cookie,
1469     void *      buf,
1470     ssize_t     size)
1471 {
1472     struct datafd_handle *dh;
1473
1474     assert(cookie != NULL);
1475     dh = cookie;
1476
1477     if (dh->fd_write <= 0) {
1478         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1479     } else if (size > 0) {
1480         full_write(dh->fd_write, buf, (size_t)size);
1481         security_stream_read(dh->netfd, process_writenetfd, dh);
1482     }
1483     else {
1484         aclose(dh->fd_write);
1485     }
1486 }
1487
1488
1489 /*
1490  * Convert a local stream handle (DATA_FD...) into something that
1491  * can be sent to the amanda server.
1492  *
1493  * Returns a number that should be sent to the server in the REP packet.
1494  */
1495 static int
1496 allocstream(
1497     struct active_service *     as,
1498     int                         handle)
1499 {
1500     struct datafd_handle *dh;
1501
1502     /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
1503      * it is NOT a file descriptor! */
1504
1505     /* if the handle is -1, then we don't bother */
1506     if (handle < 0)
1507         return (-1);
1508
1509     /* make sure the handle's kosher */
1510     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1511         return (-1);
1512
1513     /* get a pointer into our handle array */
1514     dh = &as->data[handle - DATA_FD_OFFSET];
1515
1516     /* make sure we're not already using the net handle */
1517     if (dh->netfd != NULL)
1518         return (-1);
1519
1520     /* allocate a stream from the security layer and return */
1521     dh->netfd = security_stream_server(as->security_handle);
1522     if (dh->netfd == NULL) {
1523         dbprintf(_("couldn't open stream to server: %s\n"),
1524             security_geterror(as->security_handle));
1525         return (-1);
1526     }
1527
1528     /*
1529      * convert the stream into a numeric id that can be sent to the
1530      * remote end.
1531      */
1532     return (security_stream_id(dh->netfd));
1533 }
1534
1535 /*
1536  * Create a new service instance
1537  */
1538 static struct active_service *
1539 service_new(
1540     security_handle_t * security_handle,
1541     const char *        cmd,
1542     service_t           service,
1543     const char *        arguments)
1544 {
1545     int i;
1546     int data_read[DATA_FD_COUNT + 2][2];
1547     int data_write[DATA_FD_COUNT + 2][2];
1548     struct active_service *as;
1549     pid_t pid;
1550     int newfd;
1551     char *peer_name;
1552     char *amanda_remote_host_env[2];
1553
1554     assert(security_handle != NULL);
1555     assert(cmd != NULL);
1556     assert(arguments != NULL);
1557
1558     /* a plethora of pipes */
1559     /* data_read[0]                : stdin
1560      * data_write[0]               : stdout
1561      * data_read[1], data_write[1] : first  stream
1562      * data_read[2], data_write[2] : second stream
1563      * data_read[3], data_write[3] : third stream
1564      * data_write[4]               : stderr
1565      */
1566     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1567         if (pipe(data_read[i]) < 0) {
1568             error(_("pipe: %s\n"), strerror(errno));
1569             /*NOTREACHED*/
1570         }
1571         if (pipe(data_write[i]) < 0) {
1572             error(_("pipe: %s\n"), strerror(errno));
1573             /*NOTREACHED*/
1574         }
1575     }
1576     if (pipe(data_write[STDERR_PIPE]) < 0) {
1577         error(_("pipe: %s\n"), strerror(errno));
1578         /*NOTREACHED*/
1579     }
1580
1581     switch(pid = fork()) {
1582     case -1:
1583         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1584         /*NOTREACHED*/
1585     default:
1586         /*
1587          * The parent.  Close the far ends of our pipes and return.
1588          */
1589         as = g_new0(struct active_service, 1);
1590         as->cmd = stralloc(cmd);
1591         as->arguments = stralloc(arguments);
1592         as->security_handle = security_handle;
1593         as->state = NULL;
1594         as->service = service;
1595         as->pid = pid;
1596         as->send_partial_reply = 0;
1597         as->seen_info_end = FALSE;
1598         /* fill in info_end_buf with non-null characters */
1599         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1600         if(service == SERVICE_SENDSIZE) {
1601             g_option_t *g_options;
1602             char *option_str, *p;
1603
1604             option_str = stralloc(as->arguments+8);
1605             p = strchr(option_str,'\n');
1606             if(p) *p = '\0';
1607
1608             g_options = parse_g_options(option_str, 1);
1609             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1610                 as->send_partial_reply = 1;
1611             }
1612             free_g_options(g_options);
1613             amfree(option_str);
1614         }
1615
1616         /* write to the request pipe */
1617         aclose(data_read[0][0]);
1618         as->reqfd = data_read[0][1];
1619
1620         /*
1621          * read from the reply pipe
1622          */
1623         as->repfd = data_write[0][0];
1624         aclose(data_write[0][1]);
1625         as->ev_repfd = NULL;
1626         as->repbuf = NULL;
1627         as->repbufsize = 0;
1628         as->bufsize = 0;
1629         as->repretry = 0;
1630         as->rep_pkt.body = NULL;
1631
1632         /*
1633          * read from the stderr pipe
1634          */
1635         as->errfd = data_write[STDERR_PIPE][0];
1636         aclose(data_write[STDERR_PIPE][1]);
1637         as->ev_errfd = NULL;
1638         as->errbuf = NULL;
1639
1640         /*
1641          * read from the rest of the general-use pipes
1642          * (netfds are opened as the client requests them)
1643          */
1644         for (i = 0; i < DATA_FD_COUNT; i++) {
1645             aclose(data_read[i + 1][1]);
1646             aclose(data_write[i + 1][0]);
1647             as->data[i].fd_read = data_read[i + 1][0];
1648             as->data[i].fd_write = data_write[i + 1][1];
1649             as->data[i].ev_read = NULL;
1650             as->data[i].ev_write = NULL;
1651             as->data[i].netfd = NULL;
1652             as->data[i].as = as;
1653         }
1654
1655         /* add it to the service queue */
1656         /* increment the active service count */
1657         serviceq = g_slist_append(serviceq, (gpointer)as);
1658
1659         return (as);
1660     case 0:
1661         /*
1662          * The child.  Put our pipes in their advertised locations
1663          * and start up.
1664          */
1665
1666         /* set up the AMANDA_AUTHENTICATED_PEER env var so child services
1667          * can use it to authenticate */
1668         peer_name = security_get_authenticated_peer_name(security_handle);
1669         amanda_remote_host_env[0] = NULL;
1670         amanda_remote_host_env[1] = NULL;
1671         if (*peer_name) {
1672             amanda_remote_host_env[0] =
1673                 g_strdup_printf("AMANDA_AUTHENTICATED_PEER=%s", peer_name);
1674         }
1675
1676         /*
1677          * The data stream is stdin in the new process
1678          */
1679         if (dup2(data_read[0][0], 0) < 0) {
1680             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1681                 strerror(errno));
1682             /*NOTREACHED*/
1683         }
1684         aclose(data_read[0][0]);
1685         aclose(data_read[0][1]);
1686
1687         /*
1688          * The reply stream is stdout
1689          */
1690         if (dup2(data_write[0][1], 1) < 0) {
1691             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1692                 strerror(errno));
1693         }
1694         aclose(data_write[0][0]);
1695         aclose(data_write[0][1]);
1696
1697         for (i = 0; i < DATA_FD_COUNT; i++) {
1698             aclose(data_read[i + 1][0]);
1699             aclose(data_write[i + 1][1]);
1700         }
1701
1702         /*
1703          *  Make sure they are not open in the range DATA_FD_OFFSET to
1704          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1705          */
1706         for (i = 0; i < DATA_FD_COUNT; i++) {
1707             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1708                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1709                 newfd = dup(data_read[i + 1][1]);
1710                 if(newfd == -1)
1711                     error(_("Can't dup out off DATA_FD range"));
1712                 data_read[i + 1][1] = newfd;
1713             }
1714             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1715                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1716                 newfd = dup(data_write[i + 1][0]);
1717                 if(newfd == -1)
1718                     error(_("Can't dup out off DATA_FD range"));
1719                 data_write[i + 1][0] = newfd;
1720             }
1721         }
1722         while(data_write[4][0] >= DATA_FD_OFFSET &&
1723               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1724             newfd = dup(data_write[4][0]);
1725             if (newfd == -1)
1726                 error(_("Can't dup out off DATA_FD range"));
1727             data_write[4][0] = newfd;
1728         }
1729         while(data_write[4][1] >= DATA_FD_OFFSET &&
1730               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1731             newfd = dup(data_write[4][1]);
1732             if (newfd == -1)
1733                 error(_("Can't dup out off DATA_FD range"));
1734             data_write[4][1] = newfd;
1735         }
1736
1737         for (i = 0; i < DATA_FD_COUNT*2; i++)
1738             close(DATA_FD_OFFSET + i);
1739
1740         /*
1741          * The rest start at the offset defined in amandad.h, and continue
1742          * through the internal defined.
1743          */
1744         for (i = 0; i < DATA_FD_COUNT; i++) {
1745             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1746                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1747                     i + DATA_FD_OFFSET, strerror(errno));
1748             }
1749             aclose(data_read[i + 1][1]);
1750
1751             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1752                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1753                     i + DATA_FD_OFFSET, strerror(errno));
1754             }
1755             aclose(data_write[i + 1][0]);
1756         }
1757
1758         /* close all unneeded fd */
1759         close(STDERR_FILENO);
1760         dup2(data_write[STDERR_PIPE][1], 2);
1761         aclose(data_write[STDERR_PIPE][0]);
1762         aclose(data_write[STDERR_PIPE][1]);
1763         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1764
1765         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env_full(amanda_remote_host_env));
1766         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1767         /*NOTREACHED*/
1768     }
1769     return NULL;
1770 }
1771
1772 /*
1773  * Unallocate a service instance
1774  */
1775 static void
1776 service_delete(
1777     struct active_service *     as)
1778 {
1779     int i;
1780     int   count;
1781     pid_t pid;
1782     struct datafd_handle *dh;
1783
1784     amandad_debug(1, _("closing service: %s\n"),
1785                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1786
1787     assert(as != NULL);
1788
1789     assert(as->cmd != NULL);
1790     amfree(as->cmd);
1791
1792     assert(as->arguments != NULL);
1793     amfree(as->arguments);
1794
1795     if (as->reqfd != -1)
1796         aclose(as->reqfd);
1797     if (as->repfd != -1)
1798         aclose(as->repfd);
1799     if (as->errfd != -1) {
1800         process_errfd(as);
1801         aclose(as->errfd);
1802     }
1803
1804     if (as->ev_repfd != NULL)
1805         event_release(as->ev_repfd);
1806     if (as->ev_reptimeout != NULL)
1807         event_release(as->ev_reptimeout);
1808     if (as->ev_errfd != NULL)
1809         event_release(as->ev_errfd);
1810
1811     for (i = 0; i < DATA_FD_COUNT; i++) {
1812         dh = &as->data[i];
1813
1814         aclose(dh->fd_read);
1815         aclose(dh->fd_write);
1816
1817         if (dh->netfd != NULL)
1818             security_stream_close(dh->netfd);
1819
1820         if (dh->ev_read != NULL)
1821             event_release(dh->ev_read);
1822         if (dh->ev_write != NULL)
1823             event_release(dh->ev_write);
1824     }
1825
1826     if (as->security_handle != NULL)
1827         security_close(as->security_handle);
1828
1829     /* try to kill the process; if this fails, then it's already dead and
1830      * likely some of the other zombie cleanup ate its brains, so we don't
1831      * bother to waitpid for it */
1832     assert(as->pid > 0);
1833     pid = waitpid(as->pid, NULL, WNOHANG);
1834     if (pid != as->pid && kill(as->pid, SIGTERM) == 0) {
1835         pid = waitpid(as->pid, NULL, WNOHANG);
1836         count = 5;
1837         while (pid != as->pid && count > 0) {
1838             count--;
1839             sleep(1);
1840             pid = waitpid(as->pid, NULL, WNOHANG);
1841         }
1842         if (pid != as->pid) {
1843             g_debug("Process %d failed to exit", (int)as->pid);
1844         }
1845     }
1846
1847     serviceq = g_slist_remove(serviceq, (gpointer)as);
1848
1849     amfree(as->cmd);
1850     amfree(as->arguments);
1851     amfree(as->repbuf);
1852     amfree(as->rep_pkt.body);
1853     amfree(as);
1854
1855     if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
1856         dbclose();
1857         exit(0);
1858     }
1859 }
1860
1861 /*
1862  * Like 'fullwrite', but does the work in a child process so pipelines
1863  * do not hang.
1864  */
1865 static int
1866 writebuf(
1867     struct active_service *     as,
1868     const void *                bufp,
1869     size_t                      size)
1870 {
1871     pid_t pid;
1872     size_t    writesize;
1873
1874     switch (pid=fork()) {
1875     case -1:
1876         break;
1877
1878     default:
1879         waitpid(pid, NULL, WNOHANG);
1880         return 0;                       /* this is the parent */
1881
1882     case 0:                             /* this is the child */
1883         close(as->repfd);
1884         writesize = full_write(as->reqfd, bufp, size);
1885         exit(writesize != size);
1886         /* NOTREACHED */
1887     }
1888     return -1;
1889 }
1890
1891 static ssize_t
1892 do_sendpkt(
1893     security_handle_t * handle,
1894     pkt_t *             pkt)
1895 {
1896     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1897         pkt_type2str(pkt->type), pkt->body);
1898     if (handle)
1899         return security_sendpkt(handle, pkt);
1900     else
1901         return 1;
1902 }
1903
1904 /*
1905  * Convert a state into a string
1906  */
1907 static const char *
1908 state2str(
1909     state_t     state)
1910 {
1911     static const struct {
1912         state_t state;
1913         const char str[13];
1914     } states[] = {
1915 #define X(state)        { state, stringize(state) }
1916         X(s_sendack),
1917         X(s_repwait),
1918         X(s_processrep),
1919         X(s_sendrep),
1920         X(s_ackwait),
1921 #undef X
1922     };
1923     int i;
1924
1925     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1926         if (state == states[i].state)
1927             return (states[i].str);
1928     return (_("INVALID STATE"));
1929 }
1930
1931 /*
1932  * Convert an action into a string
1933  */
1934 static const char *
1935 action2str(
1936     action_t    action)
1937 {
1938     static const struct {
1939         action_t action;
1940         const char str[12];
1941     } actions[] = {
1942 #define X(action)       { action, stringize(action) }
1943         X(A_START),
1944         X(A_RECVPKT),
1945         X(A_RECVREP),
1946         X(A_PENDING),
1947         X(A_FINISH),
1948         X(A_CONTINUE),
1949         X(A_SENDNAK),
1950         X(A_TIMEOUT),
1951 #undef X
1952     };
1953     int i;
1954
1955     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1956         if (action == actions[i].action)
1957             return (actions[i].str);
1958     return (_("UNKNOWN ACTION"));
1959 }
1960
1961 static char *
1962 amandad_get_security_conf(
1963     char *      string,
1964     void *      arg)
1965 {
1966     (void)arg;      /* Quiet unused parameter warning */
1967
1968     if (!string || !*string)
1969         return(NULL);
1970
1971     if (strcmp(string, "kencrypt")==0) {
1972         if (amandad_kencrypt == KENCRYPT_YES)
1973             return ("yes");
1974         else
1975             return (NULL);
1976     }
1977     return(NULL);
1978 }
1979