1c47ae446467868dfb2dbb49ef9a04e1bb08e4e5
[debian/amanda] / amandad-src / amandad.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: amandad.c,v 1.18 2006/08/21 20:17:09 martinea Exp $
29  *
30  * handle client-host side of Amanda network communications, including
31  * security checks, execution of the proper service, and acking the
32  * master side
33  */
34
35 #include "amanda.h"
36 #include "amandad.h"
37 #include "clock.h"
38 #include "event.h"
39 #include "amfeatures.h"
40 #include "packet.h"
41 #include "version.h"
42 #include "security.h"
43 #include "stream.h"
44 #include "util.h"
45 #include "conffile.h"
46
47 #define REP_TIMEOUT     (6*60*60)       /* secs for service to reply */
48 #define ACK_TIMEOUT     10              /* XXX should be configurable */
49 #define STDERR_PIPE (DATA_FD_COUNT + 1)
50
51 #define amandad_debug(i, ...) do {      \
52         if ((i) <= debug_amandad) {     \
53                 dbprintf(__VA_ARGS__);  \
54         }                               \
55 } while (0)
56
57 /*
58  * These are the actions for entering the state machine
59  */
60 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
61     A_SENDNAK, A_TIMEOUT } action_t;
62
63 /*
64  * This is a state in the state machine.  It is a function pointer to
65  * the function that actually implements the state.
66  */
67 struct active_service;
68 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
69
70 /* string that we scan for in sendbackup's MESG stream */
71 static const char info_end_str[] = "sendbackup: info end\n";
72 #define INFO_END_LEN (sizeof(info_end_str)-1)
73
74 /* 
75  * Here are the services that we allow.
76  * Must be in the same order as services[].
77  */
78 typedef enum {
79     SERVICE_NOOP,
80     SERVICE_SENDSIZE,
81     SERVICE_SENDBACKUP,
82     SERVICE_SELFCHECK,
83     SERVICE_AMINDEXD,
84     SERVICE_AMIDXTAPED
85 } service_t;
86
87 static struct services {
88     char *name;
89     int  active;
90     service_t service;
91 } services[] = {
92    { "noop", 1, SERVICE_NOOP },
93    { "sendsize", 1, SERVICE_SENDSIZE },
94    { "sendbackup", 1, SERVICE_SENDBACKUP },
95    { "selfcheck", 1, SERVICE_SELFCHECK },
96    { "amindexd", 0, SERVICE_AMINDEXD },
97    { "amidxtaped", 0, SERVICE_AMIDXTAPED }
98 };
99 #define NSERVICES       (int)(sizeof(services) / sizeof(services[0]))
100
101 /*
102  * This structure describes an active running service.
103  *
104  * An active service is something running that we have received
105  * a request for.  This structure holds info on that service, including
106  * file descriptors for data, etc, as well as the security handle
107  * for communications with the amanda server.
108  */
109 struct active_service {
110     service_t service;                  /* service name */
111     char *cmd;                          /* name of command we ran */
112     char *arguments;                    /* arguments we sent it */
113     security_handle_t *security_handle; /* remote server */
114     state_t state;                      /* how far this has progressed */
115     pid_t pid;                          /* pid of subprocess */
116     int send_partial_reply;             /* send PREP packet */
117     int reqfd;                          /* pipe to write requests */
118     int repfd;                          /* pipe to read replies */
119     int errfd;                          /* pipe to read stderr */
120     event_handle_t *ev_repfd;           /* read event handle for repfd */
121     event_handle_t *ev_reptimeout;      /* timeout for rep data */
122     event_handle_t *ev_errfd;           /* read event handle for errfd */
123     pkt_t rep_pkt;                      /* rep packet we're sending out */
124     char *errbuf;                       /* buffer to read the err into */
125     char *repbuf;                       /* buffer to read the rep into */
126     size_t bufsize;                     /* length of repbuf */
127     size_t repbufsize;                  /* length of repbuf */
128     int repretry;                       /* times we'll retry sending the rep */
129     int seen_info_end;                  /* have we seen "sendbackup info end\n"? */
130     char info_end_buf[INFO_END_LEN];    /* last few bytes read, used for scanning for info end */
131
132     /*
133      * General user streams to the process, and their equivalent
134      * network streams.
135      */
136     struct datafd_handle {
137         int fd_read;                    /* pipe to child process */
138         int fd_write;                   /* pipe to child process */
139         event_handle_t *ev_read;        /* it's read event handle */
140         event_handle_t *ev_write;       /* it's write event handle */
141         security_stream_t *netfd;       /* stream to amanda server */
142         struct active_service *as;      /* pointer back to our enclosure */
143     } data[DATA_FD_COUNT];
144     char databuf[NETWORK_BLOCK_BYTES];  /* buffer to relay netfd data in */
145 };
146
147 /*
148  * Queue of outstanding requests that we are running.
149  */
150 GSList *serviceq = NULL;
151
152 static int wait_30s = 1;
153 static int exit_on_qlength = 1;
154 static char *auth = NULL;
155 static kencrypt_type amandad_kencrypt = KENCRYPT_NONE;
156
157 int main(int argc, char **argv);
158
159 static int allocstream(struct active_service *, int);
160 static void exit_check(void *);
161 static void protocol_accept(security_handle_t *, pkt_t *);
162 static void state_machine(struct active_service *, action_t, pkt_t *);
163
164 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
165 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
166 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
167 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
168 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
169
170 static void repfd_recv(void *);
171 static void process_errfd(void *cookie);
172 static void errfd_recv(void *);
173 static void timeout_repfd(void *);
174 static void protocol_recv(void *, pkt_t *, security_status_t);
175 static void process_readnetfd(void *);
176 static void process_writenetfd(void *, void *, ssize_t);
177 static struct active_service *service_new(security_handle_t *,
178     const char *, service_t, const char *);
179 static void service_delete(struct active_service *);
180 static int writebuf(struct active_service *, const void *, size_t);
181 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
182 static char *amandad_get_security_conf (char *, void *);
183
184 static const char *state2str(state_t);
185 static const char *action2str(action_t);
186
187 int
188 main(
189     int         argc,
190     char **     argv)
191 {
192     int i, j;
193     int have_services;
194     int in, out;
195     const security_driver_t *secdrv;
196     int no_exit = 0;
197     char *pgm = "amandad";              /* in case argv[0] is not set */
198 #if defined(USE_REUSEADDR)
199     const int on = 1;
200     int r;
201 #endif
202
203     /*
204      * Configure program for internationalization:
205      *   1) Only set the message locale for now.
206      *   2) Set textdomain for all amanda related programs to "amanda"
207      *      We don't want to be forced to support dozens of message catalogs.
208      */  
209     setlocale(LC_MESSAGES, "C");
210     textdomain("amanda"); 
211
212     safe_fd(-1, 0);
213     safe_cd();
214
215     /*
216      * When called via inetd, it is not uncommon to forget to put the
217      * argv[0] value on the config line.  On some systems (e.g. Solaris)
218      * this causes argv and/or argv[0] to be NULL, so we have to be
219      * careful getting our name.
220      */
221     if ((argv == NULL) || (argv[0] == NULL)) {
222             pgm = "amandad";            /* in case argv[0] is not set */
223     } else {
224             pgm = basename(argv[0]);    /* Strip of leading path get debug name */
225     }
226     set_pname(pgm);
227     dbopen(DBG_SUBDIR_AMANDAD);
228
229     if(argv == NULL) {
230         error(_("argv == NULL\n"));
231         /*NOTREACHED*/
232     }
233
234     /* Don't die when child closes pipe */
235     signal(SIGPIPE, SIG_IGN);
236
237     /* Parse the configuration; we'll handle errors later */
238     config_init(CONFIG_INIT_CLIENT, NULL);
239
240     if (geteuid() == 0) {
241         check_running_as(RUNNING_AS_ROOT);
242         initgroups(CLIENT_LOGIN, get_client_gid());
243         setgid(get_client_gid());
244         setegid(get_client_gid());
245         seteuid(get_client_uid());
246     } else {
247         check_running_as(RUNNING_AS_CLIENT_LOGIN);
248     }
249
250     add_amanda_log_handler(amanda_log_stderr);
251     add_amanda_log_handler(amanda_log_syslog);
252
253     /*
254      * ad-hoc argument parsing
255      *
256      * We accept        -auth=[authentication type]
257      *                  -no-exit
258      *                  -tcp=[port]
259      *                  -udp=[port]
260      * We also add a list of services that amandad can launch
261      */
262     secdrv = NULL;
263     in = 0; out = 1;            /* default to stdin/stdout */
264     have_services = 0;
265     for (i = 1; i < argc; i++) {
266         /*
267          * Get a driver for a security type specified after -auth=
268          */
269         if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
270             argv[i] += strlen("-auth=");
271             secdrv = security_getdriver(argv[i]);
272             auth = argv[i];
273             if (secdrv == NULL) {
274                 error(_("no driver for security type '%s'\n"), argv[i]);
275                 /*NOTREACHED*/
276             }
277             continue;
278         }
279
280         /*
281          * If -no-exit is specified, always run even after requests have
282          * been satisfied.
283          */
284         else if (strcmp(argv[i], "-no-exit") == 0) {
285             no_exit = 1;
286             continue;
287         }
288
289         /*
290          * Allow us to directly bind to a udp port for debugging.
291          * This may only apply to some security types.
292          */
293         else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
294 #ifdef WORKING_IPV6
295             struct sockaddr_in6 sin;
296 #else
297             struct sockaddr_in sin;
298 #endif
299
300             argv[i] += strlen("-udp=");
301 #ifdef WORKING_IPV6
302             in = out = socket(AF_INET6, SOCK_DGRAM, 0);
303 #else
304             in = out = socket(AF_INET, SOCK_DGRAM, 0);
305 #endif
306             if (in < 0) {
307                 error(_("can't create dgram socket: %s\n"), strerror(errno));
308                 /*NOTREACHED*/
309             }
310 #ifdef USE_REUSEADDR
311             r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
312                 (void *)&on, (socklen_t_equiv)sizeof(on));
313             if (r < 0) {
314                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
315                           strerror(errno));
316             }
317 #endif
318
319 #ifdef WORKING_IPV6
320             sin.sin6_family = (sa_family_t)AF_INET6;
321             sin.sin6_addr = in6addr_any;
322             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
323 #else
324             sin.sin_family = (sa_family_t)AF_INET;
325             sin.sin_addr.s_addr = INADDR_ANY;
326             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
327 #endif
328             if (bind(in, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
329                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
330                     strerror(errno));
331                 /*NOTREACHED*/
332             }
333         }
334         /*
335          * Ditto for tcp ports.
336          */
337         else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
338 #ifdef WORKING_IPV6
339             struct sockaddr_in6 sin;
340 #else
341             struct sockaddr_in sin;
342 #endif
343             int sock;
344             socklen_t_equiv n;
345
346             argv[i] += strlen("-tcp=");
347 #ifdef WORKING_IPV6
348             sock = socket(AF_INET6, SOCK_STREAM, 0);
349 #else
350             sock = socket(AF_INET, SOCK_STREAM, 0);
351 #endif
352             if (sock < 0) {
353                 error(_("can't create tcp socket: %s\n"), strerror(errno));
354                 /*NOTREACHED*/
355             }
356 #ifdef USE_REUSEADDR
357             r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
358                 (void *)&on, (socklen_t_equiv)sizeof(on));
359             if (r < 0) {
360                 dbprintf(_("amandad: setsockopt(SO_REUSEADDR) failed: %s\n"),
361                           strerror(errno));
362             }
363 #endif
364 #ifdef WORKING_IPV6
365             sin.sin6_family = (sa_family_t)AF_INET6;
366             sin.sin6_addr = in6addr_any;
367             sin.sin6_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
368 #else
369             sin.sin_family = (sa_family_t)AF_INET;
370             sin.sin_addr.s_addr = INADDR_ANY;
371             sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
372 #endif
373             if (bind(sock, (struct sockaddr *)&sin, (socklen_t_equiv)sizeof(sin)) < 0) {
374                 error(_("can't bind to port %d: %s\n"), atoi(argv[i]),
375                     strerror(errno));
376                 /*NOTREACHED*/
377             }
378             listen(sock, 10);
379             n = (socklen_t_equiv)sizeof(sin);
380             in = out = accept(sock, (struct sockaddr *)&sin, &n);
381         }
382         /*
383          * It must be a service name
384          */
385         else {
386             /* clear all services */
387             if(!have_services) {
388                 for (j = 0; j < (int)NSERVICES; j++)
389                     services[j].active = 0;
390             }
391             have_services = 1;
392
393             if(strcmp(argv[i],"amdump") == 0) {
394                 services[0].active = 1;
395                 services[1].active = 1;
396                 services[2].active = 1;
397                 services[3].active = 1;
398             }
399             else {
400                 for (j = 0; j < (int)NSERVICES; j++)
401                     if (strcmp(services[j].name, argv[i]) == 0)
402                         break;
403                 if (j == (int)NSERVICES) {
404                     dbprintf(_("%s: invalid service\n"), argv[i]);
405                     exit(1);
406                 }
407                 services[j].active = 1;
408             }
409         }
410     }
411
412     /*
413      * If no security type specified, use BSD
414      */
415     if (secdrv == NULL) {
416         secdrv = security_getdriver("BSD");
417         auth = "bsd";
418         if (secdrv == NULL) {
419             error(_("no driver for default security type 'BSD'\n"));
420             /*NOTREACHED*/
421         }
422     }
423
424     if(strcasecmp(auth, "rsh") == 0 ||
425        strcasecmp(auth, "ssh") == 0 ||
426        strcasecmp(auth, "local") == 0 ||
427        strcasecmp(auth, "bsdtcp") == 0) {
428         wait_30s = 0;
429         exit_on_qlength = 1;
430     }
431
432 #ifndef SINGLE_USERID
433     if (geteuid() == 0) {
434         if (strcasecmp(auth, "krb5") != 0) {
435             struct passwd *pwd;
436             /* lookup our local user name */
437             if ((pwd = getpwnam(CLIENT_LOGIN)) == NULL) {
438                 error(_("getpwnam(%s) failed."), CLIENT_LOGIN);
439             }
440
441             if (pwd->pw_uid != 0) {
442                 error(_("'amandad' must be run as user '%s' when using '%s' authentication"),
443                       CLIENT_LOGIN, auth);
444             }
445         }
446     } else {
447         if (strcasecmp(auth, "krb5") == 0) {
448             error(_("'amandad' must be run as user 'root' when using 'krb5' authentication"));
449         }
450     }
451 #endif
452
453     /* initialize */
454
455     startclock();
456
457     dbprintf(_("version %s\n"), VERSION);
458     for (i = 0; version_info[i] != NULL; i++) {
459         dbprintf("    %s", version_info[i]);
460     }
461
462     if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
463         dbprintf(_("WARNING: argv[0] not defined: check inetd.conf\n"));
464     }
465
466     /* krb5 require the euid to be 0 */
467     if (strcasecmp(auth, "krb5") == 0) {
468         seteuid((uid_t)0);
469     }
470
471     /*
472      * Schedule to call protocol_accept() when new security handles
473      * are created on stdin.
474      */
475     security_accept(secdrv, amandad_get_security_conf, in, out, protocol_accept, NULL);
476
477     /*
478      * Schedule an event that will try to exit every 30 seconds if there
479      * are no requests outstanding.
480      */
481     if(wait_30s)
482         (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
483
484     /*
485      * Call event_loop() with an arg of 0, telling it to block until all
486      * events are completed.
487      */
488     event_loop(0);
489
490     close(in);
491     close(out);
492     dbclose();
493     return(0);
494 }
495
496 /*
497  * This runs periodically and checks to see if we have any active services
498  * still running.  If we don't, then we quit.
499  */
500 static void
501 exit_check(
502     void *      cookie)
503 {
504     int no_exit;
505
506     assert(cookie != NULL);
507     no_exit = *(int *)cookie;
508
509     /*
510      * If things are still running, then don't exit.
511      */
512     if (g_slist_length(serviceq) > 0)
513         return;
514
515     /*
516      * If the caller asked us to never exit, then we're done
517      */
518     if (no_exit)
519         return;
520
521     dbclose();
522     exit(0);
523 }
524
525 /*
526  * Handles new incoming protocol handles.  This is a callback for
527  * security_accept(), which gets called when new handles are detected.
528  */
529 static void
530 protocol_accept(
531     security_handle_t * handle,
532     pkt_t *             pkt)
533 {
534     pkt_t pkt_out;
535     GSList *iter;
536     struct active_service *as;
537     char *pktbody, *tok, *service, *arguments;
538     char *service_path = NULL;
539     GSList *errlist = NULL;
540     int i;
541
542     pkt_out.body = NULL;
543
544     /*
545      * If handle is NULL, then the connection is closed.
546      */
547     if(handle == NULL) {
548         return;
549     }
550
551     /*
552      * If we have errors (not warnings) from the config file, let the server
553      * know immediately.  Unfortunately, we only get one ERROR line, so if there
554      * are multiple errors, we just show the first.
555      */
556     if (config_errors(&errlist) >= CFGERR_ERRORS) {
557         GSList *iter = errlist;
558         char *errmsg;
559         gboolean multiple_errors = FALSE;
560
561         if (iter) {
562             errmsg = (char *)iter->data;
563             if (iter->next)
564                 multiple_errors = TRUE;
565         } else {
566             errmsg = "(no error message)";
567         }
568
569         pkt_init(&pkt_out, P_NAK, "ERROR %s%s", errmsg,
570             multiple_errors? _(" (additional errors not displayed)"):"");
571         do_sendpkt(handle, &pkt_out);
572         amfree(pkt_out.body);
573         security_close(handle);
574         return;
575     }
576
577     /*
578      * If pkt is NULL, then there was a problem with the new connection.
579      */
580     if (pkt == NULL) {
581         dbprintf(_("accept error: %s\n"), security_geterror(handle));
582         pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
583         do_sendpkt(handle, &pkt_out);
584         amfree(pkt_out.body);
585         security_close(handle);
586         return;
587     }
588
589     dbprintf(_("accept recv %s pkt:\n<<<<<\n%s>>>>>\n"),
590         pkt_type2str(pkt->type), pkt->body);
591
592     /*
593      * If this is not a REQ packet, just forget about it.
594      */
595     if (pkt->type != P_REQ) {
596         dbprintf(_("received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n"),
597             pkt_type2str(pkt->type), pkt->body);
598         security_close(handle);
599         return;
600     }
601
602     pktbody = service = arguments = NULL;
603     as = NULL;
604
605     /*
606      * Parse out the service and arguments
607      */
608
609     pktbody = stralloc(pkt->body);
610
611     tok = strtok(pktbody, " ");
612     if (tok == NULL)
613         goto badreq;
614     if (strcmp(tok, "SERVICE") != 0)
615         goto badreq;
616
617     tok = strtok(NULL, " \n");
618     if (tok == NULL)
619         goto badreq;
620     service = stralloc(tok);
621
622     /* we call everything else 'arguments' */
623     tok = strtok(NULL, "");
624     if (tok == NULL)
625         goto badreq;
626     arguments = stralloc(tok);
627
628     /* see if it's one we allow */
629     for (i = 0; i < (int)NSERVICES; i++)
630         if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
631             break;
632     if (i == (int)NSERVICES) {
633         dbprintf(_("%s: invalid service\n"), service);
634         pkt_init(&pkt_out, P_NAK, _("ERROR %s: invalid service, add '%s' as argument to amandad\n"), service, service);
635         goto send_pkt_out;
636     }
637
638     service_path = vstralloc(amlibexecdir, "/", service, NULL);
639     if (access(service_path, X_OK) < 0) {
640         dbprintf(_("can't execute %s: %s\n"), service_path, strerror(errno));
641             pkt_init(&pkt_out, P_NAK,
642                      _("ERROR execute access to \"%s\" denied\n"),
643                      service_path);
644         goto send_pkt_out;
645     }
646
647     /* see if its already running */
648     for (iter = serviceq; iter != NULL; iter = g_slist_next(iter)) {
649         as = (struct active_service *)iter->data;
650             if (strcmp(as->cmd, service_path) == 0 &&
651                 strcmp(as->arguments, arguments) == 0) {
652                     dbprintf(_("%s %s: already running, acking req\n"),
653                         service, arguments);
654                     pkt_init_empty(&pkt_out, P_ACK);
655                     goto send_pkt_out_no_delete;
656             }
657     }
658
659     /*
660      * create a new service instance, and send the arguments down
661      * the request pipe.
662      */
663     dbprintf(_("creating new service: %s\n%s\n"), service, arguments);
664     as = service_new(handle, service_path, services[i].service, arguments);
665     if (writebuf(as, arguments, strlen(arguments)) < 0) {
666         const char *errmsg = strerror(errno);
667         dbprintf(_("error sending arguments to %s: %s\n"), service, errmsg);
668         pkt_init(&pkt_out, P_NAK, _("ERROR error writing arguments to %s: %s\n"),
669             service, errmsg);
670         goto send_pkt_out;
671     }
672     aclose(as->reqfd);
673
674     amfree(pktbody);
675     amfree(service);
676     amfree(service_path);
677     amfree(arguments);
678
679     /*
680      * Move to the sendack state, and start up the state
681      * machine.
682      */
683     as->state = s_sendack;
684     state_machine(as, A_START, NULL);
685     return;
686
687 badreq:
688     pkt_init(&pkt_out, P_NAK, _("ERROR invalid REQ\n"));
689     dbprintf(_("received invalid %s packet:\n<<<<<\n%s>>>>>\n\n"),
690         pkt_type2str(pkt->type), pkt->body);
691
692 send_pkt_out:
693     if(as)
694         service_delete(as);
695 send_pkt_out_no_delete:
696     amfree(pktbody);
697     amfree(service_path);
698     amfree(service);
699     amfree(arguments);
700     do_sendpkt(handle, &pkt_out);
701     security_close(handle);
702     amfree(pkt_out.body);
703 }
704
705 /*
706  * Handles incoming protocol packets.  Routes responses to the proper
707  * running service.
708  */
709 static void
710 state_machine(
711     struct active_service *     as,
712     action_t                    action,
713     pkt_t *                     pkt)
714 {
715     action_t retaction;
716     state_t curstate;
717     pkt_t nak;
718
719     amandad_debug(1, _("state_machine: %p entering\n"), as);
720     for (;;) {
721         curstate = as->state;
722         amandad_debug(1, _("state_machine: %p curstate=%s action=%s\n"), as,
723                           state2str(curstate), action2str(action));
724         retaction = (*curstate)(as, action, pkt);
725         amandad_debug(1, _("state_machine: %p curstate=%s returned %s (nextstate=%s)\n"),
726                           as, state2str(curstate), action2str(retaction),
727                           state2str(as->state));
728
729         switch (retaction) {
730         /*
731          * State has queued up and is now blocking on input.
732          */
733         case A_PENDING:
734             amandad_debug(1, _("state_machine: %p leaving (A_PENDING)\n"), as);
735             return;
736
737         /*
738          * service has switched states.  Loop.
739          */
740         case A_CONTINUE:
741             break;
742
743         /*
744          * state has determined that the packet it received was bogus.
745          * Send a nak, and return.
746          */
747         case A_SENDNAK:
748             dbprintf(_("received unexpected %s packet\n"),
749                 pkt_type2str(pkt->type));
750             dbprintf(_("<<<<<\n%s----\n\n"), pkt->body);
751             pkt_init(&nak, P_NAK, _("ERROR unexpected packet type %s\n"),
752                 pkt_type2str(pkt->type));
753             do_sendpkt(as->security_handle, &nak);
754             amfree(nak.body);
755             security_recvpkt(as->security_handle, protocol_recv, as, -1);
756             amandad_debug(1, _("state_machine: %p leaving (A_SENDNAK)\n"), as);
757             return;
758
759         /*
760          * Service is done.  Remove it and finish.
761          */
762         case A_FINISH:
763             amandad_debug(1, _("state_machine: %p leaving (A_FINISH)\n"), as);
764             service_delete(as);
765             return;
766
767         default:
768             assert(0);
769             break;
770         }
771     }
772     /*NOTREACHED*/
773 }
774
775 /*
776  * This state just sends an ack.  After that, we move to the repwait
777  * state to wait for REP data to arrive from the subprocess.
778  */
779 static action_t
780 s_sendack(
781     struct active_service *     as,
782     action_t                    action,
783     pkt_t *                     pkt)
784 {
785     pkt_t ack;
786
787     (void)action;       /* Quiet unused parameter warning */
788     (void)pkt;          /* Quiet unused parameter warning */
789
790     pkt_init_empty(&ack, P_ACK);
791     if (do_sendpkt(as->security_handle, &ack) < 0) {
792         dbprintf(_("error sending ACK: %s\n"),
793             security_geterror(as->security_handle));
794         amfree(ack.body);
795         return (A_FINISH);
796     }
797     amfree(ack.body);
798
799     /*
800      * move to the repwait state
801      * Setup a listener for data on the reply fd, but also
802      * listen for packets over the wire, as the server may
803      * poll us if we take a long time.
804      * Setup a timeout that will fire if it takes too long to
805      * receive rep data.
806      */
807     as->state = s_repwait;
808     as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
809     as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
810         timeout_repfd, as);
811     as->errbuf = NULL;
812     as->ev_errfd = event_register((event_id_t)as->errfd, EV_READFD, errfd_recv, as);
813     security_recvpkt(as->security_handle, protocol_recv, as, -1);
814     return (A_PENDING);
815 }
816
817 /*
818  * This is the repwait state.  We have responded to the initial REQ with
819  * an ACK, and we are now waiting for the process we spawned to pass us 
820  * data to send in a REP.
821  */
822 static action_t
823 s_repwait(
824     struct active_service *     as,
825     action_t                    action,
826     pkt_t *                     pkt)
827 {
828     ssize_t   n;
829     char     *repbuf_temp;
830     char     *what;
831     char     *msg;
832     int       code = 0;
833     int       pid;
834     amwait_t  retstat;
835
836     /*
837      * We normally shouldn't receive any packets while waiting
838      * for our REP data, but in some cases we do.
839      */
840     if (action == A_RECVPKT) {
841         assert(pkt != NULL);
842         /*
843          * Another req for something that's running.  Just send an ACK
844          * and go back and wait for more data.
845          */
846         if (pkt->type == P_REQ) {
847             dbprintf(_("received dup P_REQ packet, ACKing it\n"));
848             amfree(as->rep_pkt.body);
849             pkt_init_empty(&as->rep_pkt, P_ACK);
850             do_sendpkt(as->security_handle, &as->rep_pkt);
851             security_recvpkt(as->security_handle, protocol_recv, as, -1);
852             return (A_PENDING);
853         }
854         /* something unexpected.  Nak it */
855         return (A_SENDNAK);
856     }
857
858     if (action == A_TIMEOUT) {
859         amfree(as->rep_pkt.body);
860         pkt_init(&as->rep_pkt, P_NAK, _("ERROR timeout on reply pipe\n"));
861         dbprintf(_("%s timed out waiting for REP data\n"), as->cmd);
862         do_sendpkt(as->security_handle, &as->rep_pkt);
863         return (A_FINISH);
864     }
865
866     assert(action == A_RECVREP);
867     if(as->bufsize == 0) {
868         as->bufsize = NETWORK_BLOCK_BYTES;
869         as->repbuf = alloc(as->bufsize);
870     }
871
872     do {
873         n = read(as->repfd, as->repbuf + as->repbufsize,
874                  as->bufsize - as->repbufsize - 1);
875     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
876     if (n < 0) {
877         const char *errstr = strerror(errno);
878         dbprintf(_("read error on reply pipe: %s\n"), errstr);
879         amfree(as->rep_pkt.body);
880         pkt_init(&as->rep_pkt, P_NAK, _("ERROR read error on reply pipe: %s\n"),
881                  errstr);
882         do_sendpkt(as->security_handle, &as->rep_pkt);
883         return (A_FINISH);
884     }
885
886     /* If end of service, wait for process status */
887     if (n == 0) {
888         pid = waitpid(as->pid, &retstat, WNOHANG);
889         if (as->service  == SERVICE_NOOP ||
890             as->service  == SERVICE_SENDSIZE ||
891             as->service  == SERVICE_SELFCHECK) {
892             int t = 0;
893             while (t<5 && pid == 0) {
894                 sleep(1);
895                 t++;
896                 pid = waitpid(as->pid, &retstat, WNOHANG);
897             }
898         }
899
900         process_errfd(as);
901
902         if (pid == 0)
903             pid = waitpid(as->pid, &retstat, WNOHANG);
904
905         if (pid > 0) {
906             what = NULL;
907             if (! WIFEXITED(retstat)) {
908                 what = _("signal");
909                 code = WTERMSIG(retstat);
910             } else if (WEXITSTATUS(retstat) != 0) {
911                 what = _("code");
912                 code = WEXITSTATUS(retstat);
913             }
914             if (what) {
915                 dbprintf(_("service %s failed: pid %u exited with %s %d\n"),
916                          (as->cmd)?as->cmd:_("??UNKONWN??"),
917                          (unsigned)as->pid,
918                          what, code);
919                 msg = vstrallocf(
920                      _("ERROR service %s failed: pid %u exited with %s %d\n"),
921                      (as->cmd)?as->cmd:_("??UNKONWN??"), (unsigned)as->pid,
922                      what, code);
923                 if (as->repbufsize + strlen(msg) >= (as->bufsize - 1)) {
924                         as->bufsize *= 2;
925                         repbuf_temp = alloc(as->bufsize);
926                         memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
927                         amfree(as->repbuf);
928                         as->repbuf = repbuf_temp;
929                 }
930                 strcpy(as->repbuf + as->repbufsize, msg);
931                 as->repbufsize += strlen(msg);
932                 amfree(msg);
933             }
934         }
935     }
936
937     /*
938      * If we got some data, go back and wait for more, or EOF.  Nul terminate
939      * the buffer first.
940      */
941     as->repbuf[n + as->repbufsize] = '\0';
942     if (n > 0) {
943         as->repbufsize += n;
944         if(as->repbufsize >= (as->bufsize - 1)) {
945             as->bufsize *= 2;
946             repbuf_temp = alloc(as->bufsize);
947             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
948             amfree(as->repbuf);
949             as->repbuf = repbuf_temp;
950         }
951         else if(as->send_partial_reply) {
952             amfree(as->rep_pkt.body);
953             pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
954             do_sendpkt(as->security_handle, &as->rep_pkt);
955             amfree(as->rep_pkt.body);
956             pkt_init_empty(&as->rep_pkt, P_REP);
957         }
958  
959         return (A_PENDING);
960     }
961
962     /*
963      * If we got 0, then we hit EOF.  Process the data and release
964      * the timeout.
965      */
966     assert(n == 0);
967
968     assert(as->ev_repfd != NULL);
969     event_release(as->ev_repfd);
970     as->ev_repfd = NULL;
971
972     assert(as->ev_reptimeout != NULL);
973     event_release(as->ev_reptimeout);
974     as->ev_reptimeout = NULL;
975
976     as->state = s_processrep;
977     aclose(as->repfd);
978     return (A_CONTINUE);
979 }
980
981 /*
982  * After we have read in all of the rep data, we process it and send
983  * it out as a REP packet.
984  */
985 static action_t
986 s_processrep(
987     struct active_service *     as,
988     action_t                    action,
989     pkt_t *                     pkt)
990 {
991     char *tok, *repbuf;
992
993     (void)action;       /* Quiet unused parameter warning */
994     (void)pkt;          /* Quiet unused parameter warning */
995
996     /*
997      * Copy the rep lines into the outgoing packet.
998      *
999      * If this line is a CONNECT, translate it
1000      * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
1001      * Example:
1002      *
1003      *  CONNECT DATA 4 MESG 5 INDEX 6
1004      *
1005      * The tags are arbitrary.  The handles are in the DATA_FD pool.
1006      * We need to map these to security streams and pass them back
1007      * to the amanda server.  If the handle is -1, then we don't map.
1008      */
1009     if (strncmp_const(as->repbuf,"KENCRYPT\n") == 0) {
1010         amandad_kencrypt = KENCRYPT_WILL_DO;
1011         repbuf = stralloc(as->repbuf + 9);
1012     } else {
1013         repbuf = stralloc(as->repbuf);
1014     }
1015     amfree(as->rep_pkt.body);
1016     pkt_init_empty(&as->rep_pkt, P_REP);
1017     tok = strtok(repbuf, " ");
1018     if (tok == NULL)
1019         goto error;
1020     if (strcmp(tok, "CONNECT") == 0) {
1021         char *line, *nextbuf;
1022
1023         /* Save the entire line */
1024         line = strtok(NULL, "\n");
1025         /* Save the buf following the line */
1026         nextbuf = strtok(NULL, "");
1027
1028         if (line == NULL || nextbuf == NULL)
1029             goto error;
1030
1031         pkt_cat(&as->rep_pkt, "CONNECT");
1032
1033         /* loop over the id/handle pairs */
1034         for (;;) {
1035             /* id */
1036             tok = strtok(line, " ");
1037             line = NULL;        /* keep working from line */
1038             if (tok == NULL)
1039                 break;
1040             pkt_cat(&as->rep_pkt, " %s", tok);
1041
1042             /* handle */
1043             tok = strtok(NULL, " \n");
1044             if (tok == NULL)
1045                 goto error;
1046             /* convert the handle into something the server can process */
1047             pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
1048         }
1049         pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
1050     } else {
1051 error:
1052         pkt_cat(&as->rep_pkt, "%s", as->repbuf);
1053     }
1054
1055     /*
1056      * We've setup our REP packet in as->rep_pkt.  Now move to the transmission
1057      * state.
1058      */
1059     as->state = s_sendrep;
1060     as->repretry = getconf_int(CNF_REP_TRIES);
1061     amfree(repbuf);
1062     return (A_CONTINUE);
1063 }
1064
1065 /*
1066  * This is the state where we send the REP we just collected from our child.
1067  */
1068 static action_t
1069 s_sendrep(
1070     struct active_service *     as,
1071     action_t                    action,
1072     pkt_t *                     pkt)
1073 {
1074     (void)action;       /* Quiet unused parameter warning */
1075     (void)pkt;          /* Quiet unused parameter warning */
1076
1077     /*
1078      * Transmit it and move to the ack state.
1079      */
1080     do_sendpkt(as->security_handle, &as->rep_pkt);
1081     security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1082     as->state = s_ackwait;
1083     return (A_PENDING);
1084 }
1085
1086 /*
1087  * This is the state in which we wait for the server to ACK the REP
1088  * we just sent it.
1089  */
1090 static action_t
1091 s_ackwait(
1092     struct active_service *     as,
1093     action_t                    action,
1094     pkt_t *                     pkt)
1095 {
1096     struct datafd_handle *dh;
1097     int npipes;
1098
1099     /*
1100      * If we got a timeout, try again, but eventually give up.
1101      */
1102     if (action == A_TIMEOUT) {
1103         if (--as->repretry > 0) {
1104             as->state = s_sendrep;
1105             return (A_CONTINUE);
1106         }
1107         dbprintf(_("timeout waiting for ACK for our REP\n"));
1108         return (A_FINISH);
1109     }
1110     amandad_debug(1, _("received ACK, now opening streams\n"));
1111
1112     assert(action == A_RECVPKT);
1113
1114     if (pkt->type == P_REQ) {
1115         dbprintf(_("received dup P_REQ packet, resending REP\n"));
1116         as->state = s_sendrep;
1117         return (A_CONTINUE);
1118     }
1119
1120     if (pkt->type != P_ACK)
1121         return (A_SENDNAK);
1122
1123     if (amandad_kencrypt == KENCRYPT_WILL_DO) {
1124         amandad_kencrypt = KENCRYPT_YES;
1125     }
1126
1127     /*
1128      * Got the ack, now open the pipes
1129      */
1130     for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1131         if (dh->netfd == NULL)
1132             continue;
1133         dbprintf("opening security stream for fd %d\n", (int)(dh - as->data) + DATA_FD_OFFSET);
1134         if (security_stream_accept(dh->netfd) < 0) {
1135             dbprintf(_("stream %td accept failed: %s\n"),
1136                 dh - &as->data[0], security_geterror(as->security_handle));
1137             security_stream_close(dh->netfd);
1138             dh->netfd = NULL;
1139             continue;
1140         }
1141
1142         /* setup an event for reads from it.  As a special case, don't start
1143          * listening on as->data[0] until we read some data on another fd, if
1144          * the service is sendbackup.  This ensures that we send a MESG or 
1145          * INDEX token before any DATA tokens, as dumper assumes. This is a
1146          * hack, if that wasn't already obvious! */
1147         if (dh != &as->data[0] || as->service != SERVICE_SENDBACKUP) {
1148             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1149                                          process_readnetfd, dh);
1150         } else {
1151             amandad_debug(1, "Skipping registration of sendbackup's data FD\n");
1152         }
1153
1154         security_stream_read(dh->netfd, process_writenetfd, dh);
1155
1156     }
1157
1158     /*
1159      * Pipes are open, so auth them.  Count them at the same time.
1160      */
1161     for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1162         if (dh->netfd == NULL)
1163             continue;
1164         if (security_stream_auth(dh->netfd) < 0) {
1165             security_stream_close(dh->netfd);
1166             dh->netfd = NULL;
1167             event_release(dh->ev_read);
1168             event_release(dh->ev_write);
1169             dh->ev_read = NULL;
1170             dh->ev_write = NULL;
1171         } else {
1172             npipes++;
1173         }
1174     }
1175
1176     /*
1177      * If no pipes are open, then we're done.  Otherwise, just start running.
1178      * The event handlers on all of the pipes will take it from here.
1179      */
1180     amandad_debug(1, _("at end of s_ackwait, npipes is %d\n"), npipes);
1181     if (npipes == 0)
1182         return (A_FINISH);
1183     else {
1184         security_close(as->security_handle);
1185         as->security_handle = NULL;
1186         return (A_PENDING);
1187     }
1188 }
1189
1190 /*
1191  * Called when a repfd has received data
1192  */
1193 static void
1194 repfd_recv(
1195     void *      cookie)
1196 {
1197     struct active_service *as = cookie;
1198
1199     assert(as != NULL);
1200     assert(as->ev_repfd != NULL);
1201
1202     state_machine(as, A_RECVREP, NULL);
1203 }
1204
1205 static void
1206 process_errfd(
1207     void *cookie)
1208 {
1209     struct active_service *as = cookie;
1210
1211     /* Process errfd before sending the REP packet */
1212     if (as->ev_errfd) {
1213         SELECT_ARG_TYPE readset;
1214         struct timeval  tv;
1215         int             nfound;
1216
1217         memset(&tv, 0, SIZEOF(tv));
1218         FD_ZERO(&readset);
1219         FD_SET(as->errfd, &readset);
1220         nfound = select(as->errfd+1, &readset, NULL, NULL, &tv);
1221         if (nfound && FD_ISSET(as->errfd, &readset)) {
1222             errfd_recv(as);
1223         }
1224     }
1225 }
1226
1227 /*
1228  * Called when a errfd has received data
1229  */
1230 static void
1231 errfd_recv(
1232     void *      cookie)
1233 {
1234     struct active_service *as = cookie;
1235     char  buf[32769];
1236     int   n;
1237     char *r;
1238
1239     assert(as != NULL);
1240     assert(as->ev_errfd != NULL);
1241
1242     n = read(as->errfd, &buf, 32768);
1243     /* merge buffer */
1244     if (n > 0) {
1245         /* Terminate it with '\0' */
1246         buf[n+1] = '\0';
1247
1248         if (as->errbuf) {
1249             as->errbuf = vstrextend(&as->errbuf, buf, NULL);
1250         } else {
1251             as->errbuf = stralloc(buf);
1252         }
1253     } else if (n == 0) {
1254         event_release(as->ev_errfd);
1255         as->ev_errfd = NULL;
1256     } else { /* n < 0 */
1257         event_release(as->ev_errfd);
1258         as->ev_errfd = NULL;
1259         g_snprintf(buf, 32768,
1260                    "error reading stderr or service: %s\n", strerror(errno));
1261     }
1262
1263     /* for each line terminate by '\n' */
1264     while (as->errbuf != NULL  && (r = strchr(as->errbuf, '\n')) != NULL) {
1265         char *s;
1266
1267         *r = '\0';
1268         s = vstrallocf("ERROR service %s: %s\n",
1269                        services[as->service].name, as->errbuf);
1270
1271         /* Add to repbuf, error message will be in the REP packet if it
1272          * is not already sent
1273          */
1274         n = strlen(s);
1275         if (as->bufsize == 0) {
1276             as->bufsize = NETWORK_BLOCK_BYTES;
1277             as->repbuf = alloc(as->bufsize);
1278         }
1279         while (as->bufsize < as->repbufsize + n) {
1280             char *repbuf_temp;
1281             as->bufsize *= 2;
1282             repbuf_temp = alloc(as->bufsize);
1283             memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
1284             amfree(as->repbuf);
1285             as->repbuf = repbuf_temp;
1286         }
1287         memcpy(as->repbuf + as->repbufsize, s, n);
1288         as->repbufsize += n;
1289
1290         dbprintf("%s", s);
1291
1292         /* remove first line from buffer */
1293         r++;
1294         s = stralloc(r);
1295         amfree(as->errbuf);
1296         as->errbuf = s;
1297     }
1298 }
1299
1300 /*
1301  * Called when a repfd has timed out
1302  */
1303 static void
1304 timeout_repfd(
1305     void *      cookie)
1306 {
1307     struct active_service *as = cookie;
1308
1309     assert(as != NULL);
1310     assert(as->ev_reptimeout != NULL);
1311
1312     state_machine(as, A_TIMEOUT, NULL);
1313 }
1314
1315 /*
1316  * Called when a handle has received data
1317  */
1318 static void
1319 protocol_recv(
1320     void *              cookie,
1321     pkt_t *             pkt,
1322     security_status_t   status)
1323 {
1324     struct active_service *as = cookie;
1325
1326     assert(as != NULL);
1327
1328     switch (status) {
1329     case S_OK:
1330         dbprintf(_("received %s pkt:\n<<<<<\n%s>>>>>\n"),
1331             pkt_type2str(pkt->type), pkt->body);
1332         state_machine(as, A_RECVPKT, pkt);
1333         break;
1334     case S_TIMEOUT:
1335         dbprintf(_("timeout\n"));
1336         state_machine(as, A_TIMEOUT, NULL);
1337         break;
1338     case S_ERROR:
1339         dbprintf(_("receive error: %s\n"),
1340             security_geterror(as->security_handle));
1341         break;
1342     }
1343 }
1344
1345 /*
1346  * This is a generic relay function that just reads data from one of
1347  * the process's pipes and passes it up the equivalent security_stream_t
1348  */
1349 static void
1350 process_readnetfd(
1351     void *      cookie)
1352 {
1353     pkt_t nak;
1354     struct datafd_handle *dh = cookie;
1355     struct active_service *as = dh->as;
1356     ssize_t n;
1357
1358     nak.body = NULL;
1359
1360     do {
1361         n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1362     } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1363
1364     /*
1365      * Process has died.
1366      */
1367     if (n < 0) {
1368         pkt_init(&nak, P_NAK, _("A ERROR data descriptor %d broken: %s\n"),
1369             dh->fd_read, strerror(errno));
1370         goto sendnak;
1371     }
1372     /*
1373      * Process has closed the pipe.  Just remove this event handler.
1374      * If all pipes are closed, shut down this service.
1375      */
1376     if (n == 0) {
1377         event_release(dh->ev_read);
1378         dh->ev_read = NULL;
1379         if(dh->ev_write == NULL) {
1380             security_stream_close(dh->netfd);
1381             dh->netfd = NULL;
1382         }
1383         for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1384             if (dh->netfd != NULL)
1385                 return;
1386         }
1387         service_delete(as);
1388         return;
1389     }
1390
1391     /* Handle the special case of recognizing "sendbackup info end"
1392      * from sendbackup's MESG fd */
1393     if (as->service == SERVICE_SENDBACKUP && !as->seen_info_end && dh == &as->data[1]) {
1394         /* make a buffer containing the combined data from info_end_buf
1395          * and what we've read this time, and search it for info_end_strj
1396          * This includes a NULL byte for strstr's sanity. */
1397         char *combined_buf = malloc(INFO_END_LEN + n + 1);
1398         memcpy(combined_buf, as->info_end_buf, INFO_END_LEN);
1399         memcpy(combined_buf+INFO_END_LEN, as->databuf, n);
1400         combined_buf[INFO_END_LEN+n] = '\0';
1401
1402         as->seen_info_end = (strstr(combined_buf, info_end_str) != NULL);
1403
1404         /* fill info_end_buf from the tail end of combined_buf */
1405         memcpy(as->info_end_buf, combined_buf + n, INFO_END_LEN);
1406
1407         /* if we did see info_end_str, start reading the data fd (fd 0) */
1408         if (as->seen_info_end) {
1409             struct datafd_handle *dh = &as->data[0];
1410             amandad_debug(1, "Opening datafd to sendbackup (delayed until sendbackup sent header info)\n");
1411             dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1412                                          process_readnetfd, dh);
1413         } else {
1414             amandad_debug(1, "sendbackup header info still not complete\n");
1415         }
1416     }
1417
1418     if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1419         /* stream has croaked */
1420         pkt_init(&nak, P_NAK, _("ERROR write error on stream %d: %s\n"),
1421             security_stream_id(dh->netfd),
1422             security_stream_geterror(dh->netfd));
1423         goto sendnak;
1424     }
1425     return;
1426
1427 sendnak:
1428     do_sendpkt(as->security_handle, &nak);
1429     service_delete(as);
1430     amfree(nak.body);
1431 }
1432
1433 /*
1434  * This is a generic relay function that just read data from one of
1435  * the security_stream_t and passes it up the equivalent process's pipes
1436  */
1437 static void
1438 process_writenetfd(
1439     void *      cookie,
1440     void *      buf,
1441     ssize_t     size)
1442 {
1443     struct datafd_handle *dh;
1444
1445     assert(cookie != NULL);
1446     dh = cookie;
1447
1448     if (dh->fd_write <= 0) {
1449         dbprintf(_("process_writenetfd: dh->fd_write <= 0\n"));
1450     } else if (size > 0) {
1451         full_write(dh->fd_write, buf, (size_t)size);
1452         security_stream_read(dh->netfd, process_writenetfd, dh);
1453     }
1454     else {
1455         aclose(dh->fd_write);
1456     }
1457 }
1458
1459
1460 /*
1461  * Convert a local stream handle (DATA_FD...) into something that
1462  * can be sent to the amanda server.
1463  *
1464  * Returns a number that should be sent to the server in the REP packet.
1465  */
1466 static int
1467 allocstream(
1468     struct active_service *     as,
1469     int                         handle)
1470 {
1471     struct datafd_handle *dh;
1472
1473     /* note that handle is in the range DATA_FD_OFFSET to DATA_FD_COUNT, but
1474      * it is NOT a file descriptor! */
1475
1476     /* if the handle is -1, then we don't bother */
1477     if (handle < 0)
1478         return (-1);
1479
1480     /* make sure the handle's kosher */
1481     if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1482         return (-1);
1483
1484     /* get a pointer into our handle array */
1485     dh = &as->data[handle - DATA_FD_OFFSET];
1486
1487     /* make sure we're not already using the net handle */
1488     if (dh->netfd != NULL)
1489         return (-1);
1490
1491     /* allocate a stream from the security layer and return */
1492     dh->netfd = security_stream_server(as->security_handle);
1493     if (dh->netfd == NULL) {
1494         dbprintf(_("couldn't open stream to server: %s\n"),
1495             security_geterror(as->security_handle));
1496         return (-1);
1497     }
1498
1499     /*
1500      * convert the stream into a numeric id that can be sent to the
1501      * remote end.
1502      */
1503     return (security_stream_id(dh->netfd));
1504 }
1505
1506 /*
1507  * Create a new service instance
1508  */
1509 static struct active_service *
1510 service_new(
1511     security_handle_t * security_handle,
1512     const char *        cmd,
1513     service_t           service,
1514     const char *        arguments)
1515 {
1516     int i;
1517     int data_read[DATA_FD_COUNT + 2][2];
1518     int data_write[DATA_FD_COUNT + 2][2];
1519     struct active_service *as;
1520     pid_t pid;
1521     int newfd;
1522
1523     assert(security_handle != NULL);
1524     assert(cmd != NULL);
1525     assert(arguments != NULL);
1526
1527     /* a plethora of pipes */
1528     /* data_read[0]                : stdin
1529      * data_write[0]               : stdout
1530      * data_read[1], data_write[1] : first  stream
1531      * data_read[2], data_write[2] : second stream
1532      * data_read[3], data_write[3] : third stream
1533      * data_write[4]               : stderr
1534      */
1535     for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1536         if (pipe(data_read[i]) < 0) {
1537             error(_("pipe: %s\n"), strerror(errno));
1538             /*NOTREACHED*/
1539         }
1540         if (pipe(data_write[i]) < 0) {
1541             error(_("pipe: %s\n"), strerror(errno));
1542             /*NOTREACHED*/
1543         }
1544     }
1545     if (pipe(data_write[STDERR_PIPE]) < 0) {
1546         error(_("pipe: %s\n"), strerror(errno));
1547         /*NOTREACHED*/
1548     }
1549
1550     switch(pid = fork()) {
1551     case -1:
1552         error(_("could not fork service %s: %s\n"), cmd, strerror(errno));
1553         /*NOTREACHED*/
1554     default:
1555         /*
1556          * The parent.  Close the far ends of our pipes and return.
1557          */
1558         as = g_new0(struct active_service, 1);
1559         as->cmd = stralloc(cmd);
1560         as->arguments = stralloc(arguments);
1561         as->security_handle = security_handle;
1562         as->state = NULL;
1563         as->service = service;
1564         as->pid = pid;
1565         as->send_partial_reply = 0;
1566         as->seen_info_end = FALSE;
1567         /* fill in info_end_buf with non-null characters */
1568         memset(as->info_end_buf, '-', sizeof(as->info_end_buf));
1569         if(service == SERVICE_SENDSIZE) {
1570             g_option_t *g_options;
1571             char *option_str, *p;
1572
1573             option_str = stralloc(as->arguments+8);
1574             p = strchr(option_str,'\n');
1575             if(p) *p = '\0';
1576
1577             g_options = parse_g_options(option_str, 1);
1578             if(am_has_feature(g_options->features, fe_partial_estimate)) {
1579                 as->send_partial_reply = 1;
1580             }
1581             free_g_options(g_options);
1582             amfree(option_str);
1583         }
1584
1585         /* write to the request pipe */
1586         aclose(data_read[0][0]);
1587         as->reqfd = data_read[0][1];
1588
1589         /*
1590          * read from the reply pipe
1591          */
1592         as->repfd = data_write[0][0];
1593         aclose(data_write[0][1]);
1594         as->ev_repfd = NULL;
1595         as->repbuf = NULL;
1596         as->repbufsize = 0;
1597         as->bufsize = 0;
1598         as->repretry = 0;
1599         as->rep_pkt.body = NULL;
1600
1601         /*
1602          * read from the stderr pipe
1603          */
1604         as->errfd = data_write[STDERR_PIPE][0];
1605         aclose(data_write[STDERR_PIPE][1]);
1606         as->ev_errfd = NULL;
1607         as->errbuf = NULL;
1608
1609         /*
1610          * read from the rest of the general-use pipes
1611          * (netfds are opened as the client requests them)
1612          */
1613         for (i = 0; i < DATA_FD_COUNT; i++) {
1614             aclose(data_read[i + 1][1]);
1615             aclose(data_write[i + 1][0]);
1616             as->data[i].fd_read = data_read[i + 1][0];
1617             as->data[i].fd_write = data_write[i + 1][1];
1618             as->data[i].ev_read = NULL;
1619             as->data[i].ev_write = NULL;
1620             as->data[i].netfd = NULL;
1621             as->data[i].as = as;
1622         }
1623
1624         /* add it to the service queue */
1625         /* increment the active service count */
1626         serviceq = g_slist_append(serviceq, (gpointer)as);
1627
1628         return (as);
1629     case 0:
1630         /*
1631          * The child.  Put our pipes in their advertised locations
1632          * and start up.
1633          */
1634
1635         /*
1636          * The data stream is stdin in the new process
1637          */
1638         if (dup2(data_read[0][0], 0) < 0) {
1639             error(_("dup %d to %d failed: %s\n"), data_read[0][0], 0,
1640                 strerror(errno));
1641             /*NOTREACHED*/
1642         }
1643         aclose(data_read[0][0]);
1644         aclose(data_read[0][1]);
1645
1646         /*
1647          * The reply stream is stdout
1648          */
1649         if (dup2(data_write[0][1], 1) < 0) {
1650             error(_("dup %d to %d failed: %s\n"), data_write[0][1], 1,
1651                 strerror(errno));
1652         }
1653         aclose(data_write[0][0]);
1654         aclose(data_write[0][1]);
1655
1656         for (i = 0; i < DATA_FD_COUNT; i++) {
1657             aclose(data_read[i + 1][0]);
1658             aclose(data_write[i + 1][1]);
1659         }
1660
1661         /*
1662          *  Make sure they are not open in the range DATA_FD_OFFSET to
1663          *      DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1664          */
1665         for (i = 0; i < DATA_FD_COUNT; i++) {
1666             while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1667                   data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1668                 newfd = dup(data_read[i + 1][1]);
1669                 if(newfd == -1)
1670                     error(_("Can't dup out off DATA_FD range"));
1671                 data_read[i + 1][1] = newfd;
1672             }
1673             while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1674                   data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1675                 newfd = dup(data_write[i + 1][0]);
1676                 if(newfd == -1)
1677                     error(_("Can't dup out off DATA_FD range"));
1678                 data_write[i + 1][0] = newfd;
1679             }
1680         }
1681         while(data_write[4][0] >= DATA_FD_OFFSET &&
1682               data_write[4][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1683             newfd = dup(data_write[4][0]);
1684             if (newfd == -1)
1685                 error(_("Can't dup out off DATA_FD range"));
1686             data_write[4][0] = newfd;
1687         }
1688         while(data_write[4][1] >= DATA_FD_OFFSET &&
1689               data_write[4][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1690             newfd = dup(data_write[4][1]);
1691             if (newfd == -1)
1692                 error(_("Can't dup out off DATA_FD range"));
1693             data_write[4][1] = newfd;
1694         }
1695
1696         for (i = 0; i < DATA_FD_COUNT*2; i++)
1697             close(DATA_FD_OFFSET + i);
1698
1699         /*
1700          * The rest start at the offset defined in amandad.h, and continue
1701          * through the internal defined.
1702          */
1703         for (i = 0; i < DATA_FD_COUNT; i++) {
1704             if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1705                 error(_("dup %d to %d failed: %s\n"), data_read[i + 1][1],
1706                     i + DATA_FD_OFFSET, strerror(errno));
1707             }
1708             aclose(data_read[i + 1][1]);
1709
1710             if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1711                 error(_("dup %d to %d failed: %s\n"), data_write[i + 1][0],
1712                     i + DATA_FD_OFFSET, strerror(errno));
1713             }
1714             aclose(data_write[i + 1][0]);
1715         }
1716
1717         /* close all unneeded fd */
1718         close(STDERR_FILENO);
1719         dup2(data_write[STDERR_PIPE][1], 2);
1720         aclose(data_write[STDERR_PIPE][0]);
1721         aclose(data_write[STDERR_PIPE][1]);
1722         safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1723
1724         execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1725         error(_("could not exec service %s: %s\n"), cmd, strerror(errno));
1726         /*NOTREACHED*/
1727     }
1728     return NULL;
1729 }
1730
1731 /*
1732  * Unallocate a service instance
1733  */
1734 static void
1735 service_delete(
1736     struct active_service *     as)
1737 {
1738     int i;
1739     int   count;
1740     pid_t pid;
1741     struct datafd_handle *dh;
1742
1743     amandad_debug(1, _("closing service: %s\n"),
1744                       (as->cmd)?as->cmd:_("??UNKONWN??"));
1745
1746     assert(as != NULL);
1747
1748     assert(as->cmd != NULL);
1749     amfree(as->cmd);
1750
1751     assert(as->arguments != NULL);
1752     amfree(as->arguments);
1753
1754     if (as->reqfd != -1)
1755         aclose(as->reqfd);
1756     if (as->repfd != -1)
1757         aclose(as->repfd);
1758     if (as->errfd != -1) {
1759         process_errfd(as);
1760         aclose(as->errfd);
1761     }
1762
1763     if (as->ev_repfd != NULL)
1764         event_release(as->ev_repfd);
1765     if (as->ev_reptimeout != NULL)
1766         event_release(as->ev_reptimeout);
1767     if (as->ev_errfd != NULL)
1768         event_release(as->ev_errfd);
1769
1770     for (i = 0; i < DATA_FD_COUNT; i++) {
1771         dh = &as->data[i];
1772
1773         aclose(dh->fd_read);
1774         aclose(dh->fd_write);
1775
1776         if (dh->netfd != NULL)
1777             security_stream_close(dh->netfd);
1778
1779         if (dh->ev_read != NULL)
1780             event_release(dh->ev_read);
1781         if (dh->ev_write != NULL)
1782             event_release(dh->ev_write);
1783     }
1784
1785     if (as->security_handle != NULL)
1786         security_close(as->security_handle);
1787
1788     assert(as->pid > 0);
1789     kill(as->pid, SIGTERM);
1790     pid = waitpid(as->pid, NULL, WNOHANG);
1791     count = 5;
1792     while (pid != as->pid && count > 0) {
1793         count--;
1794         sleep(1);
1795         pid = waitpid(as->pid, NULL, WNOHANG);
1796     }
1797     if (pid != as->pid) {
1798         g_debug("Process %d failed to exit", (int)as->pid);
1799     }
1800
1801     serviceq = g_slist_remove(serviceq, (gpointer)as);
1802
1803     amfree(as->cmd);
1804     amfree(as->arguments);
1805     amfree(as->repbuf);
1806     amfree(as->rep_pkt.body);
1807     amfree(as);
1808
1809     if(exit_on_qlength == 0 && g_slist_length(serviceq) == 0) {
1810         dbclose();
1811         exit(0);
1812     }
1813 }
1814
1815 /*
1816  * Like 'fullwrite', but does the work in a child process so pipelines
1817  * do not hang.
1818  */
1819 static int
1820 writebuf(
1821     struct active_service *     as,
1822     const void *                bufp,
1823     size_t                      size)
1824 {
1825     pid_t pid;
1826     size_t    writesize;
1827
1828     switch (pid=fork()) {
1829     case -1:
1830         break;
1831
1832     default:
1833         waitpid(pid, NULL, WNOHANG);
1834         return 0;                       /* this is the parent */
1835
1836     case 0:                             /* this is the child */
1837         close(as->repfd);
1838         writesize = full_write(as->reqfd, bufp, size);
1839         exit(writesize != size);
1840         /* NOTREACHED */
1841     }
1842     return -1;
1843 }
1844
1845 static ssize_t
1846 do_sendpkt(
1847     security_handle_t * handle,
1848     pkt_t *             pkt)
1849 {
1850     dbprintf(_("sending %s pkt:\n<<<<<\n%s>>>>>\n"),
1851         pkt_type2str(pkt->type), pkt->body);
1852     if (handle)
1853         return security_sendpkt(handle, pkt);
1854     else
1855         return 1;
1856 }
1857
1858 /*
1859  * Convert a state into a string
1860  */
1861 static const char *
1862 state2str(
1863     state_t     state)
1864 {
1865     static const struct {
1866         state_t state;
1867         const char str[13];
1868     } states[] = {
1869 #define X(state)        { state, stringize(state) }
1870         X(s_sendack),
1871         X(s_repwait),
1872         X(s_processrep),
1873         X(s_sendrep),
1874         X(s_ackwait),
1875 #undef X
1876     };
1877     int i;
1878
1879     for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1880         if (state == states[i].state)
1881             return (states[i].str);
1882     return (_("INVALID STATE"));
1883 }
1884
1885 /*
1886  * Convert an action into a string
1887  */
1888 static const char *
1889 action2str(
1890     action_t    action)
1891 {
1892     static const struct {
1893         action_t action;
1894         const char str[12];
1895     } actions[] = {
1896 #define X(action)       { action, stringize(action) }
1897         X(A_START),
1898         X(A_RECVPKT),
1899         X(A_RECVREP),
1900         X(A_PENDING),
1901         X(A_FINISH),
1902         X(A_CONTINUE),
1903         X(A_SENDNAK),
1904         X(A_TIMEOUT),
1905 #undef X
1906     };
1907     int i;
1908
1909     for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1910         if (action == actions[i].action)
1911             return (actions[i].str);
1912     return (_("UNKNOWN ACTION"));
1913 }
1914
1915 static char *
1916 amandad_get_security_conf(
1917     char *      string,
1918     void *      arg)
1919 {
1920     (void)arg;      /* Quiet unused parameter warning */
1921
1922     if (!string || !*string)
1923         return(NULL);
1924
1925     if (strcmp(string, "kencrypt")==0) {
1926         if (amandad_kencrypt == KENCRYPT_YES)
1927             return ("yes");
1928         else
1929             return (NULL);
1930     }
1931     return(NULL);
1932 }
1933