2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 1991-1999 University of Maryland at College Park
6 * Permission to use, copy, modify, distribute, and sell this software and its
7 * documentation for any purpose is hereby granted without fee, provided that
8 * the above copyright notice appear in all copies and that both that
9 * copyright notice and this permission notice appear in supporting
10 * documentation, and that the name of U.M. not be used in advertising or
11 * publicity pertaining to distribution of the software without specific,
12 * written prior permission. U.M. makes no representations about the
13 * suitability of this software for any purpose. It is provided "as is"
14 * without express or implied warranty.
16 * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18 * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20 * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21 * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23 * Authors: the Amanda Development Team. Its members are listed in a
24 * file named AUTHORS, in the root directory of this distribution.
28 * $Id: amandad.c,v 1.18.2.5 2007/01/10 16:26:57 martinea Exp $
30 * handle client-host side of Amanda network communications, including
31 * security checks, execution of the proper service, and acking the
35 /*#define AMANDAD_DEBUG*/
41 #include "amfeatures.h"
49 #define REP_TIMEOUT (6*60*60) /* secs for service to reply */
50 #define ACK_TIMEOUT 10 /* timeout given to security_recvpkt() while waiting for the ACK of our REP (presumably secs); XXX should be configurable */
51 #define MAX_REP_RETRIES 5 /* initial value of repretry: REP transmissions attempted before giving up on the ACK */
54 * These are the actions for entering the state machine
/*
 * A_START, A_RECVPKT, A_RECVREP and A_TIMEOUT are handed to state_machine()
 * by its callers (protocol_accept, protocol_recv, repfd_recv, timeout_repfd).
 * A_PENDING, A_FINISH, A_CONTINUE and A_SENDNAK are results returned by a
 * state function: block for more input, tear the service down, run the
 * next state, or NAK an unexpected packet.
 */
56 typedef enum { A_START, A_RECVPKT, A_RECVREP, A_PENDING, A_FINISH, A_CONTINUE,
57 A_SENDNAK, A_TIMEOUT } action_t;
60 * This is a state in the state machine. It is a function pointer to
61 * the function that actually implements the state.
63 struct active_service;
64 typedef action_t (*state_t)(struct active_service *, action_t, pkt_t *);
67 * This structure describes an active running service.
69 * An active service is something running that we have received
70 * a request for. This structure holds info on that service, including
71 * file descriptors for data, etc, as well as the security handle
72 * for communications with the amanda server.
74 struct active_service {
75 char *cmd; /* command we ran (the service path given to service_new) */
76 char *arguments; /* arguments we sent it */
77 security_handle_t *security_handle; /* remote server */
78 state_t state; /* current state function: how far this has progressed */
79 pid_t pid; /* pid of subprocess */
80 int send_partial_reply; /* nonzero: send P_PREP packets as reply data arrives */
81 int reqfd; /* pipe to write requests */
82 int repfd; /* pipe to read replies */
83 event_handle_t *ev_repfd; /* read event handle for repfd */
84 event_handle_t *ev_reptimeout; /* timeout for rep data */
85 pkt_t rep_pkt; /* rep packet we're sending out */
86 char *repbuf; /* buffer to read the rep into */
87 size_t bufsize; /* allocated size of repbuf */
88 size_t repbufsize; /* offset into repbuf at which the next read appends */
89 int repretry; /* remaining attempts at sending the rep */
91 * General user streams to the process, and their equivalent
94 struct datafd_handle {
95 int fd_read; /* pipe we read the child's output from */
96 int fd_write; /* pipe we write to the child on */
97 event_handle_t *ev_read; /* its read event handle */
98 event_handle_t *ev_write; /* its write event handle */
99 security_stream_t *netfd; /* stream to amanda server */
100 struct active_service *as; /* pointer back to our enclosure */
101 } data[DATA_FD_COUNT];
102 char databuf[NETWORK_BLOCK_BYTES]; /* buffer to relay netfd data in */
103 TAILQ_ENTRY(active_service) tq; /* queue handle */
107 * Here are the services that we allow.
109 static struct services {
120 #define NSERVICES (int)(sizeof(services) / sizeof(services[0]))
123 * Queue of outstanding requests that we are running.
126 TAILQ_HEAD(, active_service) tailq;
129 TAILQ_HEAD_INITIALIZER(serviceq.tailq), 0
133 * Data for dbmalloc to check for memory leaks
138 unsigned long size, hist;
143 static int wait_30s = 1;
144 static int exit_on_qlength = 1;
145 static char *auth = NULL;
147 int main(int argc, char **argv);
149 static int allocstream(struct active_service *, int);
150 static void exit_check(void *);
151 static void protocol_accept(security_handle_t *, pkt_t *);
152 static void state_machine(struct active_service *, action_t, pkt_t *);
154 static action_t s_sendack(struct active_service *, action_t, pkt_t *);
155 static action_t s_repwait(struct active_service *, action_t, pkt_t *);
156 static action_t s_processrep(struct active_service *, action_t, pkt_t *);
157 static action_t s_sendrep(struct active_service *, action_t, pkt_t *);
158 static action_t s_ackwait(struct active_service *, action_t, pkt_t *);
160 static void repfd_recv(void *);
161 static void timeout_repfd(void *);
162 static void protocol_recv(void *, pkt_t *, security_status_t);
163 static void process_readnetfd(void *);
164 static void process_writenetfd(void *, void *, ssize_t);
165 static struct active_service *service_new(security_handle_t *,
166 const char *, const char *);
167 static void service_delete(struct active_service *);
168 static int writebuf(struct active_service *, const void *, size_t);
169 static ssize_t do_sendpkt(security_handle_t *handle, pkt_t *pkt);
171 static void child_signal(int signal);
174 static const char *state2str(state_t);
175 static const char *action2str(action_t);
179 * Harvests defunct processes...
188 (void)signal; /* Quiet compiler warning */
190 * Reap any child status and promptly ignore it since we don't care...
193 rp = waitpid(-1, NULL, WNOHANG);
205 const security_driver_t *secdrv;
207 struct sigaction act, oact;
208 char *pgm = "amandad"; /* in case argv[0] is not set */
209 #if defined(AMANDAD_DEBUG) && defined(USE_REUSEADDR)
218 * When called via inetd, it is not uncommon to forget to put the
219 * argv[0] value on the config line. On some systems (e.g. Solaris)
220 * this causes argv and/or argv[0] to be NULL, so we have to be
221 * careful getting our name.
223 if ((argv == NULL) || (argv[0] == NULL)) {
224 pgm = "amandad"; /* in case argv[0] is not set */
226 pgm = basename(argv[0]); /* Strip off leading path to get debug name */
229 dbopen(DBG_SUBDIR_AMANDAD);
232 error("argv == NULL\n");
236 /* Don't die when child closes pipe */
237 signal(SIGPIPE, SIG_IGN);
239 /* Tell me when a child exits or dies... */
240 act.sa_handler = child_signal;
241 sigemptyset(&act.sa_mask);
243 if(sigaction(SIGCHLD, &act, &oact) != 0) {
244 error("error setting SIGCHLD handler: %s", strerror(errno));
249 dbmalloc_info.start.size = malloc_inuse(&dbmalloc_info.start.hist);
252 erroutput_type = (ERR_INTERACTIVE|ERR_SYSLOG);
255 /* we'd rather not run as root */
256 if (geteuid() == 0) {
257 if(client_uid == (uid_t) -1) {
258 error("error [cannot find user %s in passwd file]\n", CLIENT_LOGIN);
261 initgroups(CLIENT_LOGIN, client_gid);
266 #endif /* FORCE_USERID */
269 * ad-hoc argument parsing
271 * We accept -auth=[authentication type]
277 * We also add a list of services that amandad can launch
280 in = 0; out = 1; /* default to stdin/stdout */
282 for (i = 1; i < argc; i++) {
284 * accept -krb4 as an alias for -auth=krb4 (for compatibility)
286 if (strcmp(argv[i], "-krb4") == 0) {
287 argv[i] = "-auth=krb4";
293 * Get a driver for a security type specified after -auth=
295 else if (strncmp(argv[i], "-auth=", strlen("-auth=")) == 0) {
296 argv[i] += strlen("-auth=");
297 secdrv = security_getdriver(argv[i]);
299 if (secdrv == NULL) {
300 error("no driver for security type '%s'\n", argv[i]);
307 * If -no-exit is specified, always run even after requests have
310 else if (strcmp(argv[i], "-no-exit") == 0) {
317 * Allow us to directly bind to a udp port for debugging.
318 * This may only apply to some security types.
320 else if (strncmp(argv[i], "-udp=", strlen("-udp=")) == 0) {
321 struct sockaddr_in sin;
323 argv[i] += strlen("-udp=");
324 in = out = socket(AF_INET, SOCK_DGRAM, 0);
326 error("can't create dgram socket: %s\n", strerror(errno));
330 r = setsockopt(in, SOL_SOCKET, SO_REUSEADDR,
331 (void *)&on, (socklen_t)sizeof(on));
333 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
339 sin.sin_family = (sa_family_t)AF_INET;
340 sin.sin_addr.s_addr = INADDR_ANY;
341 sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
342 if (bind(in, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
343 error("can't bind to port %d: %s\n", atoi(argv[i]),
349 * Ditto for tcp ports.
351 else if (strncmp(argv[i], "-tcp=", strlen("-tcp=")) == 0) {
352 struct sockaddr_in sin;
356 argv[i] += strlen("-tcp=");
357 sock = socket(AF_INET, SOCK_STREAM, 0);
359 error("can't create tcp socket: %s\n", strerror(errno));
364 r = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
365 (void *)&on, (socklen_t)sizeof(on));
367 dbprintf(("%s: amandad: setsockopt(SO_REUSEADDR) failed: %s\n",
372 setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
373 (void *)&n, (socklen_t)sizeof(n));
374 sin.sin_family = (sa_family_t)AF_INET;
375 sin.sin_addr.s_addr = INADDR_ANY;
376 sin.sin_port = (in_port_t)htons((in_port_t)atoi(argv[i]));
377 if (bind(sock, (struct sockaddr *)&sin, (socklen_t)sizeof(sin)) < 0) {
378 error("can't bind to port %d: %s\n", atoi(argv[i]),
383 n = (socklen_t)sizeof(sin);
384 in = out = accept(sock, (struct sockaddr *)&sin, &n);
388 * It must be a service name
391 /* clear all services */
393 for (j = 0; j < (int)NSERVICES; j++)
394 services[j].active = 0;
398 if(strcmp(argv[i],"amdump") == 0) {
399 services[0].active = 1;
400 services[1].active = 1;
401 services[2].active = 1;
402 services[3].active = 1;
405 for (j = 0; j < (int)NSERVICES; j++)
406 if (strcmp(services[j].name, argv[i]) == 0)
408 if (j == (int)NSERVICES) {
409 dbprintf(("%s: %s: invalid service\n",
410 debug_prefix_time(NULL), argv[i]));
413 services[j].active = 1;
419 * If no security type specified, use BSD
421 if (secdrv == NULL) {
422 secdrv = security_getdriver("BSD");
424 if (secdrv == NULL) {
425 error("no driver for default security type 'BSD'\n");
430 if(strcasecmp(auth, "rsh") == 0 ||
431 strcasecmp(auth, "ssh") == 0 ||
432 strcasecmp(auth, "bsdtcp") == 0) {
441 dbprintf(("%s: version %s\n", get_pname(), version()));
442 for (i = 0; version_info[i] != NULL; i++) {
443 dbprintf(("%s: %s", debug_prefix(NULL), version_info[i]));
446 if (! (argc >= 1 && argv != NULL && argv[0] != NULL)) {
447 dbprintf(("%s: WARNING: argv[0] not defined: check inetd.conf\n",
448 debug_prefix(NULL)));
452 * Schedule to call protocol_accept() when new security handles
453 * are created on stdin.
455 security_accept(secdrv, in, out, protocol_accept);
458 * Schedule an event that will try to exit every 30 seconds if there
459 * are no requests outstanding.
462 (void)event_register((event_id_t)30, EV_TIME, exit_check, &no_exit);
465 * Call event_loop() with an arg of 0, telling it to block until all
466 * events are completed.
477 * This runs periodically and checks to see if we have any active services
478 * still running. If we don't, then we quit.
486 assert(cookie != NULL);
487 no_exit = *(int *)cookie;
490 * If things are still running, then don't exit.
492 if (serviceq.qlength > 0)
496 * If the caller asked us to never exit, then we're done
502 dbmalloc_info.end.size = malloc_inuse(&dbmalloc_info.end.hist);
504 if (dbmalloc_info.start.size != dbmalloc_info.end.size) {
505 malloc_list(dbfd(), dbmalloc_info.start.hist,
506 dbmalloc_info.end.hist);
515 * Handles new incoming protocol handles. This is a callback for
516 * security_accept(), which gets called when new handles are detected.
520 security_handle_t * handle,
524 struct active_service *as;
525 char *pktbody, *tok, *service, *arguments;
526 char *service_path = NULL;
532 * If handle is NULL, then the connection is closed.
539 * If pkt is NULL, then there was a problem with the new connection.
542 dbprintf(("%s: accept error: %s\n",
543 debug_prefix_time(NULL), security_geterror(handle)));
544 pkt_init(&pkt_out, P_NAK, "ERROR %s\n", security_geterror(handle));
545 do_sendpkt(handle, &pkt_out);
546 amfree(pkt_out.body);
547 security_close(handle);
551 dbprintf(("%s: accept recv %s pkt:\n<<<<<\n%s>>>>>\n",
552 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
555 * If this is not a REQ packet, just forget about it.
557 if (pkt->type != P_REQ) {
558 dbprintf(("%s: received unexpected %s packet:\n<<<<<\n%s>>>>>\n\n",
559 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
560 security_close(handle);
564 pktbody = service = arguments = NULL;
568 * Parse out the service and arguments
571 pktbody = stralloc(pkt->body);
573 tok = strtok(pktbody, " ");
576 if (strcmp(tok, "SERVICE") != 0)
579 tok = strtok(NULL, " \n");
582 service = stralloc(tok);
584 /* we call everything else 'arguments' */
585 tok = strtok(NULL, "");
588 arguments = stralloc(tok);
590 /* see if it's one we allow */
591 for (i = 0; i < (int)NSERVICES; i++)
592 if (services[i].active == 1 && strcmp(services[i].name, service) == 0)
594 if (i == (int)NSERVICES) {
595 dbprintf(("%s: %s: invalid service\n",
596 debug_prefix_time(NULL), service));
597 pkt_init(&pkt_out, P_NAK, "ERROR %s: invalid service\n", service);
601 service_path = vstralloc(libexecdir, "/", service, versionsuffix(), NULL);
602 if (access(service_path, X_OK) < 0) {
603 dbprintf(("%s: can't execute %s: %s\n",
604 debug_prefix_time(NULL), service_path, strerror(errno)));
605 pkt_init(&pkt_out, P_NAK,
606 "ERROR execute access to \"%s\" denied\n",
611 /* see if it's already running */
612 for (as = TAILQ_FIRST(&serviceq.tailq); as != NULL;
613 as = TAILQ_NEXT(as, tq)) {
614 if (strcmp(as->cmd, service_path) == 0 &&
615 strcmp(as->arguments, arguments) == 0) {
616 dbprintf(("%s: %s %s: already running, acking req\n",
617 debug_prefix_time(NULL), service, arguments));
618 pkt_init_empty(&pkt_out, P_ACK);
619 goto send_pkt_out_no_delete;
624 * create a new service instance, and send the arguments down
627 dbprintf(("%s: creating new service: %s\n%s\n",
628 debug_prefix_time(NULL), service, arguments));
629 as = service_new(handle, service_path, arguments);
630 if (writebuf(as, arguments, strlen(arguments)) < 0) {
631 const char *errmsg = strerror(errno);
632 dbprintf(("%s: error sending arguments to %s: %s\n",
633 debug_prefix_time(NULL), service, errmsg));
634 pkt_init(&pkt_out, P_NAK, "ERROR error writing arguments to %s: %s\n",
642 amfree(service_path);
646 * Move to the sendack state, and start up the state
649 as->state = s_sendack;
650 state_machine(as, A_START, NULL);
654 pkt_init(&pkt_out, P_NAK, "ERROR invalid REQ\n");
655 dbprintf(("%s: received invalid %s packet:\n<<<<<\n%s>>>>>\n\n",
656 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
661 send_pkt_out_no_delete:
663 amfree(service_path);
666 do_sendpkt(handle, &pkt_out);
667 security_close(handle);
668 amfree(pkt_out.body);
672 * Handles incoming protocol packets. Routes responses to the proper
677 struct active_service * as,
686 dbprintf(("%s: state_machine: %p entering\n",
687 debug_prefix_time(NULL), as));
690 curstate = as->state;
692 dbprintf(("%s: state_machine: %p curstate=%s action=%s\n",
693 debug_prefix_time(NULL), as,
694 state2str(curstate), action2str(action)));
696 retaction = (*curstate)(as, action, pkt);
698 dbprintf(("%s: state_machine: %p curstate=%s returned %s (nextstate=%s)\n",
699 debug_prefix_time(NULL),
700 as, state2str(curstate), action2str(retaction),
701 state2str(as->state)));
706 * State has queued up and is now blocking on input.
710 dbprintf(("%s: state_machine: %p leaving (A_PENDING)\n",
711 debug_prefix_time(NULL), as));
716 * service has switched states. Loop.
722 * state has determined that the packet it received was bogus.
723 * Send a nak, and return.
726 dbprintf(("%s: received unexpected %s packet\n",
727 debug_prefix_time(NULL), pkt_type2str(pkt->type)));
728 dbprintf(("<<<<<\n%s----\n\n", pkt->body));
729 pkt_init(&nak, P_NAK, "ERROR unexpected packet type %s\n",
730 pkt_type2str(pkt->type));
731 do_sendpkt(as->security_handle, &nak);
733 security_recvpkt(as->security_handle, protocol_recv, as, -1);
735 dbprintf(("%s: state_machine: %p leaving (A_SENDNAK)\n",
736 debug_prefix_time(NULL), as));
741 * Service is done. Remove it and finish.
746 dbprintf(("%s: state_machine: %p leaving (A_FINISH)\n",
747 debug_prefix_time(NULL), as));
760 * This state just sends an ack. After that, we move to the repwait
761 * state to wait for REP data to arrive from the subprocess.
765 struct active_service * as,
771 (void)action; /* Quiet unused parameter warning */
772 (void)pkt; /* Quiet unused parameter warning */
774 pkt_init_empty(&ack, P_ACK);
775 if (do_sendpkt(as->security_handle, &ack) < 0) {
776 dbprintf(("%s: error sending ACK: %s\n",
777 debug_prefix_time(NULL), security_geterror(as->security_handle)));
784 * move to the repwait state
785 * Set up a listener for data on the reply fd, but also
786 * listen for packets over the wire, as the server may
787 * poll us if we take a long time.
788 * Set up a timeout that will fire if it takes too long to
791 as->state = s_repwait;
792 as->ev_repfd = event_register((event_id_t)as->repfd, EV_READFD, repfd_recv, as);
793 as->ev_reptimeout = event_register(REP_TIMEOUT, EV_TIME,
795 security_recvpkt(as->security_handle, protocol_recv, as, -1);
800 * This is the repwait state. We have responded to the initial REQ with
801 * an ACK, and we are now waiting for the process we spawned to pass us
802 * data to send in a REP.
806 struct active_service * as,
814 * We normally shouldn't receive any packets while waiting
815 * for our REP data, but in some cases we do.
817 if (action == A_RECVPKT) {
820 * Another req for something that's running. Just send an ACK
821 * and go back and wait for more data.
823 if (pkt->type == P_REQ) {
824 dbprintf(("%s: received dup P_REQ packet, ACKing it\n",
825 debug_prefix_time(NULL)));
826 amfree(as->rep_pkt.body);
827 pkt_init_empty(&as->rep_pkt, P_ACK);
828 do_sendpkt(as->security_handle, &as->rep_pkt);
829 security_recvpkt(as->security_handle, protocol_recv, as, -1);
832 /* something unexpected. Nak it */
836 if (action == A_TIMEOUT) {
837 amfree(as->rep_pkt.body);
838 pkt_init(&as->rep_pkt, P_NAK, "ERROR timeout on reply pipe\n");
839 dbprintf(("%s: %s timed out waiting for REP data\n",
840 debug_prefix_time(NULL), as->cmd));
841 do_sendpkt(as->security_handle, &as->rep_pkt);
845 assert(action == A_RECVREP);
846 if(as->bufsize == 0) {
847 as->bufsize = NETWORK_BLOCK_BYTES;
848 as->repbuf = alloc(as->bufsize);
852 n = read(as->repfd, as->repbuf + as->repbufsize,
853 as->bufsize - as->repbufsize - 1);
854 } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
856 const char *errstr = strerror(errno);
857 dbprintf(("%s: read error on reply pipe: %s\n",
858 debug_prefix_time(NULL), errstr));
859 amfree(as->rep_pkt.body);
860 pkt_init(&as->rep_pkt, P_NAK, "ERROR read error on reply pipe: %s\n",
862 do_sendpkt(as->security_handle, &as->rep_pkt);
866 * If we got some data, go back and wait for more, or EOF. Nul terminate
869 as->repbuf[n + as->repbufsize] = '\0';
872 if(as->repbufsize >= (as->bufsize - 1)) {
874 repbuf_temp = alloc(as->bufsize);
875 memcpy(repbuf_temp, as->repbuf, as->repbufsize + 1);
877 as->repbuf = repbuf_temp;
879 else if(as->send_partial_reply) {
880 amfree(as->rep_pkt.body);
881 pkt_init(&as->rep_pkt, P_PREP, "%s", as->repbuf);
882 do_sendpkt(as->security_handle, &as->rep_pkt);
883 amfree(as->rep_pkt.body);
884 pkt_init_empty(&as->rep_pkt, P_REP);
891 * If we got 0, then we hit EOF. Process the data and release
896 assert(as->ev_repfd != NULL);
897 event_release(as->ev_repfd);
900 assert(as->ev_reptimeout != NULL);
901 event_release(as->ev_reptimeout);
902 as->ev_reptimeout = NULL;
904 as->state = s_processrep;
910 * After we have read in all of the rep data, we process it and send
911 * it out as a REP packet.
915 struct active_service * as,
921 (void)action; /* Quiet unused parameter warning */
922 (void)pkt; /* Quiet unused parameter warning */
925 * Copy the rep lines into the outgoing packet.
927 * If this line is a CONNECT, translate it
928 * Format is "CONNECT <tag> <handle> <tag> <handle> etc..."
931 * CONNECT DATA 4 MESG 5 INDEX 6
933 * The tags are arbitrary. The handles are in the DATA_FD pool.
934 * We need to map these to security streams and pass them back
935 * to the amanda server. If the handle is -1, then we don't map.
937 repbuf = stralloc(as->repbuf);
938 amfree(as->rep_pkt.body);
939 pkt_init_empty(&as->rep_pkt, P_REP);
940 tok = strtok(repbuf, " ");
943 if (strcmp(tok, "CONNECT") == 0) {
944 char *line, *nextbuf;
946 /* Save the entire line */
947 line = strtok(NULL, "\n");
948 /* Save the buf following the line */
949 nextbuf = strtok(NULL, "");
951 if (line == NULL || nextbuf == NULL)
954 pkt_cat(&as->rep_pkt, "CONNECT");
956 /* loop over the id/handle pairs */
959 tok = strtok(line, " ");
960 line = NULL; /* keep working from line */
963 pkt_cat(&as->rep_pkt, " %s", tok);
966 tok = strtok(NULL, " \n");
969 /* convert the handle into something the server can process */
970 pkt_cat(&as->rep_pkt, " %d", allocstream(as, atoi(tok)));
972 pkt_cat(&as->rep_pkt, "\n%s", nextbuf);
975 pkt_cat(&as->rep_pkt, "%s", as->repbuf);
979 * We've set up our REP packet in as->rep_pkt. Now move to the transmission
982 as->state = s_sendrep;
983 as->repretry = MAX_REP_RETRIES;
989 * This is the state where we send the REP we just collected from our child.
993 struct active_service * as,
997 (void)action; /* Quiet unused parameter warning */
998 (void)pkt; /* Quiet unused parameter warning */
1001 * Transmit it and move to the ack state.
1003 do_sendpkt(as->security_handle, &as->rep_pkt);
1004 security_recvpkt(as->security_handle, protocol_recv, as, ACK_TIMEOUT);
1005 as->state = s_ackwait;
1010 * This is the state in which we wait for the server to ACK the REP
1015 struct active_service * as,
1019 struct datafd_handle *dh;
1023 * If we got a timeout, try again, but eventually give up.
1025 if (action == A_TIMEOUT) {
1026 if (--as->repretry > 0) {
1027 as->state = s_sendrep;
1028 return (A_CONTINUE);
1030 dbprintf(("%s: timeout waiting for ACK for our REP\n",
1031 debug_prefix_time(NULL)));
1034 #ifdef AMANDAD_DEBUG
1035 dbprintf(("%s: received ACK, now opening streams\n",
1036 debug_prefix_time(NULL)));
1039 assert(action == A_RECVPKT);
1041 if (pkt->type == P_REQ) {
1042 dbprintf(("%s: received dup P_REQ packet, resending REP\n",
1043 debug_prefix_time(NULL)));
1044 as->state = s_sendrep;
1045 return (A_CONTINUE);
1048 if (pkt->type != P_ACK)
1052 * Got the ack, now open the pipes
1054 for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1055 if (dh->netfd == NULL)
1057 if (security_stream_accept(dh->netfd) < 0) {
1058 dbprintf(("%s: stream %ld accept failed: %s\n",
1059 debug_prefix_time(NULL),
1060 dh - &as->data[0], security_geterror(as->security_handle)));
1061 security_stream_close(dh->netfd);
1064 /* setup an event for reads from it */
1065 dh->ev_read = event_register((event_id_t)dh->fd_read, EV_READFD,
1066 process_readnetfd, dh);
1068 security_stream_read(dh->netfd, process_writenetfd, dh);
1073 * Pipes are open, so auth them. Count them at the same time.
1075 for (npipes = 0, dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1076 if (dh->netfd == NULL)
1078 if (security_stream_auth(dh->netfd) < 0) {
1079 security_stream_close(dh->netfd);
1081 event_release(dh->ev_read);
1082 event_release(dh->ev_write);
1084 dh->ev_write = NULL;
1091 * If no pipes are open, then we're done. Otherwise, just start running.
1092 * The event handlers on all of the pipes will take it from here.
1094 #ifdef AMANDAD_DEBUG
1095 dbprintf(("%s: at end of s_ackwait, npipes is %d\n",
1096 debug_prefix_time(NULL), npipes));
1101 security_close(as->security_handle);
1102 as->security_handle = NULL;
1108 * Called when a repfd has received data
1114 struct active_service *as = cookie;
1117 assert(as->ev_repfd != NULL);
1119 state_machine(as, A_RECVREP, NULL);
1123 * Called when a repfd has timed out
1129 struct active_service *as = cookie;
1132 assert(as->ev_reptimeout != NULL);
1134 state_machine(as, A_TIMEOUT, NULL);
1138 * Called when a handle has received data
1144 security_status_t status)
1146 struct active_service *as = cookie;
1152 dbprintf(("%s: received %s pkt:\n<<<<<\n%s>>>>>\n",
1153 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1154 state_machine(as, A_RECVPKT, pkt);
1157 dbprintf(("%s: timeout\n", debug_prefix_time(NULL)));
1158 state_machine(as, A_TIMEOUT, NULL);
1161 dbprintf(("%s: receive error: %s\n",
1162 debug_prefix_time(NULL), security_geterror(as->security_handle)));
1168 * This is a generic relay function that just reads data from one of
1169 * the process's pipes and passes it up the equivalent security_stream_t
1176 struct datafd_handle *dh = cookie;
1177 struct active_service *as = dh->as;
1183 n = read(dh->fd_read, as->databuf, SIZEOF(as->databuf));
1184 } while ((n < 0) && ((errno == EINTR) || (errno == EAGAIN)));
1190 pkt_init(&nak, P_NAK, "A ERROR data descriptor %d broken: %s\n",
1191 dh->fd_read, strerror(errno));
1195 * Process has closed the pipe. Just remove this event handler.
1196 * If all pipes are closed, shut down this service.
1199 event_release(dh->ev_read);
1201 if(dh->ev_write == NULL) {
1202 security_stream_close(dh->netfd);
1205 for (dh = &as->data[0]; dh < &as->data[DATA_FD_COUNT]; dh++) {
1206 if (dh->netfd != NULL)
1212 if (security_stream_write(dh->netfd, as->databuf, (size_t)n) < 0) {
1213 /* stream has croaked */
1214 pkt_init(&nak, P_NAK, "ERROR write error on stream %d: %s\n",
1215 security_stream_id(dh->netfd),
1216 security_stream_geterror(dh->netfd));
1222 do_sendpkt(as->security_handle, &nak);
1228 * This is a generic relay function that just reads data from one of
1229 * the security_stream_t handles and passes it to the equivalent process's pipe
1237 struct datafd_handle *dh;
1239 assert(cookie != NULL);
1242 if (dh->fd_write <= 0) {
1243 dbprintf(("%s: process_writenetfd: dh->fd_write <= 0\n",
1244 debug_prefix_time(NULL)));
1245 } else if (size > 0) {
1246 fullwrite(dh->fd_write, buf, (size_t)size);
1247 security_stream_read(dh->netfd, process_writenetfd, dh);
1250 aclose(dh->fd_write);
1256 * Convert a local stream handle (DATA_FD...) into something that
1257 * can be sent to the amanda server.
1259 * Returns a number that should be sent to the server in the REP packet.
1263 struct active_service * as,
1266 struct datafd_handle *dh;
1268 /* if the handle is -1, then we don't bother */
1272 /* make sure the handle's kosher */
1273 if (handle < DATA_FD_OFFSET || handle >= DATA_FD_OFFSET + DATA_FD_COUNT)
1276 /* get a pointer into our handle array */
1277 dh = &as->data[handle - DATA_FD_OFFSET];
1279 /* make sure we're not already using the net handle */
1280 if (dh->netfd != NULL)
1283 /* allocate a stream from the security layer and return */
1284 dh->netfd = security_stream_server(as->security_handle);
1285 if (dh->netfd == NULL) {
1286 dbprintf(("%s: couldn't open stream to server: %s\n",
1287 debug_prefix_time(NULL), security_geterror(as->security_handle)));
1292 * convert the stream into a numeric id that can be sent to the
1295 return (security_stream_id(dh->netfd));
1299 * Create a new service instance
1301 static struct active_service *
1303 security_handle_t * security_handle,
1305 const char * arguments)
1308 int data_read[DATA_FD_COUNT + 1][2];
1309 int data_write[DATA_FD_COUNT + 1][2];
1310 struct active_service *as;
1314 assert(security_handle != NULL);
1315 assert(cmd != NULL);
1316 assert(arguments != NULL);
1318 /* a plethora of pipes */
1319 for (i = 0; i < DATA_FD_COUNT + 1; i++) {
1320 if (pipe(data_read[i]) < 0) {
1321 error("pipe: %s\n", strerror(errno));
1324 if (pipe(data_write[i]) < 0) {
1325 error("pipe: %s\n", strerror(errno));
1330 switch(pid = fork()) {
1332 error("could not fork service %s: %s\n", cmd, strerror(errno));
1336 * The parent. Close the far ends of our pipes and return.
1338 as = alloc(SIZEOF(*as));
1339 as->cmd = stralloc(cmd);
1340 as->arguments = stralloc(arguments);
1341 as->security_handle = security_handle;
1344 as->send_partial_reply = 0;
1345 if(strcmp(cmd+(strlen(cmd)-8), "sendsize") == 0) {
1346 g_option_t *g_options;
1347 char *option_str, *p;
1349 option_str = stralloc(as->arguments+8);
1350 p = strchr(option_str,'\n');
1353 g_options = parse_g_options(option_str, 1);
1354 if(am_has_feature(g_options->features, fe_partial_estimate)) {
1355 as->send_partial_reply = 1;
1357 free_g_options(g_options);
1361 /* write to the request pipe */
1362 aclose(data_read[0][0]);
1363 as->reqfd = data_read[0][1];
1366 * read from the reply pipe
1368 as->repfd = data_write[0][0];
1369 aclose(data_write[0][1]);
1370 as->ev_repfd = NULL;
1375 as->rep_pkt.body = NULL;
1378 * read from the rest of the general-use pipes
1379 * (netfds are opened as the client requests them)
1381 for (i = 0; i < DATA_FD_COUNT; i++) {
1382 aclose(data_read[i + 1][1]);
1383 aclose(data_write[i + 1][0]);
1384 as->data[i].fd_read = data_read[i + 1][0];
1385 as->data[i].fd_write = data_write[i + 1][1];
1386 as->data[i].ev_read = NULL;
1387 as->data[i].ev_write = NULL;
1388 as->data[i].netfd = NULL;
1389 as->data[i].as = as;
1392 /* add it to the service queue */
1393 /* increment the active service count */
1394 TAILQ_INSERT_TAIL(&serviceq.tailq, as, tq);
1400 * The child. Put our pipes in their advertised locations
1409 * The data stream is stdin in the new process
1411 if (dup2(data_read[0][0], 0) < 0) {
1412 error("dup %d to %d failed: %s\n", data_read[0][0], 0,
1416 aclose(data_read[0][0]);
1417 aclose(data_read[0][1]);
1420 * The reply stream is stdout
1422 if (dup2(data_write[0][1], 1) < 0) {
1423 error("dup %d to %d failed: %s\n", data_write[0][1], 1,
1426 aclose(data_write[0][0]);
1427 aclose(data_write[0][1]);
1429 for (i = 0; i < DATA_FD_COUNT; i++) {
1430 aclose(data_read[i + 1][0]);
1431 aclose(data_write[i + 1][1]);
1435 * Make sure they are not open in the range DATA_FD_OFFSET to
1436 * DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1
1438 for (i = 0; i < DATA_FD_COUNT; i++) {
1439 while(data_read[i + 1][1] >= DATA_FD_OFFSET &&
1440 data_read[i + 1][1] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1441 newfd = dup(data_read[i + 1][1]);
1443 error("Can't dup out off DATA_FD range");
1444 data_read[i + 1][1] = newfd;
1446 while(data_write[i + 1][0] >= DATA_FD_OFFSET &&
1447 data_write[i + 1][0] <= DATA_FD_OFFSET + DATA_FD_COUNT*2 - 1) {
1448 newfd = dup(data_write[i + 1][0]);
1450 error("Can't dup out off DATA_FD range");
1451 data_write[i + 1][0] = newfd;
1454 for (i = 0; i < DATA_FD_COUNT*2; i++)
1455 close(DATA_FD_OFFSET + i);
1458 * The rest start at the offset defined in amandad.h, and continue
1459 * through the internal defined.
1461 for (i = 0; i < DATA_FD_COUNT; i++) {
1462 if (dup2(data_read[i + 1][1], i*2 + DATA_FD_OFFSET) < 0) {
1463 error("dup %d to %d failed: %s\n", data_read[i + 1][1],
1464 i + DATA_FD_OFFSET, strerror(errno));
1466 aclose(data_read[i + 1][1]);
1468 if (dup2(data_write[i + 1][0], i*2 + 1 + DATA_FD_OFFSET) < 0) {
1469 error("dup %d to %d failed: %s\n", data_write[i + 1][0],
1470 i + DATA_FD_OFFSET, strerror(errno));
1472 aclose(data_write[i + 1][0]);
1475 /* close all unneeded fd */
1476 safe_fd(DATA_FD_OFFSET, DATA_FD_COUNT*2);
1479 execle(cmd, cmd, "amandad", auth, (char *)NULL, safe_env());
1480 error("could not exec service %s: %s\n", cmd, strerror(errno));
1487 * Unallocate a service instance
1491 struct active_service * as)
1494 struct datafd_handle *dh;
1496 #ifdef AMANDAD_DEBUG
1497 dbprintf(("%s: closing service: %s\n",
1498 debug_prefix_time(NULL), (as->cmd)?as->cmd:"??UNKONWN??"));
1503 assert(as->cmd != NULL);
1506 assert(as->arguments != NULL);
1507 amfree(as->arguments);
1509 if (as->reqfd != -1)
1511 if (as->repfd != -1)
1514 if (as->ev_repfd != NULL)
1515 event_release(as->ev_repfd);
1516 if (as->ev_reptimeout != NULL)
1517 event_release(as->ev_reptimeout);
1519 for (i = 0; i < DATA_FD_COUNT; i++) {
1522 aclose(dh->fd_read);
1523 aclose(dh->fd_write);
1525 if (dh->netfd != NULL)
1526 security_stream_close(dh->netfd);
1528 if (dh->ev_read != NULL)
1529 event_release(dh->ev_read);
1530 if (dh->ev_write != NULL)
1531 event_release(dh->ev_write);
1534 if (as->security_handle != NULL)
1535 security_close(as->security_handle);
1537 assert(as->pid > 0);
1538 kill(as->pid, SIGTERM);
1539 waitpid(as->pid, NULL, WNOHANG);
1541 TAILQ_REMOVE(&serviceq.tailq, as, tq);
1542 assert(serviceq.qlength > 0);
1546 amfree(as->arguments);
1548 amfree(as->rep_pkt.body);
1551 if(exit_on_qlength == 0 && serviceq.qlength == 0) {
1558 * Like 'fullwrite', but does the work in a child process so pipelines
1563 struct active_service * as,
1570 switch (pid=fork()) {
1575 waitpid(pid, NULL, WNOHANG);
1576 return 0; /* this is the parent */
1578 case 0: /* this is the child */
1580 writesize = fullwrite(as->reqfd, bufp, size);
1581 exit(writesize != (ssize_t)size);
1589 security_handle_t * handle,
1592 dbprintf(("%s: sending %s pkt:\n<<<<<\n%s>>>>>\n",
1593 debug_prefix_time(NULL), pkt_type2str(pkt->type), pkt->body));
1595 return security_sendpkt(handle, pkt);
1600 #ifdef AMANDAD_DEBUG
1602 * Convert a state into a string
1608 static const struct {
1612 #define X(state) { state, stringize(state) }
1622 for (i = 0; i < (int)(sizeof(states) / sizeof(states[0])); i++)
1623 if (state == states[i].state)
1624 return (states[i].str);
1625 return ("INVALID STATE");
1629 * Convert an action into a string
1635 static const struct {
1639 #define X(action) { action, stringize(action) }
1652 for (i = 0; i < (int)(sizeof(actions) / sizeof(actions[0])); i++)
1653 if (action == actions[i].action)
1654 return (actions[i].str);
1655 return ("UNKNOWN ACTION");
1657 #endif /* AMANDAD_DEBUG */