8d9b3d6b680133603cc551655523d55d9fde2b91
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "queue.h"
43 #include "security.h"
44 #include "stream.h"
45 #include "util.h"
46 #include "conffile.h"
47
48 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
49 #define ACK_TIMEOUT     10              /* XXX should be configurable */
50 #define STDERR_PIPE (DATA_FD_COUNT + 1)
51
52 #define amandad_debug(i, ...) do {      \
53         if ((i) <= debug_amandad) {     \
54                 dbprintf(__VA_ARGS__);  \
55         }                               \
56 } while (0)
57
58 /*
59  * These are the actions for entering the state machine
60  */
61 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
62     A_SENDNAK, A_TIMEOUT } action_t;
63
64 /*
65  * This is a state in the state machine.  It is a function pointer to
66  * the function that actually implements the state.
67  */
68 struct active_service;
69 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
70
71 /* string that we scan for in sendbackup's MESG stream */
72 static const char info_end_str[] = "sendbackup: info end\n";
73 #define INFO_END_LEN (sizeof(info_end_str)-1)
74
75 /* 
76  * Here are the services that we allow.
77  * Must be in the same order as services[].
78  */
79 typedef enum {
80     SERVICE_NOOP,
81     SERVICE_SENDSIZE,
82     SERVICE_SENDBACKUP,
83     SERVICE_SELFCHECK,
84     SERVICE_AMINDEXD,
85     SERVICE_AMIDXTAPED
86 } service_t;
87
88 static struct services {
89     char *name;
90     int  active;
91     service_t service;
92 } services[] = {
93    { "noop", 1, SERVICE_NOOP },
94    { "sendsize", 1, SERVICE_SENDSIZE },
95    { "sendbackup", 1, SERVICE_SENDBACKUP },
96    { "selfcheck", 1, SERVICE_SELFCHECK },
97    { "amindexd", 0, SERVICE_AMINDEXD },
98    { "amidxtaped", 0, SERVICE_AMIDXTAPED }
99 };
100 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
101
102 /*
103  * This structure describes an active running service.
104  *
105  * An active service is something running that we have received
106  * a request for.  This structure holds info on that service, including
107  * file descriptors for data, etc, as well as the security handle
108  * for communications with the amanda server.
109  */
110 struct active_service {
111     service_t service;                  /* service name */
112     char *cmd;                          /* name of command we ran */
113     char *arguments;                    /* arguments we sent it */
114     security_handle_t *security_handle; /* remote server */
115     state_t state;                      /* how far this has progressed */
116     pid_t pid;                          /* pid of subprocess */
117     int send_partial_reply;             /* send PREP packet */
118     int reqfd;                          /* pipe to write requests */
119     int repfd;                          /* pipe to read replies */
120     int errfd;                          /* pipe to read stderr */
121     event_handle_t *ev_repfd;           /* read event handle for repfd */
122     event_handle_t *ev_reptimeout;      /* timeout for rep data */
123     event_handle_t *ev_errfd;           /* read event handle for errfd */
124     pkt_t rep_pkt;                      /* rep packet we're sending out */
125     char *errbuf;                       /* buffer to read the err into */
126     char *repbuf;                       /* buffer to read the rep into */
127     size_t bufsize;                     /* length of repbuf */
128     size_t repbufsize;                  /* length of repbuf */
129     int repretry;                       /* times we'll retry sending the rep */
130     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
131     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
132
133     /*
134      * General user streams to the process, and their equivalent
135      * network streams.
136      */
137     struct datafd_handle {
138         int fd_read;                    /* pipe to child process */
139         int fd_write;                   /* pipe to child process */
140         event_handle_t *ev_read;        /* it's read event handle */
141         event_handle_t *ev_write;       /* it's write event handle */
142         security_stream_t *netfd;       /* stream to amanda server */
143         struct active_service *as;      /* pointer back to our enclosure */
144     } data[DATA_FD_COUNT];
145     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
146     TAILQ_ENTRY(active_service) tq;     /* queue handle */
147 };
148
149 /*
150  * Queue of outstanding requests that we are running.
151  */
152 static struct {
153     TAILQ_HEAD(, active_service) tailq;
154     int qlength;
155 } serviceq = {
156     TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
157 };
158
159 static int wait_30s = 1;
160 static int exit_on_qlength = 1;
161 static char *auth = NULL;
162 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
163
164 int main(int argc, char **argv);
165
166 static int allocstream(struct active_service *, int);
167 static void exit_check(void *);
168 static void protocol_accept(security_handle_t *, pkt_t *);
169 static void state_machine(struct active_service *, action_t, pkt_t *);
170
171 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
172 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
173 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
174 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
175 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
176
177 static void repfd_recv(void *);
178 static void errfd_recv(void *);
179 static void timeout_repfd(void *);
180 static void protocol_recv(void *, pkt_t *, security_status_t);
181 static void process_readnetfd(void *);
182 static void process_writenetfd(void *, void *, ssize_t);
183 static struct active_service *service_new(security_handle_t *,
184     const char *, service_t, const char *);
185 static void service_delete(struct active_service *);
186 static int writebuf(struct active_service *, const void *, size_t);
187 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
188 static char *amandad_get_security_conf (char *, void *);
189
190 static const char *state2str(state_t);
191 static const char *action2str(action_t);
192
193 int
194 main(
195     int         argc,
196     char **     argv)
197 {
198     int i, j;
199     int have_services;
200     int in, out;
201     const security_driver_t *secdrv;
202     int no_exit = 0;
203     char *pgm = "amandad";              /* in case argv[0] is not set */
204 #if defined(USE_REUSEADDR)
205     const int on = 1;
206     int r;
207 #endif
208
209     /*
210      * Configure program for internationalization:
211      *   1) Only set the message locale for now.
212      *   2) Set textdomain for all amanda related programs to "amanda"
213      *      We don't want to be forced to support dozens of message catalogs.
214      */  
215     setlocale(LC_MESSAGES, "C");
216     textdomain("amanda"); 
217
218     safe_fd(-1, 0);
219     safe_cd();
220
221     /*
222      * When called via inetd, it is not uncommon to forget to put the
223      * argv[0] value on the config line.  On some systems (e.g. Solaris)
224      * this causes argv and/or argv[0] to be NULL, so we have to be
225      * careful getting our name.
226      */
227     if ((argv == NULL) || (argv[0] == NULL)) {
228             pgm = "amandad";            /* in case argv[0] is not set */
229     } else {
230             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
231     }
232     set_pname(pgm);
233     dbopen(DBG_SUBDIR_AMANDAD);
234
235     if(argv == NULL) {
236         error(_("argv == NULL\n"));
237         /*NOTREACHED*/
238     }
239
240     /* Don't die when child closes pipe */
241     signal(SIGPIPE, SIG_IGN);
242
243     /* Parse the configuration; we'll handle errors later */
244     config_init(CONFIG_INIT_CLIENT, NULL);
245
246     if (geteuid() == 0) {
247         check_running_as(RUNNING_AS_ROOT);
248         initgroups(CLIENT_LOGIN, get_client_gid());
249         setgid(get_client_gid());
250         setegid(get_client_gid());
251         seteuid(get_client_uid());
252     } else {
253         check_running_as(RUNNING_AS_CLIENT_LOGIN);
254     }
255
256     erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
257
258     /*
259      * ad-hoc argument parsing
260      *
261      * We accept        -auth=[authentication type]
262      *                  -no-exit
263      *                  -tcp=[port]
264      *                  -udp=[port]
265      * We also add a list of services that amandad can launch
266      */
267     secdrv = NULL;
268     in = 0; out = 1;            /* default to stdin/stdout */
269     have_services = 0;
270     for (i = 1; i < argc; i++) {
271         /*
272          * accept -krb4 as an alias for -auth=krb4 (for compatibility)
273          */
274         if (strcmp(argv[i], "-krb4") == 0) {
275             argv[i] = "-auth=krb4";
276             /* FALLTHROUGH */
277             auth = "krb4";
278         }
279
280         /*
281          * Get a driver for a security type specified after -auth=
282          */
283         else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
284             argv[i] += strlen("-auth=");
285             secdrv = security_getdriver(argv[i]);
286             auth = argv[i];
287             if (secdrv == NULL) {
288                 error(_("no driver for security type '%s'\n"), argv[i]);
289                 /*NOTREACHED*/
290             }
291             continue;
292         }
293
294         /*
295          * If -no-exit is specified, always run even after requests have
296          * been satisfied.
297          */
298         else if (strcmp(argv[i], "-no-exit") == 0) {
299             no_exit = 1;
300             continue;
301         }
302
303         /*
304          * Allow us to directly bind to a udp port for debugging.
305          * This may only apply to some security types.
306          */
307         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
308 #ifdef WORKING_IPV6
309             struct sockaddr_in6 sin;
310 #else
311             struct sockaddr_in sin;
312 #endif
313
314             argv[i] += strlen("-udp=");
315 #ifdef WORKING_IPV6
316             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
317 #else
318             in = out = socket(AF_INET, SOCK_DGRAM, 0);
319 #endif
320             if (in < 0) {
321                 error(_("can't create dgram socket: %s\n"), strerror(errno));
322                 /*NOTREACHED*/
323             }
324 #ifdef USE_REUSEADDR
325             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
326                 (void *)&on, (socklen_t_equiv)sizeof(on));
327             if (r < 0) {
328                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
329                           strerror(errno));
330             }
331 #endif
332
333 #ifdef WORKING_IPV6
334             sin.sin6_family = (sa_family_t)AF_INET6;
335             sin.sin6_addr = in6addr_any;
336             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
337 #else
338             sin.sin_family = (sa_family_t)AF_INET;
339             sin.sin_addr.s_addr = INADDR_ANY;
340             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
341 #endif
342             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
343                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
344                     strerror(errno));
345                 /*NOTREACHED*/
346             }
347         }
348         /*
349          * Ditto for tcp ports.
350          */
351         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
352 #ifdef WORKING_IPV6
353             struct sockaddr_in6 sin;
354 #else
355             struct sockaddr_in sin;
356 #endif
357             int sock;
358             socklen_t_equiv n;
359
360             argv[i] += strlen("-tcp=");
361 #ifdef WORKING_IPV6
362             sock = socket(AF_INET6, SOCK_STREAM, 0);
363 #else
364             sock = socket(AF_INET, SOCK_STREAM, 0);
365 #endif
366             if (sock < 0) {
367                 error(_("can't create tcp socket: %s\n"), strerror(errno));
368                 /*NOTREACHED*/
369             }
370 #ifdef USE_REUSEADDR
371             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
372                 (void *)&on, (socklen_t_equiv)sizeof(on));
373             if (r < 0) {
374                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
375                           strerror(errno));
376             }
377 #endif
378 #ifdef WORKING_IPV6
379             sin.sin6_family = (sa_family_t)AF_INET6;
380             sin.sin6_addr = in6addr_any;
381             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
382 #else
383             sin.sin_family = (sa_family_t)AF_INET;
384             sin.sin_addr.s_addr = INADDR_ANY;
385             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
386 #endif
387             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
388                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
389                     strerror(errno));
390                 /*NOTREACHED*/
391             }
392             listen(sock, 10);
393             n = (socklen_t_equiv)sizeof(sin);
394             in = out = accept(sock, (struct sockaddr *)&sin, &n);
395         }
396         /*
397          * It must be a service name
398          */
399         else {
400             /* clear all services */
401             if(!have_services) {
402                 for (j = 0; j < (int)NSERVICES; j++)
403                     services[j].active = 0;
404             }
405             have_services = 1;
406
407             if(strcmp(argv[i],"amdump") == 0) {
408                 services[0].active = 1;
409                 services[1].active = 1;
410                 services[2].active = 1;
411                 services[3].active = 1;
412             }
413             else {
414                 for (j = 0; j < (int)NSERVICES; j++)
415                     if (strcmp(services[j].name, argv[i]) == 0)
416                         break;
417                 if (j == (int)NSERVICES) {
418                     dbprintf(_("%s: invalid service\n"), argv[i]);
419                     exit(1);
420                 }
421                 services[j].active = 1;
422             }
423         }
424     }
425
426     /*
427      * If no security type specified, use BSD
428      */
429     if (secdrv == NULL) {
430         secdrv = security_getdriver("BSD");
431         auth = "bsd";
432         if (secdrv == NULL) {
433             error(_("no driver for default security type 'BSD'\n"));
434             /*NOTREACHED*/
435         }
436     }
437
438     if(strcasecmp(auth, "rsh") == 0 ||
439        strcasecmp(auth, "ssh") == 0 ||
440        strcasecmp(auth, "local") == 0 ||
441        strcasecmp(auth, "bsdtcp") == 0) {
442         wait_30s = 0;
443         exit_on_qlength = 1;
444     }
445
446     if (getuid() == 0) {
447         if (strcasecmp(auth, "krb5") != 0) {
448             struct passwd *pwd;
449             /* lookup our local user name */
450             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
451                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
452             }
453             if (pwd->pw_uid != 0) {
454                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
455                       CLIENT_LOGIN, auth);
456             }
457         }
458     } else {
459         if (strcasecmp(auth, "krb5") == 0) {
460             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
461         }
462     }
463
464
465     /* initialize */
466
467     startclock();
468
469     dbprintf(_("version %s\n"), version());
470     for (i = 0; version_info[i] != NULL; i++) {
471         dbprintf("    %s", version_info[i]);
472     }
473
474     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
475         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
476     }
477
478     /* krb5 require the euid to be 0 */
479     if (strcasecmp(auth, "krb5") == 0) {
480         seteuid((uid_t)0);
481     }
482
483     /*
484      * Schedule to call protocol_accept() when new security handles
485      * are created on stdin.
486      */
487     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
488
489     /*
490      * Schedule an event that will try to exit every 30 seconds if there
491      * are no requests outstanding.
492      */
493     if(wait_30s)
494         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
495
496     /*
497      * Call event_loop() with an arg of 0, telling it to block until all
498      * events are completed.
499      */
500     event_loop(0);
501
502     close(in);
503     close(out);
504     dbclose();
505     return(0);
506 }
507
508 /*
509  * This runs periodically and checks to see if we have any active services
510  * still running.  If we don't, then we quit.
511  */
512 static void
513 exit_check(
514     void *      cookie)
515 {
516     int no_exit;
517
518     assert(cookie != NULL);
519     no_exit = *(int *)cookie;
520
521     /*
522      * If things are still running, then don't exit.
523      */
524     if (serviceq.qlength > 0)
525         return;
526
527     /*
528      * If the caller asked us to never exit, then we're done
529      */
530     if (no_exit)
531         return;
532
533     dbclose();
534     exit(0);
535 }
536
537 /*
538  * Handles new incoming protocol handles.  This is a callback for
539  * security_accept(), which gets called when new handles are detected.
540  */
541 static void
542 protocol_accept(
543     security_handle_t * handle,
544     pkt_t *             pkt)
545 {
546     pkt_t pkt_out;
547     struct active_service *as;
548     char *pktbody, *tok, *service, *arguments;
549     char *service_path = NULL;
550     GSList *errlist = NULL;
551     int i;
552
553     pkt_out.body = NULL;
554
555     /*
556      * If handle is NULL, then the connection is closed.
557      */
558     if(handle == NULL) {
559         return;
560     }
561
562     /*
563      * If we have errors (not warnings) from the config file, let the server
564      * know immediately.  Unfortunately, we only get one ERROR line, so if there
565      * are multiple errors, we just show the first.
566      */
567     if (config_errors(&errlist) >= CFGERR_ERRORS) {
568         GSList *iter = errlist;
569         char *errmsg;
570         gboolean multiple_errors = FALSE;
571
572         if (iter) {
573             errmsg = (char *)iter->data;
574             if (iter->next)
575                 multiple_errors = TRUE;
576         } else {
577             errmsg = "(no error message)";
578         }
579
580         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
581             multiple_errors? _(" (additional errors not displayed)"):"");
582         do_sendpkt(handle, &pkt_out);
583         amfree(pkt_out.body);
584         security_close(handle);
585         return;
586     }
587
588     /*
589      * If pkt is NULL, then there was a problem with the new connection.
590      */
591     if (pkt == NULL) {
592         dbprintf(_("accept error: %s\n"), security_geterror(handle));
593         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
594         do_sendpkt(handle, &pkt_out);
595         amfree(pkt_out.body);
596         security_close(handle);
597         return;
598     }
599
600     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
601         pkt_type2str(pkt->type), pkt->body);
602
603     /*
604      * If this is not a REQ packet, just forget about it.
605      */
606     if (pkt->type != P_REQ) {
607         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
608             pkt_type2str(pkt->type), pkt->body);
609         security_close(handle);
610         return;
611     }
612
613     pktbody = service = arguments = NULL;
614     as = NULL;
615
616     /*
617      * Parse out the service and arguments
618      */
619
620     pktbody = stralloc(pkt->body);
621
622     tok = strtok(pktbody, " ");
623     if (tok == NULL)
624         goto badreq;
625     if (strcmp(tok, "SERVICE") != 0)
626         goto badreq;
627
628     tok = strtok(NULL, " \n");
629     if (tok == NULL)
630         goto badreq;
631     service = stralloc(tok);
632
633     /* we call everything else 'arguments' */
634     tok = strtok(NULL, "");
635     if (tok == NULL)
636         goto badreq;
637     arguments = stralloc(tok);
638
639     /* see if it's one we allow */
640     for (i = 0; i < (int)NSERVICES; i++)
641         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
642             break;
643     if (i == (int)NSERVICES) {
644         dbprintf(_("%s: invalid service\n"), service);
645         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
646         goto send_pkt_out;
647     }
648
649     service_path = vstralloc(amlibexecdir, "/", service, versionsuffix(), NULL);
650     if (access(service_path, X_OK) < 0) {
651         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
652             pkt_init(&pkt_out, P_NAK,
653                      _("ERROR execute access to \"%s\" denied\n"),
654                      service_path);
655         goto send_pkt_out;
656     }
657
658     /* see if its already running */
659     for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
660         as = TAILQ_NEXT(as, tq)) {
661             if (strcmp(as->cmd, service_path) == 0 &&
662                 strcmp(as->arguments, arguments) == 0) {
663                     dbprintf(_("%s %s: already running, acking req\n"),
664                         service, arguments);
665                     pkt_init_empty(&pkt_out, P_ACK);
666                     goto send_pkt_out_no_delete;
667             }
668     }
669
670     /*
671      * create a new service instance, and send the arguments down
672      * the request pipe.
673      */
674     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
675     as = service_new(handle, service_path, services[i].service, arguments);
676     if (writebuf(as, arguments, strlen(arguments)) < 0) {
677         const char *errmsg = strerror(errno);
678         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
679         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
680             service, errmsg);
681         goto send_pkt_out;
682     }
683     aclose(as->reqfd);
684
685     amfree(pktbody);
686     amfree(service);
687     amfree(service_path);
688     amfree(arguments);
689
690     /*
691      * Move to the sendack state, and start up the state
692      * machine.
693      */
694     as->state = s_sendack;
695     state_machine(as, A_START, NULL);
696     return;
697
698 badreq:
699     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
700     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
701         pkt_type2str(pkt->type), pkt->body);
702
703 send_pkt_out:
704     if(as)
705         service_delete(as);
706 send_pkt_out_no_delete:
707     amfree(pktbody);
708     amfree(service_path);
709     amfree(service);
710     amfree(arguments);
711     do_sendpkt(handle, &pkt_out);
712     security_close(handle);
713     amfree(pkt_out.body);
714 }
715
716 /*
717  * Handles incoming protocol packets.  Routes responses to the proper
718  * running service.
719  */
720 static void
721 state_machine(
722     struct active_service *     as,
723     action_t                    action,
724     pkt_t *                     pkt)
725 {
726     action_t retaction;
727     state_t curstate;
728     pkt_t nak;
729
730     amandad_debug(1, _("state_machine: %p entering\n"), as);
731     for (;;) {
732         curstate = as->state;
733         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
734                           state2str(curstate), action2str(action));
735         retaction = (*curstate)(as, action, pkt);
736         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
737                           as, state2str(curstate), action2str(retaction),
738                           state2str(as->state));
739
740         switch (retaction) {
741         /*
742          * State has queued up and is now blocking on input.
743          */
744         case A_PENDING:
745             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
746             return;
747
748         /*
749          * service has switched states.  Loop.
750          */
751         case A_CONTINUE:
752             break;
753
754         /*
755          * state has determined that the packet it received was bogus.
756          * Send a nak, and return.
757          */
758         case A_SENDNAK:
759             dbprintf(_("received unexpected %s packet\n"),
760                 pkt_type2str(pkt->type));
761             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
762             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
763                 pkt_type2str(pkt->type));
764             do_sendpkt(as->security_handle, &nak);
765             amfree(nak.body);
766             security_recvpkt(as->security_handle, protocol_recv, as, -1);
767             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
768             return;
769
770         /*
771          * Service is done.  Remove it and finish.
772          */
773         case A_FINISH:
774             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
775             service_delete(as);
776             return;
777
778         default:
779             assert(0);
780             break;
781         }
782     }
783     /*NOTREACHED*/
784 }
785
786 /*
787  * This state just sends an ack.  After that, we move to the repwait
788  * state to wait for REP data to arrive from the subprocess.
789  */
790 static action_t
791 s_sendack(
792     struct active_service *     as,
793     action_t                    action,
794     pkt_t *                     pkt)
795 {
796     pkt_t ack;
797
798     (void)action;       /* Quiet unused parameter warning */
799     (void)pkt;          /* Quiet unused parameter warning */
800
801     pkt_init_empty(&ack, P_ACK);
802     if (do_sendpkt(as->security_handle, &ack) < 0) {
803         dbprintf(_("error sending ACK: %s\n"),
804             security_geterror(as->security_handle));
805         amfree(ack.body);
806         return (A_FINISH);
807     }
808     amfree(ack.body);
809
810     /*
811      * move to the repwait state
812      * Setup a listener for data on the reply fd, but also
813      * listen for packets over the wire, as the server may
814      * poll us if we take a long time.
815      * Setup a timeout that will fire if it takes too long to
816      * receive rep data.
817      */
818     as->state = s_repwait;
819     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
820     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
821         timeout_repfd, as);
822     as->errbuf = NULL;
823     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
824     security_recvpkt(as->security_handle, protocol_recv, as, -1);
825     return (A_PENDING);
826 }
827
828 /*
829  * This is the repwait state.  We have responded to the initial REQ with
830  * an ACK, and we are now waiting for the process we spawned to pass us 
831  * data to send in a REP.
832  */
833 static action_t
834 s_repwait(
835     struct active_service *     as,
836     action_t                    action,
837     pkt_t *                     pkt)
838 {
839     ssize_t   n;
840     char     *repbuf_temp;
841     char     *what;
842     char     *msg;
843     int       code = 0;
844     int       pid;
845     amwait_t  retstat;
846
847     /*
848      * We normally shouldn't receive any packets while waiting
849      * for our REP data, but in some cases we do.
850      */
851     if (action == A_RECVPKT) {
852         assert(pkt != NULL);
853         /*
854          * Another req for something that's running.  Just send an ACK
855          * and go back and wait for more data.
856          */
857         if (pkt->type == P_REQ) {
858             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
859             amfree(as->rep_pkt.body);
860             pkt_init_empty(&as->rep_pkt, P_ACK);
861             do_sendpkt(as->security_handle, &as->rep_pkt);
862             security_recvpkt(as->security_handle, protocol_recv, as, -1);
863             return (A_PENDING);
864         }
865         /* something unexpected.  Nak it */
866         return (A_SENDNAK);
867     }
868
869     if (action == A_TIMEOUT) {
870         amfree(as->rep_pkt.body);
871         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
872         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
873         do_sendpkt(as->security_handle, &as->rep_pkt);
874         return (A_FINISH);
875     }
876
877     assert(action == A_RECVREP);
878     if(as->bufsize == 0) {
879         as->bufsize = NETWORK_BLOCK_BYTES;
880         as->repbuf = alloc(as->bufsize);
881     }
882
883     do {
884         n = read(as->repfd, as->repbuf + as->repbufsize,
885                  as->bufsize - as->repbufsize - 1);
886     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
887     if (n < 0) {
888         const char *errstr = strerror(errno);
889         dbprintf(_("read error on reply pipe: %s\n"), errstr);
890         amfree(as->rep_pkt.body);
891         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
892                  errstr);
893         do_sendpkt(as->security_handle, &as->rep_pkt);
894         return (A_FINISH);
895     }
896
897     /* If end of service, wait for process status */
898     if (n == 0) {
899         pid = waitpid(as->pid, &retstat, WNOHANG);
900         if (as->service  == SERVICE_NOOP ||
901             as->service  == SERVICE_SENDSIZE ||
902             as->service  == SERVICE_SELFCHECK) {
903             int t = 0;
904             while (t<5 && pid == 0) {
905                 sleep(1);
906                 t++;
907                 pid = waitpid(as->pid, &retstat, WNOHANG);
908             }
909         }
910
911         /* Process errfd before sending the REP packet */
912         if (as->ev_errfd) {
913             SELECT_ARG_TYPE readset;
914             struct timeval  tv;
915             int             nfound;
916
917             memset(&tv, 0, SIZEOF(tv));
918             FD_ZERO(&readset);
919             FD_SET(as->errfd, &readset);
920             nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
921             if (nfound && FD_ISSET(as->errfd, &readset)) {
922                 errfd_recv(as);
923             }
924         }
925
926         if (pid == 0)
927             pid = waitpid(as->pid, &retstat, WNOHANG);
928
929         if (pid > 0) {
930             what = NULL;
931             if (! WIFEXITED(retstat)) {
932                 what = _("signal");
933                 code = WTERMSIG(retstat);
934             } else if (WEXITSTATUS(retstat) != 0) {
935                 what = _("code");
936                 code = WEXITSTATUS(retstat);
937             }
938             if (what) {
939                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
940                          (as->cmd)?as->cmd:_("??UNKONWN??"),
941                          (unsigned)as->pid,
942                          what, code);
943                 msg = vstrallocf(
944                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
945                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
946                      what, code);
947                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
948                         as->bufsize *= 2;
949                         repbuf_temp = alloc(as->bufsize);
950                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
951                         amfree(as->repbuf);
952                         as->repbuf = repbuf_temp;
953                 }
954                 strcpy(as->repbuf + as->repbufsize, msg);
955                 as->repbufsize += strlen(msg);
956                 amfree(msg);
957             }
958         }
959     }
960
961     /*
962      * If we got some data, go back and wait for more, or EOF.  Nul terminate
963      * the buffer first.
964      */
965     as->repbuf[n + as->repbufsize] = '\0';
966     if (n > 0) {
967         as->repbufsize += n;
968         if(as->repbufsize >= (as->bufsize - 1)) {
969             as->bufsize *= 2;
970             repbuf_temp = alloc(as->bufsize);
971             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
972             amfree(as->repbuf);
973             as->repbuf = repbuf_temp;
974         }
975         else if(as->send_partial_reply) {
976             amfree(as->rep_pkt.body);
977             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
978             do_sendpkt(as->security_handle, &as->rep_pkt);
979             amfree(as->rep_pkt.body);
980             pkt_init_empty(&as->rep_pkt, P_REP);
981         }
982  
983         return (A_PENDING);
984     }
985
986     /*
987      * If we got 0, then we hit EOF.  Process the data and release
988      * the timeout.
989      */
990     assert(n == 0);
991
992     assert(as->ev_repfd != NULL);
993     event_release(as->ev_repfd);
994     as->ev_repfd = NULL;
995
996     assert(as->ev_reptimeout != NULL);
997     event_release(as->ev_reptimeout);
998     as->ev_reptimeout = NULL;
999
1000     as->state = s_processrep;
1001     aclose(as->repfd);
1002     return (A_CONTINUE);
1003 }
1004
1005 /*
1006  * After we have read in all of the rep data, we process it and send
1007  * it out as a REP packet.
1008  */
1009 static action_t
1010 s_processrep(
1011     struct active_service *     as,
1012     action_t                    action,
1013     pkt_t *                     pkt)
1014 {
1015     char *tok, *repbuf;
1016
1017     (void)action;       /* Quiet unused parameter warning */
1018     (void)pkt;          /* Quiet unused parameter warning */
1019
1020     /*
1021      * Copy the rep lines into the outgoing packet.
1022      *
1023      * If this line is a CONNECT, translate it
1024      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1025      * Example:
1026      *
1027      *  CONNECT DATA 4 MESG 5 INDEX 6
1028      *
1029      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1030      * We need to map these to security streams and pass them back
1031      * to the amanda server.  If the handle is -1, then we don't map.
1032      */
1033     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1034         amandad_kencrypt = KENCRYPT_WILL_DO;
1035         repbuf = stralloc(as->repbuf + 9);
1036     } else {
1037         repbuf = stralloc(as->repbuf);
1038     }
1039     amfree(as->rep_pkt.body);
1040     pkt_init_empty(&as->rep_pkt, P_REP);
1041     tok = strtok(repbuf, " ");
1042     if (tok == NULL)
1043         goto error;
1044     if (strcmp(tok, "CONNECT") == 0) {
1045         char *line, *nextbuf;
1046
1047         /* Save the entire line */
1048         line = strtok(NULL, "\n");
1049         /* Save the buf following the line */
1050         nextbuf = strtok(NULL, "");
1051
1052         if (line == NULL || nextbuf == NULL)
1053             goto error;
1054
1055         pkt_cat(&as->rep_pkt, "CONNECT");
1056
1057         /* loop over the id/handle pairs */
1058         for (;;) {
1059             /* id */
1060             tok = strtok(line, " ");
1061             line = NULL;        /* keep working from line */
1062             if (tok == NULL)
1063                 break;
1064             pkt_cat(&as->rep_pkt, " %s", tok);
1065
1066             /* handle */
1067             tok = strtok(NULL, " \n");
1068             if (tok == NULL)
1069                 goto error;
1070             /* convert the handle into something the server can process */
1071             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1072         }
1073         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1074     } else {
1075 error:
1076         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1077     }
1078
1079     /*
1080      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1081      * state.
1082      */
1083     as->state = s_sendrep;
1084     as->repretry = getconf_int(CNF_REP_TRIES);
1085     amfree(repbuf);
1086     return (A_CONTINUE);
1087 }
1088
1089 /*
1090  * This is the state where we send the REP we just collected from our child.
1091  */
1092 static action_t
1093 s_sendrep(
1094     struct active_service *     as,
1095     action_t                    action,
1096     pkt_t *                     pkt)
1097 {
1098     (void)action;       /* Quiet unused parameter warning */
1099     (void)pkt;          /* Quiet unused parameter warning */
1100
1101     /*
1102      * Transmit it and move to the ack state.
1103      */
1104     do_sendpkt(as->security_handle, &as->rep_pkt);
1105     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1106     as->state = s_ackwait;
1107     return (A_PENDING);
1108 }
1109
1110 /*
1111  * This is the state in which we wait for the server to ACK the REP
1112  * we just sent it.
1113  */
1114 static action_t
1115 s_ackwait(
1116     struct active_service *     as,
1117     action_t                    action,
1118     pkt_t *                     pkt)
1119 {
1120     struct datafd_handle *dh;
1121     int npipes;
1122
1123     /*
1124      * If we got a timeout, try again, but eventually give up.
1125      */
1126     if (action == A_TIMEOUT) {
1127         if (--as->repretry > 0) {
1128             as->state = s_sendrep;
1129             return (A_CONTINUE);
1130         }
1131         dbprintf(_("timeout waiting for ACK for our REP\n"));
1132         return (A_FINISH);
1133     }
1134     amandad_debug(1, _("received ACK, now opening streams\n"));
1135
1136     assert(action == A_RECVPKT);
1137
1138     if (pkt->type == P_REQ) {
1139         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1140         as->state = s_sendrep;
1141         return (A_CONTINUE);
1142     }
1143
1144     if (pkt->type != P_ACK)
1145         return (A_SENDNAK);
1146
1147     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1148         amandad_kencrypt = KENCRYPT_YES;
1149     }
1150
1151     /*
1152      * Got the ack, now open the pipes
1153      */
1154     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1155         if (dh->netfd == NULL)
1156             continue;
1157         if (security_stream_accept(dh->netfd) < 0) {
1158             dbprintf(_("stream %td accept failed: %s\n"),
1159                 dh - &as->data[0], security_geterror(as->security_handle));
1160             security_stream_close(dh->netfd);
1161             dh->netfd = NULL;
1162             continue;
1163         }
1164
1165         /* setup an event for reads from it.  As a special case, don't start
1166          * listening on as->data[0] until we read some data on another fd, if
1167          * the service is sendbackup.  This ensures that we send a MESG or 
1168          * INDEX token before any DATA tokens, as dumper assumes. This is a
1169          * hack, if that wasn't already obvious! */
1170         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1171             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1172                                          process_readnetfd, dh);
1173         } else {
1174             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1175         }
1176
1177         security_stream_read(dh->netfd, process_writenetfd, dh);
1178
1179     }
1180
1181     /*
1182      * Pipes are open, so auth them.  Count them at the same time.
1183      */
1184     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1185         if (dh->netfd == NULL)
1186             continue;
1187         if (security_stream_auth(dh->netfd) < 0) {
1188             security_stream_close(dh->netfd);
1189             dh->netfd = NULL;
1190             event_release(dh->ev_read);
1191             event_release(dh->ev_write);
1192             dh->ev_read = NULL;
1193             dh->ev_write = NULL;
1194         } else {
1195             npipes++;
1196         }
1197     }
1198
1199     /*
1200      * If no pipes are open, then we're done.  Otherwise, just start running.
1201      * The event handlers on all of the pipes will take it from here.
1202      */
1203     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1204     if (npipes == 0)
1205         return (A_FINISH);
1206     else {
1207         security_close(as->security_handle);
1208         as->security_handle = NULL;
1209         return (A_PENDING);
1210     }
1211 }
1212
1213 /*
1214  * Called when a repfd has received data
1215  */
1216 static void
1217 repfd_recv(
1218     void *      cookie)
1219 {
1220     struct active_service *as = cookie;
1221
1222     assert(as != NULL);
1223     assert(as->ev_repfd != NULL);
1224
1225     state_machine(as, A_RECVREP, NULL);
1226 }
1227
1228 /*
1229  * Called when a errfd has received data
1230  */
1231 static void
1232 errfd_recv(
1233     void *      cookie)
1234 {
1235     struct active_service *as = cookie;
1236     char  buf[32769];
1237     int   n;
1238     char *r;
1239
1240     assert(as != NULL);
1241     assert(as->ev_errfd != NULL);
1242
1243     n = read(as->errfd, &buf, 32768);
1244     /* merge buffer */
1245     if (n > 0) {
1246         /* Terminate it with '\0' */
1247         buf[n+1] = '\0';
1248
1249         if (as->errbuf) {
1250             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1251         } else {
1252             as->errbuf = stralloc(buf);
1253         }
1254     } else if (n == 0) {
1255         event_release(as->ev_errfd);
1256         as->ev_errfd = NULL;
1257     } else { /* n < 0 */
1258         event_release(as->ev_errfd);
1259         as->ev_errfd = NULL;
1260         g_snprintf(buf, 32768,
1261                    "error reading stderr or service: %s\n", strerror(errno));
1262     }
1263
1264     /* for each line terminate by '\n' */
1265     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1266         char *s;
1267
1268         *r = '\0';
1269         s = vstrallocf("ERROR service %s: %s\n",
1270                        services[as->service].name, as->errbuf);
1271
1272         /* Add to repbuf, error message will be in the REP packet if it
1273          * is not already sent
1274          */
1275         n = strlen(s);
1276         if (as->bufsize == 0) {
1277             as->bufsize = NETWORK_BLOCK_BYTES;
1278             as->repbuf = alloc(as->bufsize);
1279         }
1280         while (as->bufsize < as->repbufsize + n) {
1281             char *repbuf_temp;
1282             as->bufsize *= 2;
1283             repbuf_temp = alloc(as->bufsize);
1284             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1285             amfree(as->repbuf);
1286             as->repbuf = repbuf_temp;
1287         }
1288         memcpy(as->repbuf + as->repbufsize, s, n);
1289         as->repbufsize += n;
1290
1291         dbprintf("%s", s);
1292
1293         /* remove first line from buffer */
1294         r++;
1295         s = stralloc(r);
1296         amfree(as->errbuf);
1297         as->errbuf = s;
1298     }
1299 }
1300
1301 /*
1302  * Called when a repfd has timed out
1303  */
1304 static void
1305 timeout_repfd(
1306     void *      cookie)
1307 {
1308     struct active_service *as = cookie;
1309
1310     assert(as != NULL);
1311     assert(as->ev_reptimeout != NULL);
1312
1313     state_machine(as, A_TIMEOUT, NULL);
1314 }
1315
1316 /*
1317  * Called when a handle has received data
1318  */
1319 static void
1320 protocol_recv(
1321     void *              cookie,
1322     pkt_t *             pkt,
1323     security_status_t   status)
1324 {
1325     struct active_service *as = cookie;
1326
1327     assert(as != NULL);
1328
1329     switch (status) {
1330     case S_OK:
1331         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1332             pkt_type2str(pkt->type), pkt->body);
1333         state_machine(as, A_RECVPKT, pkt);
1334         break;
1335     case S_TIMEOUT:
1336         dbprintf(_("timeout\n"));
1337         state_machine(as, A_TIMEOUT, NULL);
1338         break;
1339     case S_ERROR:
1340         dbprintf(_("receive error: %s\n"),
1341             security_geterror(as->security_handle));
1342         break;
1343     }
1344 }
1345
1346 /*
1347  * This is a generic relay function that just reads data from one of
1348  * the process's pipes and passes it up the equivalent security_stream_t
1349  */
1350 static void
1351 process_readnetfd(
1352     void *      cookie)
1353 {
1354     pkt_t nak;
1355     struct datafd_handle *dh = cookie;
1356     struct active_service *as = dh->as;
1357     ssize_t n;
1358
1359     nak.body = NULL;
1360
1361     do {
1362         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1363     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1364
1365     /*
1366      * Process has died.
1367      */
1368     if (n < 0) {
1369         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1370             dh->fd_read, strerror(errno));
1371         goto sendnak;
1372     }
1373     /*
1374      * Process has closed the pipe.  Just remove this event handler.
1375      * If all pipes are closed, shut down this service.
1376      */
1377     if (n == 0) {
1378         event_release(dh->ev_read);
1379         dh->ev_read = NULL;
1380         if(dh->ev_write == NULL) {
1381             security_stream_close(dh->netfd);
1382             dh->netfd = NULL;
1383         }
1384         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1385             if (dh->netfd != NULL)
1386                 return;
1387         }
1388         service_delete(as);
1389         return;
1390     }
1391
1392     /* Handle the special case of recognizing "sendbackup info end"
1393      * from sendbackup's MESG fd */
1394     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1395         /* make a buffer containing the combined data from info_end_buf
1396          * and what we've read this time, and search it for info_end_strj
1397          * This includes a NULL byte for strstr's sanity. */
1398         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1399         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1400         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1401         combined_buf[INFO_END_LEN+n] = '\0';
1402
1403         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1404
1405         /* fill info_end_buf from the tail end of combined_buf */
1406         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1407
1408         /* if we did see info_end_str, start reading the data fd (fd 0) */
1409         if (as->seen_info_end) {
1410             struct datafd_handle *dh = &as->data[0];
1411             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1412             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1413                                          process_readnetfd, dh);
1414         } else {
1415             amandad_debug(1, "sendbackup header info still not complete\n");
1416         }
1417     }
1418
1419     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1420         /* stream has croaked */
1421         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1422             security_stream_id(dh->netfd),
1423             security_stream_geterror(dh->netfd));
1424         goto sendnak;
1425     }
1426     return;
1427
1428 sendnak:
1429     do_sendpkt(as->security_handle, &nak);
1430     service_delete(as);
1431     amfree(nak.body);
1432 }
1433
1434 /*
1435  * This is a generic relay function that just read data from one of
1436  * the security_stream_t and passes it up the equivalent process's pipes
1437  */
1438 static void
1439 process_writenetfd(
1440     void *      cookie,
1441     void *      buf,
1442     ssize_t     size)
1443 {
1444     struct datafd_handle *dh;
1445
1446     assert(cookie != NULL);
1447     dh = cookie;
1448
1449     if (dh->fd_write <= 0) {
1450         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1451     } else if (size > 0) {
1452         full_write(dh->fd_write, buf, (size_t)size);
1453         security_stream_read(dh->netfd, process_writenetfd, dh);
1454     }
1455     else {
1456         aclose(dh->fd_write);
1457     }
1458 }
1459
1460
1461 /*
1462  * Convert a local stream handle (DATA_FD...) into something that
1463  * can be sent to the amanda server.
1464  *
1465  * Returns a number that should be sent to the server in the REP packet.
1466  */
1467 static int
1468 allocstream(
1469     struct active_service *     as,
1470     int                         handle)
1471 {
1472     struct datafd_handle *dh;
1473
1474     /* if the handle is -1, then we don't bother */
1475     if (handle < 0)
1476         return (-1);
1477
1478     /* make sure the handle's kosher */
1479     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1480         return (-1);
1481
1482     /* get a pointer into our handle array */
1483     dh = &as->data[handle - DATA_FD_OFFSET];
1484
1485     /* make sure we're not already using the net handle */
1486     if (dh->netfd != NULL)
1487         return (-1);
1488
1489     /* allocate a stream from the security layer and return */
1490     dh->netfd = security_stream_server(as->security_handle);
1491     if (dh->netfd == NULL) {
1492         dbprintf(_("couldn't open stream to server: %s\n"),
1493             security_geterror(as->security_handle));
1494         return (-1);
1495     }
1496
1497     /*
1498      * convert the stream into a numeric id that can be sent to the
1499      * remote end.
1500      */
1501     return (security_stream_id(dh->netfd));
1502 }
1503
1504 /*
1505  * Create a new service instance
1506  */
1507 static struct active_service *
1508 service_new(
1509     security_handle_t * security_handle,
1510     const char *        cmd,
1511     service_t           service,
1512     const char *        arguments)
1513 {
1514     int i;
1515     int data_read[DATA_FD_COUNT + 2][2];
1516     int data_write[DATA_FD_COUNT + 2][2];
1517     struct active_service *as;
1518     pid_t pid;
1519     int newfd;
1520
1521     assert(security_handle != NULL);
1522     assert(cmd != NULL);
1523     assert(arguments != NULL);
1524
1525     /* a plethora of pipes */
1526     /* data_read[0]                : stdin
1527      * data_write[0]               : stdout
1528      * data_read[1], data_write[1] : first  stream
1529      * data_read[2], data_write[2] : second stream
1530      * data_read[3], data_write[3] : third stream
1531      * data_write[4]               : stderr
1532      */
1533     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1534         if (pipe(data_read[i]) < 0) {
1535             error(_("pipe: %s\n"), strerror(errno));
1536             /*NOTREACHED*/
1537         }
1538         if (pipe(data_write[i]) < 0) {
1539             error(_("pipe: %s\n"), strerror(errno));
1540             /*NOTREACHED*/
1541         }
1542     }
1543     if (pipe(data_write[STDERR_PIPE]) < 0) {
1544         error(_("pipe: %s\n"), strerror(errno));
1545         /*NOTREACHED*/
1546     }
1547
1548     switch(pid = fork()) {
1549     case -1:
1550         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1551         /*NOTREACHED*/
1552     default:
1553         /*
1554          * The parent.  Close the far ends of our pipes and return.
1555          */
1556         as = g_new0(struct active_service, 1);
1557         as->cmd = stralloc(cmd);
1558         as->arguments = stralloc(arguments);
1559         as->security_handle = security_handle;
1560         as->state = NULL;
1561         as->service = service;
1562         as->pid = pid;
1563         as->send_partial_reply = 0;
1564         as->seen_info_end = FALSE;
1565         /* fill in info_end_buf with non-null characters */
1566         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1567         if(service == SERVICE_SENDSIZE) {
1568             g_option_t *g_options;
1569             char *option_str, *p;
1570
1571             option_str = stralloc(as->arguments+8);
1572             p = strchr(option_str,'\n');
1573             if(p) *p = '\0';
1574
1575             g_options = parse_g_options(option_str, 1);
1576             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1577                 as->send_partial_reply = 1;
1578             }
1579             free_g_options(g_options);
1580             amfree(option_str);
1581         }
1582
1583         /* write to the request pipe */
1584         aclose(data_read[0][0]);
1585         as->reqfd = data_read[0][1];
1586
1587         /*
1588          * read from the reply pipe
1589          */
1590         as->repfd = data_write[0][0];
1591         aclose(data_write[0][1]);
1592         as->ev_repfd = NULL;
1593         as->repbuf = NULL;
1594         as->repbufsize = 0;
1595         as->bufsize = 0;
1596         as->repretry = 0;
1597         as->rep_pkt.body = NULL;
1598
1599         /*
1600          * read from the stderr pipe
1601          */
1602         as->errfd = data_write[STDERR_PIPE][0];
1603         aclose(data_write[STDERR_PIPE][1]);
1604         as->ev_errfd = NULL;
1605         as->errbuf = NULL;
1606
1607         /*
1608          * read from the rest of the general-use pipes
1609          * (netfds are opened as the client requests them)
1610          */
1611         for (i = 0; i < DATA_FD_COUNT; i++) {
1612             aclose(data_read[i + 1][1]);
1613             aclose(data_write[i + 1][0]);
1614             as->data[i].fd_read = data_read[i + 1][0];
1615             as->data[i].fd_write = data_write[i + 1][1];
1616             as->data[i].ev_read = NULL;
1617             as->data[i].ev_write = NULL;
1618             as->data[i].netfd = NULL;
1619             as->data[i].as = as;
1620         }
1621
1622         /* add it to the service queue */
1623         /* increment the active service count */
1624         TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1625         serviceq.qlength++;
1626
1627         return (as);
1628     case 0:
1629         /*
1630          * The child.  Put our pipes in their advertised locations
1631          * and start up.
1632          */
1633
1634         /*
1635          * The data stream is stdin in the new process
1636          */
1637         if (dup2(data_read[0][0], 0) < 0) {
1638             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1639                 strerror(errno));
1640             /*NOTREACHED*/
1641         }
1642         aclose(data_read[0][0]);
1643         aclose(data_read[0][1]);
1644
1645         /*
1646          * The reply stream is stdout
1647          */
1648         if (dup2(data_write[0][1], 1) < 0) {
1649             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1650                 strerror(errno));
1651         }
1652         aclose(data_write[0][0]);
1653         aclose(data_write[0][1]);
1654
1655         for (i = 0; i < DATA_FD_COUNT; i++) {
1656             aclose(data_read[i + 1][0]);
1657             aclose(data_write[i + 1][1]);
1658         }
1659
1660         /*
1661          *  Make sure they are not open in the range DATA_FD_OFFSET to
1662          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1663          */
1664         for (i = 0; i < DATA_FD_COUNT; i++) {
1665             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1666                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1667                 newfd = dup(data_read[i + 1][1]);
1668                 if(newfd == -1)
1669                     error(_("Can't dup out off DATA_FD range"));
1670                 data_read[i + 1][1] = newfd;
1671             }
1672             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1673                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1674                 newfd = dup(data_write[i + 1][0]);
1675                 if(newfd == -1)
1676                     error(_("Can't dup out off DATA_FD range"));
1677                 data_write[i + 1][0] = newfd;
1678             }
1679         }
1680         while(data_write[4][0] >= DATA_FD_OFFSET &&
1681               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1682             newfd = dup(data_write[4][0]);
1683             if (newfd == -1)
1684                 error(_("Can't dup out off DATA_FD range"));
1685             data_write[4][0] = newfd;
1686         }
1687         while(data_write[4][1] >= DATA_FD_OFFSET &&
1688               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1689             newfd = dup(data_write[4][1]);
1690             if (newfd == -1)
1691                 error(_("Can't dup out off DATA_FD range"));
1692             data_write[4][1] = newfd;
1693         }
1694
1695         for (i = 0; i < DATA_FD_COUNT*2; i++)
1696             close(DATA_FD_OFFSET + i);
1697
1698         /*
1699          * The rest start at the offset defined in amandad.h, and continue
1700          * through the internal defined.
1701          */
1702         for (i = 0; i < DATA_FD_COUNT; i++) {
1703             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1704                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1705                     i + DATA_FD_OFFSET, strerror(errno));
1706             }
1707             aclose(data_read[i + 1][1]);
1708
1709             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1710                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1711                     i + DATA_FD_OFFSET, strerror(errno));
1712             }
1713             aclose(data_write[i + 1][0]);
1714         }
1715
1716         /* close all unneeded fd */
1717         close(STDERR_FILENO);
1718         dup2(data_write[STDERR_PIPE][1], 2);
1719         aclose(data_write[STDERR_PIPE][0]);
1720         aclose(data_write[STDERR_PIPE][1]);
1721         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1722
1723         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1724         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1725         /*NOTREACHED*/
1726     }
1727     return NULL;
1728 }
1729
1730 /*
1731  * Unallocate a service instance
1732  */
1733 static void
1734 service_delete(
1735     struct active_service *     as)
1736 {
1737     int i;
1738     struct datafd_handle *dh;
1739
1740     amandad_debug(1, _("closing service: %s\n"),
1741                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1742
1743     assert(as != NULL);
1744
1745     assert(as->cmd != NULL);
1746     amfree(as->cmd);
1747
1748     assert(as->arguments != NULL);
1749     amfree(as->arguments);
1750
1751     if (as->reqfd != -1)
1752         aclose(as->reqfd);
1753     if (as->repfd != -1)
1754         aclose(as->repfd);
1755
1756     if (as->ev_repfd != NULL)
1757         event_release(as->ev_repfd);
1758     if (as->ev_reptimeout != NULL)
1759         event_release(as->ev_reptimeout);
1760
1761     for (i = 0; i < DATA_FD_COUNT; i++) {
1762         dh = &as->data[i];
1763
1764         aclose(dh->fd_read);
1765         aclose(dh->fd_write);
1766
1767         if (dh->netfd != NULL)
1768             security_stream_close(dh->netfd);
1769
1770         if (dh->ev_read != NULL)
1771             event_release(dh->ev_read);
1772         if (dh->ev_write != NULL)
1773             event_release(dh->ev_write);
1774     }
1775
1776     if (as->security_handle != NULL)
1777         security_close(as->security_handle);
1778
1779     assert(as->pid > 0);
1780     kill(as->pid, SIGTERM);
1781     waitpid(as->pid, NULL, WNOHANG);
1782
1783     TAILQ_REMOVE(&serviceq.tailq, as, tq);
1784     assert(serviceq.qlength > 0);
1785     serviceq.qlength--;
1786
1787     amfree(as->cmd);
1788     amfree(as->arguments);
1789     amfree(as->repbuf);
1790     amfree(as->rep_pkt.body);
1791     amfree(as);
1792
1793     if(exit_on_qlength == 0 && serviceq.qlength == 0) {
1794         dbclose();
1795         exit(0);
1796     }
1797 }
1798
1799 /*
1800  * Like 'fullwrite', but does the work in a child process so pipelines
1801  * do not hang.
1802  */
1803 static int
1804 writebuf(
1805     struct active_service *     as,
1806     const void *                bufp,
1807     size_t                      size)
1808 {
1809     pid_t pid;
1810     size_t    writesize;
1811
1812     switch (pid=fork()) {
1813     case -1:
1814         break;
1815
1816     default:
1817         waitpid(pid, NULL, WNOHANG);
1818         return 0;                       /* this is the parent */
1819
1820     case 0:                             /* this is the child */
1821         close(as->repfd);
1822         writesize = full_write(as->reqfd, bufp, size);
1823         exit(writesize != size);
1824         /* NOTREACHED */
1825     }
1826     return -1;
1827 }
1828
1829 static ssize_t
1830 do_sendpkt(
1831     security_handle_t * handle,
1832     pkt_t *             pkt)
1833 {
1834     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1835         pkt_type2str(pkt->type), pkt->body);
1836     if (handle)
1837         return security_sendpkt(handle, pkt);
1838     else
1839         return 1;
1840 }
1841
1842 /*
1843  * Convert a state into a string
1844  */
1845 static const char *
1846 state2str(
1847     state_t     state)
1848 {
1849     static const struct {
1850         state_t state;
1851         const char str[13];
1852     } states[] = {
1853 #define X(state)        { state, stringize(state) }
1854         X(s_sendack),
1855         X(s_repwait),
1856         X(s_processrep),
1857         X(s_sendrep),
1858         X(s_ackwait),
1859 #undef X
1860     };
1861     int i;
1862
1863     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1864         if (state == states[i].state)
1865             return (states[i].str);
1866     return (_("INVALID STATE"));
1867 }
1868
1869 /*
1870  * Convert an action into a string
1871  */
1872 static const char *
1873 action2str(
1874     action_t    action)
1875 {
1876     static const struct {
1877         action_t action;
1878         const char str[12];
1879     } actions[] = {
1880 #define X(action)       { action, stringize(action) }
1881         X(A_START),
1882         X(A_RECVPKT),
1883         X(A_RECVREP),
1884         X(A_PENDING),
1885         X(A_FINISH),
1886         X(A_CONTINUE),
1887         X(A_SENDNAK),
1888         X(A_TIMEOUT),
1889 #undef X
1890     };
1891     int i;
1892
1893     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1894         if (action == actions[i].action)
1895             return (actions[i].str);
1896     return (_("UNKNOWN ACTION"));
1897 }
1898
1899 static char *
1900 amandad_get_security_conf(
1901     char *      string,
1902     void *      arg)
1903 {
1904     (void)arg;      /* Quiet unused parameter warning */
1905
1906     if (!string || !*string)
1907         return(NULL);
1908
1909     if (strcmp(string, "kencrypt")==0) {
1910         if (amandad_kencrypt == KENCRYPT_YES)
1911             return ("yes");
1912         else
1913             return (NULL);
1914     }
1915     return(NULL);
1916 }
1917