2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
28 * $Id: amandad.c,v 1.62.2.2 2006/05/08 11:50:16 martinea Exp $
30 * handle client-host side of Amanda network communications, including
31 * security checks, execution of the proper service, and acking the
35 /*#define AMANDAD_DEBUG*/
41 #include "amfeatures.h"
48 #include "client_util.h"
50 #define REP_TIMEOUT (6*60*60) /* secs for service to reply */
51 #define ACK_TIMEOUT 10 /* XXX should be configurable */
52 #define MAX_REP_RETRIES 5
55 * These are the actions for entering the state machine
57 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
58 A_SENDNAK, A_TIMEOUT } action_t;
61 * This is a state in the state machine. It is a function pointer to
62 * the function that actually implements the state.
64 struct active_service;
65 typedef action_t (*state_t) P((struct active_service *, action_t, pkt_t *));
68 * This structure describes an active running service.
70 * An active service is something running that we have received
71 * a request for. This structure holds info on that service, including
72 * file descriptors for data, etc, as well as the security handle
73 * for communications with the amanda server.
75 struct active_service {
76 char *cmd; /* name of command we ran */
77 char *arguments; /* arguments we sent it */
78 security_handle_t *security_handle; /* remote server */
79 state_t state; /* how far this has progressed */
80 pid_t pid; /* pid of subprocess */
81 int send_partial_reply; /* send PREP packet */
82 int reqfd; /* pipe to write requests */
83 int repfd; /* pipe to read replies */
84 event_handle_t *ev_repfd; /* read event handle for repfd */
85 event_handle_t *ev_reptimeout; /* timeout for rep data */
86 pkt_t rep_pkt; /* rep packet we're sending out */
87 char repbuf[MAX_PACKET]; /* buffer to read the rep into */
88 int repbufsize; /* length of repbuf */
89 int repretry; /* times we'll retry sending the rep */
91 * General user streams to the process, and their equivalent
94 struct datafd_handle {
95 int fd; /* pipe to child process */
96 event_handle_t *ev_handle; /* it's read event handle */
97 security_stream_t *netfd; /* stream to amanda server */
98 struct active_service *as; /* pointer back to our enclosure */
99 } data[DATA_FD_COUNT];
100 char databuf[NETWORK_BLOCK_BYTES]; /* buffer to relay netfd data in */
101 TAILQ_ENTRY(active_service) tq; /* queue handle */
105 * Here are the services that we allow.
107 static const char *services[] = {
113 #define NSERVICES (sizeof(services) / sizeof(services[0]))
116 * Queue of outstanding requests that we are running.
119 TAILQ_HEAD(, active_service) tailq;
122 TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
126 * Data for dbmalloc to check for memory leaks
131 unsigned long size, hist;
136 int ack_timeout = ACK_TIMEOUT;
138 int main P((int argc, char **argv));
140 static int allocstream P((struct active_service *, int));
141 static void exit_check P((void *));
142 static void protocol_accept P((security_handle_t *, pkt_t *));
143 static void state_machine P((struct active_service *, action_t, pkt_t *));
145 static action_t s_sendack P((struct active_service *, action_t, pkt_t *));
146 static action_t s_repwait P((struct active_service *, action_t, pkt_t *));
147 static action_t s_processrep P((struct active_service *, action_t, pkt_t *));
148 static action_t s_sendrep P((struct active_service *, action_t, pkt_t *));
149 static action_t s_ackwait P((struct active_service *, action_t, pkt_t *));
151 static void repfd_recv P((void *));
152 static void timeout_repfd P((void *));
153 static void protocol_recv P((void *, pkt_t *, security_status_t));
154 static void process_netfd P((void *));
155 static struct active_service *service_new P((security_handle_t *,
156 const char *, const char *));
157 static void service_delete P((struct active_service *));
158 static int writebuf P((struct active_service *, const void *, size_t));
159 static int do_sendpkt P((security_handle_t *handle, pkt_t *pkt));
162 static const char *state2str P((state_t));
163 static const char *action2str P((action_t));
172 const security_driver_t *secdrv;
174 char *pgm = "amandad"; /* in case argv[0] is not set */
180 error("argv == NULL\n");
184 * When called via inetd, it is not uncommon to forget to put the
185 * argv[0] value on the config line. On some systems (e.g. Solaris)
186 * this causes argv and/or argv[0] to be NULL, so we have to be
187 * careful getting our name.
189 if (argc >= 1 && argv != NULL && argv[0] != NULL) {
190 if((pgm = strrchr(argv[0], '/')) != NULL) {
199 /* Don't die when child closes pipe */
200 signal(SIGPIPE, SIG_IGN);
203 dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
206 erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
209 /* we'd rather not run as root */
210 if (geteuid() == 0) {
211 if(client_uid == (uid_t) -1) {
212 error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
214 initgroups(CLIENT_LOGIN, client_gid);
219 #endif /* FORCE_USERID */
222 * ad-hoc argument parsing
224 * We accept -auth=[authentication type]
232 in = 0; out = 1; /* default to stdin/stdout */
233 for (i = 1; i < argc; i++) {
235 * accept -krb4 as an alias for -auth=krb4 (for compatibility)
237 if (strcmp(argv[i], "-krb4") == 0) {
238 argv[i] = "-auth=krb4";
243 * Get a driver for a security type specified after -auth=
245 if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
246 argv[i] += strlen("-auth=");
247 secdrv = security_getdriver(argv[i]);
249 error("no driver for security type '%s'", argv[i]);
254 * If -no-exit is specified, always run even after requests have
257 if (strcmp(argv[i], "-no-exit") == 0) {
264 * Allow us to directly bind to a udp port for debugging.
265 * This may only apply to some security types.
267 if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
268 struct sockaddr_in sin;
270 argv[i] += strlen("-udp=");
271 in = out = socket(AF_INET, SOCK_DGRAM, 0);
273 error("can't create dgram socket: %s\n", strerror(errno));
274 sin.sin_family = AF_INET;
275 sin.sin_addr.s_addr = INADDR_ANY;
276 sin.sin_port = htons(atoi(argv[i]));
277 if (bind(in, (struct sockaddr *)&sin, sizeof(sin)) < 0)
278 error("can't bind to port %d: %s\n", atoi(argv[i]),
282 * Ditto for tcp ports.
284 if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
285 struct sockaddr_in sin;
288 argv[i] += strlen("-tcp=");
289 sock = socket(AF_INET, SOCK_STREAM, 0);
291 error("can't create tcp socket: %s\n", strerror(errno));
293 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (void *)&n, sizeof(n));
294 sin.sin_family = AF_INET;
295 sin.sin_addr.s_addr = INADDR_ANY;
296 sin.sin_port = htons(atoi(argv[i]));
297 if (bind(sock, (struct sockaddr *)&sin, sizeof(sin)) < 0)
298 error("can't bind to port %d: %s\n", atoi(argv[i]),
302 in = out = accept(sock, (struct sockaddr *)&sin, &n);
308 * If no security type specified, use BSD
310 if (secdrv == NULL) {
311 secdrv = security_getdriver("BSD");
313 error("no driver for default security type 'BSD'");
320 /* this lameness is for error() */
329 dbprintf(("%s: version %s\n", get_pname(), version()));
330 for (i = 0; version_info[i] != NULL; i++) {
331 dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
334 if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
335 dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
336 debug_prefix(NULL)));
340 * Schedule to call protocol_accept() when new security handles
341 * are created on stdin.
343 security_accept(secdrv, in, out, protocol_accept);
346 * Schedule an event that will try to exit every 30 seconds if there
347 * are no requests outstanding.
349 (void)event_register(30, EV_TIME, exit_check, &no_exit);
352 * Call event_loop() with an arg of 0, telling it to block until all
353 * events are completed.
358 exit(1); /* appease gcc/lint */
362 * This runs periodically and checks to see if we have any active services
363 * still running. If we don't, then we quit.
371 assert(cookie != NULL);
372 no_exit = *(int *)cookie;
375 * If things are still running, then don't exit.
377 if (serviceq.qlength > 0)
381 * If the caller asked us to never exit, then we're done
387 dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
389 if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
390 malloc_list(dbfd(), dbmalloc_info.start.hist,
391 dbmalloc_info.end.hist);
400 * Handles new incoming protocol handles. This is a callback for
401 * security_accept(), which gets called when new handles are detected.
404 protocol_accept(handle, pkt)
405 security_handle_t *handle;
409 struct active_service *as;
410 char *pktbody, *tok, *service, *arguments;
414 * If pkt is NULL, then there was a problem with the new connection.
417 dbprintf(("%s: accept error: %s\n",
418 debug_prefix_time(NULL), security_geterror(handle)));
419 pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
420 do_sendpkt(handle, &pkt_out);
421 security_close(handle);
425 dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
426 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
429 * If this is not a REQ packet, just forget about it.
431 if (pkt->type != P_REQ) {
432 dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
433 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
434 security_close(handle);
438 pktbody = service = arguments = NULL;
442 * Parse out the service and arguments
445 pktbody = stralloc(pkt->body);
447 tok = strtok(pktbody, " ");
450 if (strcmp(tok, "SERVICE") != 0)
453 tok = strtok(NULL, " \n");
456 service = stralloc(tok);
458 /* we call everything else 'arguments' */
459 tok = strtok(NULL, "");
462 arguments = stralloc(tok);
464 /* see if it's one we allow */
465 for (i = 0; i < NSERVICES; i++)
466 if (strcmp(services[i], service) == 0)
468 if (i == NSERVICES) {
469 dbprintf(("%s: %s: invalid service\n",
470 debug_prefix_time(NULL), service));
471 pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
475 service = newvstralloc(service,
476 libexecdir, "/", service, versionsuffix(),
478 if (access(service, X_OK) < 0) {
479 dbprintf(("%s: can't execute %s: %s\n",
480 debug_prefix_time(NULL), service, strerror(errno)));
481 pkt_init(&pkt_out, P_NAK, "ERROR execute access to \"%s\" denied\n",
486 /* see if its already running */
487 for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
488 as = TAILQ_NEXT(as, tq)) {
489 if (strcmp(as->cmd, service) == 0 &&
490 strcmp(as->arguments, arguments) == 0) {
491 dbprintf(("%s: %s %s: already running, acking req\n",
492 debug_prefix_time(NULL), service, arguments));
493 pkt_init(&pkt_out, P_ACK, "");
499 * create a new service instance, and send the arguments down
502 dbprintf(("%s: creating new service: %s\n%s\n",
503 debug_prefix_time(NULL), service, arguments));
504 as = service_new(handle, service, arguments);
505 if (writebuf(as, arguments, strlen(arguments)) < 0) {
506 const char *errmsg = strerror(errno);
507 dbprintf(("%s: error sending arguments to %s: %s\n",
508 debug_prefix_time(NULL), service, errmsg));
509 pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
520 * Move to the sendack state, and start up the state
523 as->state = s_sendack;
524 state_machine(as, A_START, NULL);
528 pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
529 dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
530 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
536 if(as) service_delete(as);
537 do_sendpkt(handle, &pkt_out);
538 security_close(handle);
542 * Handles incoming protocol packets. Routes responses to the proper
546 state_machine(as, action, pkt)
547 struct active_service *as;
556 dbprintf(("%s: state_machine: %X entering\n",
557 debug_prefix_time(NULL), (unsigned int)as));
560 curstate = as->state;
562 dbprintf(("%s: state_machine: %X curstate=%s action=%s\n",
563 debug_prefix_time(NULL), (unsigned int)as,
564 state2str(curstate), action2str(action)));
566 retaction = (*curstate)(as, action, pkt);
568 dbprintf(("%s: state_machine: %X curstate=%s returned %s (nextstate=%s)\n",
569 debug_prefix_time(NULL),
570 (unsigned int)as, state2str(curstate), action2str(retaction),
571 state2str(as->state)));
576 * State has queued up and is now blocking on input.
580 dbprintf(("%s: state_machine: %X leaving (A_PENDING)\n",
581 debug_prefix_time(NULL), (unsigned int)as));
586 * service has switched states. Loop.
592 * state has determined that the packet it received was bogus.
593 * Send a nak, and return.
596 dbprintf(("%s: received unexpected %s packet\n",
597 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
598 dbprintf(("<<<<<\n%s----\n\n", pkt->body));
599 pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
600 pkt_type2str(pkt->type));
601 do_sendpkt(as->security_handle, &nak);
603 dbprintf(("%s: state_machine: %X leaving (A_SENDNAK)\n",
604 debug_prefix_time(NULL), (unsigned int)as));
609 * Service is done. Remove it and finish.
614 dbprintf(("%s: state_machine: %X leaving (A_FINISH)\n",
615 debug_prefix_time(NULL), (unsigned int)as));
628 * This state just sends an ack. After that, we move to the repwait
629 * state to wait for REP data to arrive from the subprocess.
632 s_sendack(as, action, pkt)
633 struct active_service *as;
639 pkt_init(&ack, P_ACK, "");
640 if (do_sendpkt(as->security_handle, &ack) < 0) {
641 dbprintf(("%s: error sending ACK: %s\n",
642 debug_prefix_time(NULL), security_geterror(as->security_handle)));
647 * move to the repwait state
648 * Setup a listener for data on the reply fd, but also
649 * listen for packets over the wire, as the server may
650 * poll us if we take a long time.
651 * Setup a timeout that will fire if it takes too long to
654 as->state = s_repwait;
655 as->ev_repfd = event_register(as->repfd, EV_READFD, repfd_recv, as);
656 as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
658 security_recvpkt(as->security_handle, protocol_recv, as, -1);
663 * This is the repwait state. We have responded to the initial REQ with
664 * an ACK, and we are now waiting for the process we spawned to pass us
665 * data to send in a REP.
668 s_repwait(as, action, pkt)
669 struct active_service *as;
676 * We normally shouldn't receive any packets while waiting
677 * for our REP data, but in some cases we do.
679 if (action == A_RECVPKT) {
682 * Another req for something that's running. Just send an ACK
683 * and go back and wait for more data.
685 if (pkt->type == P_REQ) {
686 dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
687 debug_prefix_time(NULL)));
688 pkt_init(&as->rep_pkt, P_ACK, "");
689 do_sendpkt(as->security_handle, &as->rep_pkt);
692 /* something unexpected. Nak it */
696 if (action == A_TIMEOUT) {
697 pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
698 dbprintf(("%s: %s timed out waiting for REP data\n",
699 debug_prefix_time(NULL), as->cmd));
700 do_sendpkt(as->security_handle, &as->rep_pkt);
704 assert(action == A_RECVREP);
707 * If the read fails, consider the process dead, and remove it.
708 * Always save room for nul termination.
710 if (as->repbufsize + 1 >= sizeof(as->repbuf)) {
711 dbprintf(("%s: more than %d bytes in reply\n",
712 debug_prefix_time(NULL), sizeof(as->repbuf)));
713 dbprintf(("%s: reply so far:\n%s\n", debug_prefix(NULL), as->repbuf));
714 pkt_init(&as->rep_pkt, P_NAK, "ERROR more than %d bytes in reply\n",
716 do_sendpkt(as->security_handle, &as->rep_pkt);
720 n = read(as->repfd, as->repbuf + as->repbufsize,
721 sizeof(as->repbuf) - as->repbufsize - 1);
722 } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
724 const char *errstr = strerror(errno);
725 dbprintf(("%s: read error on reply pipe: %s\n",
726 debug_prefix_time(NULL), errstr));
727 pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
729 do_sendpkt(as->security_handle, &as->rep_pkt);
733 * If we got some data, go back and wait for more, or EOF. Nul terminate
736 as->repbuf[n + as->repbufsize] = '\0';
739 if(as->send_partial_reply) {
740 pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
741 do_sendpkt(as->security_handle, &as->rep_pkt);
742 pkt_init(&as->rep_pkt, P_REP, "");
749 * If we got 0, then we hit EOF. Process the data and release
754 assert(as->ev_repfd != NULL);
755 event_release(as->ev_repfd);
758 assert(as->ev_reptimeout != NULL);
759 event_release(as->ev_reptimeout);
760 as->ev_reptimeout = NULL;
762 as->state = s_processrep;
768 * After we have read in all of the rep data, we process it and send
769 * it out as a REP packet.
772 s_processrep(as, action, pkt)
773 struct active_service *as;
780 * Copy the rep lines into the outgoing packet.
782 * If this line is a CONNECT, translate it
783 * Format is "CONNECT <tag> <handle> <tag> <handle> etc...
786 * CONNECT DATA 4 MESG 5 INDEX 6
788 * The tags are arbitrary. The handles are in the DATA_FD pool.
789 * We need to map these to security streams and pass them back
790 * to the amanda server. If the handle is -1, then we don't map.
792 repbuf = stralloc(as->repbuf);
793 pkt_init(&as->rep_pkt, P_REP, "");
794 tok = strtok(repbuf, " ");
797 if (strcmp(tok, "CONNECT") == 0) {
798 char *line, *nextbuf;
800 /* Save the entire line */
801 line = strtok(NULL, "\n");
802 /* Save the buf following the line */
803 nextbuf = strtok(NULL, "");
805 if (line == NULL || nextbuf == NULL)
808 pkt_cat(&as->rep_pkt, "CONNECT");
810 /* loop over the id/handle pairs */
813 tok = strtok(line, " ");
814 line = NULL; /* keep working from line */
817 pkt_cat(&as->rep_pkt, " %s", tok);
820 tok = strtok(NULL, " \n");
823 /* convert the handle into something the server can process */
824 pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
826 pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
829 pkt_cat(&as->rep_pkt, "%s", as->repbuf);
833 * We've setup our REP packet in as->rep_pkt. Now move to the transmission
836 as->state = s_sendrep;
837 as->repretry = MAX_REP_RETRIES;
843 * This is the state where we send the REP we just collected from our child.
846 s_sendrep(as, action, pkt)
847 struct active_service *as;
853 * Transmit it and move to the ack state.
855 do_sendpkt(as->security_handle, &as->rep_pkt);
856 security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
857 as->state = s_ackwait;
862 * This is the state in which we wait for the server to ACK the REP
866 s_ackwait(as, action, pkt)
867 struct active_service *as;
871 struct datafd_handle *dh;
875 * If we got a timeout, try again, but eventually give up.
877 if (action == A_TIMEOUT) {
878 if (--as->repretry > 0) {
879 as->state = s_sendrep;
882 dbprintf(("%s: timeout waiting for ACK for our REP\n",
883 debug_prefix_time(NULL)));
887 dbprintf(("%s: received ACK, now opening streams\n",
888 debug_prefix_time(NULL)));
891 assert(action == A_RECVPKT);
892 if (pkt->type != P_ACK)
896 * Got the ack, now open the pipes
898 for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
899 if (dh->netfd == NULL)
901 if (security_stream_accept(dh->netfd) < 0) {
902 dbprintf(("%s: stream %d accept failed: %s\n",
903 debug_prefix_time(NULL),
904 dh - &as->data[0], security_geterror(as->security_handle)));
905 security_stream_close(dh->netfd);
908 /* setup an event for reads from it */
909 dh->ev_handle = event_register(dh->fd, EV_READFD, process_netfd, dh);
913 * Pipes are open, so auth them. Count them at the same time.
915 for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
916 if (dh->netfd == NULL)
918 if (security_stream_auth(dh->netfd) < 0) {
919 security_stream_close(dh->netfd);
921 event_release(dh->ev_handle);
922 dh->ev_handle = NULL;
929 * If no pipes are open, then we're done. Otherwise, just start running.
930 * The event handlers on all of the pipes will take it from here.
933 dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
934 debug_prefix_time(NULL), npipes));
939 security_close(as->security_handle);
940 as->security_handle = NULL;
946 * Called when a repfd has received data
952 struct active_service *as = cookie;
955 assert(as->ev_repfd != NULL);
957 state_machine(as, A_RECVREP, NULL);
961 * Called when a repfd has timed out
964 timeout_repfd(cookie)
967 struct active_service *as = cookie;
970 assert(as->ev_reptimeout != NULL);
972 state_machine(as, A_TIMEOUT, NULL);
976 * Called when a handle has received data
979 protocol_recv(cookie, pkt, status)
982 security_status_t status;
984 struct active_service *as = cookie;
990 dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
991 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
992 state_machine(as, A_RECVPKT, pkt);
995 dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
996 state_machine(as, A_TIMEOUT, NULL);
999 dbprintf(("%s: receive error: %s\n",
1000 debug_prefix_time(NULL), security_geterror(as->security_handle)));
1006 * This is a generic relay function that just reads data from one of
1007 * the process's pipes and passes it up the equivalent security_stream_t
1010 process_netfd(cookie)
1014 struct datafd_handle *dh = cookie;
1015 struct active_service *as = dh->as;
1019 n = read(dh->fd, as->databuf, sizeof(as->databuf));
1020 } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1026 pkt_init(&nak, P_NAK, "ERROR data descriptor %d broken: %s\n",
1027 dh->fd, strerror(errno));
1031 * Process has closed the pipe. Just remove this event handler.
1032 * If all pipes are closed, shut down this service.
1035 event_release(dh->ev_handle);
1036 dh->ev_handle = NULL;
1037 security_stream_close(dh->netfd);
1039 for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1040 if (dh->netfd != NULL)
1046 if (security_stream_write(dh->netfd, as->databuf, n) < 0) {
1047 /* stream has croaked */
1048 pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1049 security_stream_id(dh->netfd),
1050 security_stream_geterror(dh->netfd));
1056 do_sendpkt(as->security_handle, &nak);
1062 * Convert a local stream handle (DATA_FD...) into something that
1063 * can be sent to the amanda server.
1065 * Returns a number that should be sent to the server in the REP packet.
1068 allocstream(as, handle)
1069 struct active_service *as;
1072 struct datafd_handle *dh;
1074 /* if the handle is -1, then we don't bother */
1078 /* make sure the handle's kosher */
1079 if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1082 /* get a pointer into our handle array */
1083 dh = &as->data[handle - DATA_FD_OFFSET];
1085 /* make sure we're not already using the net handle */
1086 if (dh->netfd != NULL)
1089 /* allocate a stream from the security layer and return */
1090 dh->netfd = security_stream_server(as->security_handle);
1091 if (dh->netfd == NULL) {
1092 dbprintf(("%s: couldn't open stream to server: %s\n",
1093 debug_prefix_time(NULL), security_geterror(as->security_handle)));
1098 * convert the stream into a numeric id that can be sent to the
1101 return (security_stream_id(dh->netfd));
1105 * Create a new service instance
1107 static struct active_service *
1108 service_new(security_handle, cmd, arguments)
1109 security_handle_t *security_handle;
1110 const char *cmd, *arguments;
1112 int data[DATA_FD_COUNT + 2][2], i;
1113 struct active_service *as;
1117 assert(security_handle != NULL);
1118 assert(cmd != NULL);
1119 assert(arguments != NULL);
1121 /* a plethora of pipes */
1122 for (i = 0; i < DATA_FD_COUNT + 2; i++)
1123 if (pipe(data[i]) < 0)
1124 error("pipe: %s", strerror(errno));
1126 switch(pid = fork()) {
1128 error("could not fork service %s: %s", cmd, strerror(errno));
1131 * The parent. Close the far ends of our pipes and return.
1133 as = alloc(sizeof(*as));
1134 as->cmd = stralloc(cmd);
1135 as->arguments = stralloc(arguments);
1136 as->security_handle = security_handle;
1139 as->send_partial_reply = 0;
1140 if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1141 g_option_t *g_options;
1142 char *option_str, *p;
1144 option_str = stralloc(as->arguments+8);
1145 p = strchr(option_str,'\n');
1148 g_options = parse_g_options(option_str, 1);
1149 if(am_has_feature(g_options->features, fe_partial_estimate)) {
1150 as->send_partial_reply = 1;
1156 /* write to the request pipe */
1158 as->reqfd = data[0][1];
1161 * read from the reply pipe
1163 as->repfd = data[1][0];
1165 as->ev_repfd = NULL;
1170 * read from the rest of the general-use pipes
1171 * (netfds are opened as the client requests them)
1173 for (i = 0; i < DATA_FD_COUNT; i++) {
1174 aclose(data[i + 2][1]);
1175 as->data[i].fd = data[i + 2][0];
1176 as->data[i].ev_handle = NULL;
1177 as->data[i].netfd = NULL;
1178 as->data[i].as = as;
1181 /* add it to the service queue */
1182 /* increment the active service count */
1183 TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1189 * The child. Put our pipes in their advertised locations
1198 * The data stream is stdin in the new process
1200 if (dup2(data[0][0], 0) < 0) {
1201 error("dup %d to %d failed: %s\n", data[0][0], 0,
1208 * The reply stream is stdout
1210 if (dup2(data[1][1], 1) < 0) {
1211 error("dup %d to %d failed: %s\n", data[1][1], 1,
1218 * The rest start at the offset defined in amandad.h, and continue
1219 * through the internal defined.
1221 for (i = 0; i < DATA_FD_COUNT; i++)
1222 aclose(data[i + 2][0]);
1225 * Make sure they are not open in the range DATA_FD_OFFSET to
1226 * DATA_FD_OFFSET + DATA_FD_COUNT - 1
1228 for (i = 0; i < DATA_FD_COUNT; i++) {
1229 while(data[i + 2][1] >= DATA_FD_OFFSET &&
1230 data[i + 2][1] <= DATA_FD_OFFSET + DATA_FD_COUNT - 1) {
1231 newfd = dup(data[i + 2][1]);
1233 error("Can't dup out off DATA_FD range");
1234 data[i + 2][1] = newfd;
1237 for (i = 0; i < DATA_FD_COUNT; i++)
1238 close(DATA_FD_OFFSET + i);
1240 for (i = 0; i < DATA_FD_COUNT; i++) {
1241 if (dup2(data[i + 2][1], i + DATA_FD_OFFSET) < 0) {
1242 error("dup %d to %d failed: %s\n", data[i + 2][1],
1243 i + DATA_FD_OFFSET, strerror(errno));
1245 aclose(data[i + 2][1]);
1248 execle(cmd, cmd, NULL, safe_env());
1249 error("could not exec service %s: %s", cmd, strerror(errno));
1256 * Unallocate a service instance
1260 struct active_service *as;
1263 struct datafd_handle *dh;
1265 #ifdef AMANDAD_DEBUG
1266 dbprintf(("%s: closing service: %s\n",
1267 debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
1272 assert(as->cmd != NULL);
1275 assert(as->arguments != NULL);
1276 amfree(as->arguments);
1278 if (as->reqfd != -1)
1280 if (as->repfd != -1)
1283 if (as->ev_repfd != NULL)
1284 event_release(as->ev_repfd);
1285 if (as->ev_reptimeout != NULL)
1286 event_release(as->ev_reptimeout);
1288 for (i = 0; i < DATA_FD_COUNT; i++) {
1293 if (dh->netfd != NULL)
1294 security_stream_close(dh->netfd);
1296 if (dh->ev_handle != NULL)
1297 event_release(dh->ev_handle);
1300 if (as->security_handle != NULL)
1301 security_close(as->security_handle);
1303 assert(as->pid > 0);
1304 kill(as->pid, SIGTERM);
1305 waitpid(as->pid, NULL, WNOHANG);
1307 TAILQ_REMOVE(&serviceq.tailq, as, tq);
1308 assert(serviceq.qlength > 0);
1315 * Like 'fullwrite', but does the work in a child process so pipelines
1319 writebuf(as, bufp, size)
1320 struct active_service *as;
1326 switch (pid=fork()) {
1331 waitpid(pid, NULL, WNOHANG);
1332 return 0; /* this is the parent */
1335 break; /* this is the child */
1337 aclose (as->repfd); /* make sure we are not a reader */
1338 exit (fullwrite(as->reqfd, bufp, size) != size);
1342 do_sendpkt(handle, pkt)
1343 security_handle_t *handle;
1346 dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1347 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1348 return security_sendpkt(handle, pkt);
1351 #ifdef AMANDAD_DEBUG
1353 * Convert a state into a string
1359 static const struct {
1363 #define X(state) { state, stringize(state) }
1373 for (i = 0; i < sizeof(states) / sizeof(states[0]); i++)
1374 if (state == states[i].state)
1375 return (states[i].str);
1376 return ("INVALID STATE");
1380 * Convert an action into a string
1386 static const struct {
1390 #define X(action) { action, stringize(action) }
1403 for (i = 0; i < sizeof(actions) / sizeof(actions[0]); i++)
1404 if (action == actions[i].action)
1405 return (actions[i].str);
1406 return ("UNKNOWN ACTION");
1408 #endif /* AMANDAD_DEBUG */