c24e3290489a4adf35b58150db3e5d6e0548d8da
[debian/amanda] / common-src / rsh-security.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1999 University of Maryland
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26
27 /*
28  * $Id: rsh-security.c,v 1.18.2.1 2006/04/11 11:11:16 martinea Exp $
29  *
30  * rsh-security.c - security and transport over rsh or a rsh-like command.
31  *
32  * XXX still need to check for initial keyword on connect so we can skip
33  * over shell garbage and other stuff that rsh might want to spew out.
34  */
35
36 #include "amanda.h"
37 #include "event.h"
38 #include "packet.h"
39 #include "queue.h"
40 #include "security.h"
41 #include "stream.h"
42 #include "version.h"
43
44 #ifdef RSH_SECURITY
45
46 /*#define       RSH_DEBUG*/
47
48 #ifdef RSH_DEBUG
49 #define rshprintf(x)    dbprintf(x)
50 #else
51 #define rshprintf(x)
52 #endif
53
54 /*
55  * Path to the rsh binary.  This should be configurable.
56  */
57 #define RSH_PATH        "/usr/bin/rsh"
58
59 /*
60  * Arguments to rsh.  This should also be configurable
61  */
62 #define RSH_ARGS        "-l", CLIENT_LOGIN
63
64 /*
65  * Number of seconds rsh has to start up
66  */
67 #define CONNECT_TIMEOUT 20
68
69 /*
70  * Magic values for rsh_conn->handle
71  */
72 #define H_TAKEN -1              /* rsh_conn->tok was already read */
73 #define H_EOF   -2              /* this connection has been shut down */
74
75 /*
76  * This is a rsh connection to a host.  We should only have
77  * one connection per host.
78  */
79 struct rsh_conn {
80     int read, write;                            /* pipes to rsh */
81     pid_t pid;                                  /* pid of rsh process */
82     char pkt[NETWORK_BLOCK_BYTES];              /* last pkt read */
83     unsigned long pktlen;                       /* len of above */
84     struct {                                    /* buffer read() calls */
85         char buf[STREAM_BUFSIZE];               /* buffer */
86         size_t left;                    /* unread data */
87         ssize_t size;                   /* size of last read */
88     } readbuf;
89     event_handle_t *ev_read;                    /* read (EV_READFD) handle */
90     int ev_read_refcnt;                         /* number of readers */
91     char hostname[MAX_HOSTNAME_LENGTH+1];       /* host we're talking to */
92     char *errmsg;                               /* error passed up */
93     int refcnt;                                 /* number of handles using */
94     int handle;                                 /* last proto handle read */
95     TAILQ_ENTRY(rsh_conn) tq;                   /* queue handle */
96 };
97
98
99 struct rsh_stream;
100
101 /*
102  * This is the private handle data.
103  */
104 struct rsh_handle {
105     security_handle_t sech;             /* MUST be first */
106     char *hostname;                     /* ptr to rc->hostname */
107     struct rsh_stream *rs;              /* virtual stream we xmit over */
108
109     union {
110         void (*recvpkt) P((void *, pkt_t *, security_status_t));
111                                         /* func to call when packet recvd */
112         void (*connect) P((void *, security_handle_t *, security_status_t));
113                                         /* func to call when connected */
114     } fn;
115     void *arg;                          /* argument to pass function */
116     event_handle_t *ev_timeout;         /* timeout handle for recv */
117 };
118
119 /*
120  * This is the internal security_stream data for rsh.
121  */
122 struct rsh_stream {
123     security_stream_t secstr;           /* MUST be first */
124     struct rsh_conn *rc;                /* physical connection */
125     int handle;                         /* protocol handle */
126     event_handle_t *ev_read;            /* read (EV_WAIT) event handle */
127     void (*fn) P((void *, void *, ssize_t));    /* read event fn */
128     void *arg;                          /* arg for previous */
129 };
130
131 /*
132  * Interface functions
133  */
134 static int rsh_sendpkt P((void *, pkt_t *));
135 static int rsh_stream_accept P((void *));
136 static int rsh_stream_auth P((void *));
137 static int rsh_stream_id P((void *));
138 static int rsh_stream_write P((void *, const void *, size_t));
139 static void *rsh_stream_client P((void *, int));
140 static void *rsh_stream_server P((void *));
141 static void rsh_accept P((int, int,
142     void (*)(security_handle_t *, pkt_t *)));
143 static void rsh_close P((void *));
144 static void rsh_connect P((const char *,
145     char *(*)(char *, void *), 
146     void (*)(void *, security_handle_t *, security_status_t), void *));
147 static void rsh_recvpkt P((void *,
148     void (*)(void *, pkt_t *, security_status_t), void *, int));
149 static void rsh_recvpkt_cancel P((void *));
150 static void rsh_stream_close P((void *));
151 static void rsh_stream_read P((void *, void (*)(void *, void *, ssize_t),
152     void *));
153 static void rsh_stream_read_cancel P((void *));
154
155 /*
156  * This is our interface to the outside world.
157  */
158 const security_driver_t rsh_security_driver = {
159     "RSH",
160     rsh_connect,
161     rsh_accept,
162     rsh_close,
163     rsh_sendpkt,
164     rsh_recvpkt,
165     rsh_recvpkt_cancel,
166     rsh_stream_server,
167     rsh_stream_accept,
168     rsh_stream_client,
169     rsh_stream_close,
170     rsh_stream_auth,
171     rsh_stream_id,
172     rsh_stream_write,
173     rsh_stream_read,
174     rsh_stream_read_cancel,
175 };
176
177 /*
178  * This is a queue of open connections
179  */
180 static struct {
181     TAILQ_HEAD(, rsh_conn) tailq;
182     int qlength;
183 } connq = {
184     TAILQ_HEAD_INITIALIZER(connq.tailq), 0
185 };
186 #define connq_first()           TAILQ_FIRST(&connq.tailq)
187 #define connq_next(rc)          TAILQ_NEXT(rc, tq)
188 #define connq_append(rc)        do {                                    \
189     TAILQ_INSERT_TAIL(&connq.tailq, rc, tq);                            \
190     connq.qlength++;                                                    \
191 } while (0)
192 #define connq_remove(rc)        do {                                    \
193     assert(connq.qlength > 0);                                          \
194     TAILQ_REMOVE(&connq.tailq, rc, tq);                                 \
195     connq.qlength--;                                                    \
196 } while (0)
197
198 static int newhandle = 1;
199
200 /*
201  * This is a function that should be called if a new security_handle_t is
202  * created.  If NULL, no new handles are created.
203  * It is passed the new handle and the received pkt
204  */
205 static void (*accept_fn) P((security_handle_t *, pkt_t *));
206
207 /*
208  * Local functions
209  */
210 static void connect_callback P((void *));
211 static void connect_timeout P((void *));
212 static int send_token P((struct rsh_conn *, int, const void *, size_t));
213 static int recv_token P((struct rsh_conn *, int));
214 static void recvpkt_callback P((void *, void *, ssize_t));
215 static void recvpkt_timeout P((void *));
216 static void stream_read_callback P((void *));
217
218 static int runrsh P((struct rsh_conn *));
219 static struct rsh_conn *conn_get P((const char *));
220 static void conn_put P((struct rsh_conn *));
221 static void conn_read P((struct rsh_conn *));
222 static void conn_read_cancel P((struct rsh_conn *));
223 static void conn_read_callback P((void *));
224 static int net_writev P((int, struct iovec *, int));
225 static ssize_t net_read P((struct rsh_conn *, void *, size_t, int));
226 static int net_read_fillbuf P((struct rsh_conn *, int, int));
227 static void parse_pkt P((pkt_t *, const void *, size_t));
228
229
230 /*
231  * rsh version of a security handle allocator.  Logically sets
232  * up a network "connection".
233  */
234 static void
235 rsh_connect(hostname, conf_fn, fn, arg)
236     const char *hostname;
237     char *(*conf_fn) P((char *, void *));
238     void (*fn) P((void *, security_handle_t *, security_status_t));
239     void *arg;
240 {
241     struct rsh_handle *rh;
242     struct hostent *he;
243
244     assert(fn != NULL);
245     assert(hostname != NULL);
246
247     rshprintf(("rsh_connect: %s\n", hostname));
248
249     rh = alloc(sizeof(*rh));
250     security_handleinit(&rh->sech, &rsh_security_driver);
251     rh->hostname = NULL;
252     rh->rs = NULL;
253     rh->ev_timeout = NULL;
254
255     if ((he = gethostbyname(hostname)) == NULL) {
256         security_seterror(&rh->sech,
257             "%s: could not resolve hostname", hostname);
258         (*fn)(arg, &rh->sech, S_ERROR);
259         return;
260     }
261     rh->hostname = he->h_name;  /* will be replaced */
262     rh->rs = rsh_stream_client(rh, newhandle++);
263
264     if (rh->rs == NULL)
265         goto error;
266
267     rh->hostname = rh->rs->rc->hostname;
268
269     if (rh->rs->rc->pid < 0) {
270         /*
271          * We need to open a new connection.
272          *
273          * XXX need to eventually limit number of outgoing connections here.
274          */
275         if (runrsh(rh->rs->rc) < 0) {
276             security_seterror(&rh->sech,
277                 "can't connect to %s: %s", hostname, rh->rs->rc->errmsg);
278             goto error;
279         }
280     }
281     /*
282      * The socket will be opened async so hosts that are down won't
283      * block everything.  We need to register a write event
284      * so we will know when the socket comes alive.
285      *
286      * Overload rh->rs->ev_read to provide a write event handle.
287      * We also register a timeout.
288      */
289     rh->fn.connect = fn;
290     rh->arg = arg;
291     rh->rs->ev_read = event_register(rh->rs->rc->write, EV_WRITEFD,
292         connect_callback, rh);
293     rh->ev_timeout = event_register(CONNECT_TIMEOUT, EV_TIME,
294         connect_timeout, rh);
295
296     return;
297
298 error:
299     (*fn)(arg, &rh->sech, S_ERROR);
300 }
301
302 /*
303  * Called when a rsh connection is finished connecting and is ready
304  * to be authenticated.
305  */
306 static void
307 connect_callback(cookie)
308     void *cookie;
309 {
310     struct rsh_handle *rh = cookie;
311
312     event_release(rh->rs->ev_read);
313     rh->rs->ev_read = NULL;
314     event_release(rh->ev_timeout);
315     rh->ev_timeout = NULL;
316
317     (*rh->fn.connect)(rh->arg, &rh->sech, S_OK);
318 }
319
320 /*
321  * Called if a connection times out before completion.
322  */
323 static void
324 connect_timeout(cookie)
325     void *cookie;
326 {
327     struct rsh_handle *rh = cookie;
328
329     event_release(rh->rs->ev_read);
330     rh->rs->ev_read = NULL;
331     event_release(rh->ev_timeout);
332     rh->ev_timeout = NULL;
333
334     (*rh->fn.connect)(rh->arg, &rh->sech, S_TIMEOUT);
335 }
336
337 /*
338  * Setup to handle new incoming connections
339  */
340 static void
341 rsh_accept(in, out, fn)
342     int in, out;
343     void (*fn) P((security_handle_t *, pkt_t *));
344 {
345     struct rsh_conn *rc;
346
347     rc = conn_get("unknown");
348     rc->read = in;
349     rc->write = out;
350     accept_fn = fn;
351     conn_read(rc);
352 }
353
354 /*
355  * Locate an existing connection to the given host, or create a new,
356  * unconnected entry if none exists.  The caller is expected to check
357  * for the lack of a connection (rc->read == -1) and set one up.
358  */
359 static struct rsh_conn *
360 conn_get(hostname)
361     const char *hostname;
362 {
363     struct rsh_conn *rc;
364
365     rshprintf(("rsh: conn_get: %s\n", hostname));
366
367     for (rc = connq_first(); rc != NULL; rc = connq_next(rc)) {
368         if (strcasecmp(hostname, rc->hostname) == 0)
369             break;
370     }
371
372     if (rc != NULL) {
373         rc->refcnt++;
374         rshprintf(("rsh: conn_get: exists, refcnt to %s is now %d\n",
375             rc->hostname, rc->refcnt));
376         return (rc);
377     }
378
379     rshprintf(("rsh: conn_get: creating new handle\n"));
380     /*
381      * We can't be creating a new handle if we are the client
382      */
383     assert(accept_fn == NULL);
384     rc = alloc(sizeof(*rc));
385     rc->read = rc->write = -1;
386     rc->pid = -1;
387     rc->readbuf.left = 0;
388     rc->readbuf.size = 0;
389     rc->ev_read = NULL;
390     strncpy(rc->hostname, hostname, sizeof(rc->hostname) - 1);
391     rc->hostname[sizeof(rc->hostname) - 1] = '\0';
392     rc->errmsg = NULL;
393     rc->refcnt = 1;
394     rc->handle = -1;
395     connq_append(rc);
396     return (rc);
397 }
398
399 /*
400  * Delete a reference to a connection, and close it if it is the last
401  * reference.
402  */
403 static void
404 conn_put(rc)
405     struct rsh_conn *rc;
406 {
407     amwait_t status;
408
409     assert(rc->refcnt > 0);
410     if (--rc->refcnt > 0) {
411         rshprintf(("rsh: conn_put: decrementing refcnt for %s to %d\n",
412             rc->hostname, rc->refcnt));
413         return;
414     }
415     rshprintf(("rsh: conn_put: closing connection to %s\n", rc->hostname));
416     if (rc->read != -1)
417         aclose(rc->read);
418     if (rc->write != -1)
419         aclose(rc->write);
420     if (rc->pid != -1) {
421         waitpid(rc->pid, &status, WNOHANG);
422     }
423     if (rc->ev_read != NULL)
424         event_release(rc->ev_read);
425     if (rc->errmsg != NULL)
426         amfree(rc->errmsg);
427     connq_remove(rc);
428     amfree(rc);
429 }
430
431 /*
432  * Turn on read events for a conn.  Or, increase a refcnt if we are
433  * already receiving read events.
434  */
435 static void
436 conn_read(rc)
437     struct rsh_conn *rc;
438 {
439
440     if (rc->ev_read != NULL) {
441         rc->ev_read_refcnt++;
442         rshprintf(("rsh: conn_read: incremented refcnt to %d for %s\n",
443             rc->ev_read_refcnt, rc->hostname));
444         return;
445     }
446     rshprintf(("rsh: conn_read registering event handler for %s\n",
447         rc->hostname));
448     rc->ev_read = event_register(rc->read, EV_READFD, conn_read_callback, rc);
449     rc->ev_read_refcnt = 1;
450 }
451
452 static void
453 conn_read_cancel(rc)
454     struct rsh_conn *rc;
455 {
456
457     if (--rc->ev_read_refcnt > 0) {
458         rshprintf(("rsh: conn_read_cancel: decremented refcnt to %d for %s\n",
459             rc->ev_read_refcnt, rc->hostname));
460         return;
461     }
462     rshprintf(("rsh: conn_read_cancel: releasing event handler for %s\n",
463         rc->hostname));
464     event_release(rc->ev_read);
465     rc->ev_read = NULL;
466 }
467
468 /*
469  * frees a handle allocated by the above
470  */
471 static void
472 rsh_close(inst)
473     void *inst;
474 {
475     struct rsh_handle *rh = inst;
476
477     assert(rh != NULL);
478
479     rshprintf(("rsh: closing handle to %s\n", rh->hostname));
480
481     if (rh->rs != NULL) {
482         /* This may be null if we get here on an error */
483         rsh_recvpkt_cancel(rh);
484         security_stream_close(&rh->rs->secstr);
485     }
486     /* keep us from getting here again */
487     rh->sech.driver = NULL;
488     amfree(rh);
489 }
490
491 /*
492  * Forks a rsh to the host listed in rc->hostname
493  * Returns negative on error, with an errmsg in rc->errmsg.
494  */
495 static int
496 runrsh(rc)
497     struct rsh_conn *rc;
498 {
499     int rpipe[2], wpipe[2];
500     char *amandad_path;
501
502     if (pipe(rpipe) < 0 || pipe(wpipe) < 0) {
503         rc->errmsg = newvstralloc("pipe: ", strerror(errno), NULL);
504         return (-1);
505     }
506     switch (rc->pid = fork()) {
507     case -1:
508         rc->errmsg = newvstralloc("fork: ", strerror(errno), NULL);
509         aclose(rpipe[0]);
510         aclose(rpipe[1]);
511         aclose(wpipe[0]);
512         aclose(wpipe[1]);
513         return (-1);
514     case 0:
515         dup2(wpipe[0], 0);
516         dup2(rpipe[1], 1);
517         dup2(rpipe[1], 2);
518         break;
519     default:
520         rc->read = rpipe[0];
521         aclose(rpipe[1]);
522         rc->write = wpipe[1];
523         aclose(wpipe[0]);
524         return (0);
525     }
526
527     safe_fd(-1, 0);
528
529     amandad_path = vstralloc(libexecdir, "/", "amandad", versionsuffix(),
530         NULL);
531     execlp(RSH_PATH, RSH_PATH, RSH_ARGS, rc->hostname, amandad_path,
532         "-auth=rsh", NULL);
533     error("error: couldn't exec %s: %s", RSH_PATH, strerror(errno));
534
535     /* should nerver go here, shut up compiler warning */
536     return(-1);
537 }
538
539 /*
540  * Transmit a packet.
541  */
542 static int
543 rsh_sendpkt(cookie, pkt)
544     void *cookie;
545     pkt_t *pkt;
546 {
547     char buf[sizeof(pkt_t)];
548     struct rsh_handle *rh = cookie;
549     size_t len;
550
551     assert(rh != NULL);
552     assert(pkt != NULL);
553
554     rshprintf(("rsh: sendpkt: enter\n"));
555
556     len = strlen(pkt->body) + 2;
557     buf[0] = (char)pkt->type;
558     strcpy(&buf[1], pkt->body);
559
560     rshprintf(("rsh: sendpkt: %s (%d) pkt_t (len %d) contains:\n\n\"%s\"\n\n",
561         pkt_type2str(pkt->type), pkt->type, strlen(pkt->body), pkt->body));
562
563     if (rsh_stream_write(rh->rs, buf, len) < 0) {
564         security_seterror(&rh->sech, security_stream_geterror(&rh->rs->secstr));
565         return (-1);
566     }
567     return (0);
568 }
569
570 /*
571  * Set up to receive a packet asyncronously, and call back when
572  * it has been read.
573  */
574 static void
575 rsh_recvpkt(cookie, fn, arg, timeout)
576     void *cookie, *arg;
577     void (*fn) P((void *, pkt_t *, security_status_t));
578     int timeout;
579 {
580     struct rsh_handle *rh = cookie;
581
582     assert(rh != NULL);
583
584     rshprintf(("rsh: recvpkt registered for %s\n", rh->hostname));
585
586     /*
587      * Reset any pending timeout on this handle
588      */
589     if (rh->ev_timeout != NULL)
590         event_release(rh->ev_timeout);
591
592     /*
593      * Negative timeouts mean no timeout
594      */
595     if (timeout < 0)
596         rh->ev_timeout = NULL;
597     else
598         rh->ev_timeout = event_register(timeout, EV_TIME, recvpkt_timeout, rh);
599
600     rh->fn.recvpkt = fn;
601     rh->arg = arg;
602     rsh_stream_read(rh->rs, recvpkt_callback, rh);
603 }
604
605 /*
606  * Remove a async receive request from the queue
607  */
608 static void
609 rsh_recvpkt_cancel(cookie)
610     void *cookie;
611 {
612     struct rsh_handle *rh = cookie;
613
614     rshprintf(("rsh: cancelling recvpkt for %s\n", rh->hostname));
615
616     assert(rh != NULL);
617
618     rsh_stream_read_cancel(rh->rs);
619     if (rh->ev_timeout != NULL) {
620         event_release(rh->ev_timeout);
621         rh->ev_timeout = NULL;
622     }
623 }
624
625 /*
626  * This is called when a handle is woken up because data read off of the
627  * net is for it.
628  */
629 static void
630 recvpkt_callback(cookie, buf, bufsize)
631     void *cookie, *buf;
632     ssize_t bufsize;
633 {
634     pkt_t pkt;
635     struct rsh_handle *rh = cookie;
636
637     assert(rh != NULL);
638
639     /*
640      * We need to cancel the recvpkt request before calling
641      * the callback because the callback may reschedule us.
642      */
643     rsh_recvpkt_cancel(rh);
644
645     switch (bufsize) {
646     case 0:
647         security_seterror(&rh->sech,
648             "EOF on read from %s", rh->hostname);
649         (*rh->fn.recvpkt)(rh->arg, NULL, S_ERROR);
650         return;
651     case -1:
652         security_seterror(&rh->sech, security_stream_geterror(&rh->rs->secstr));
653         (*rh->fn.recvpkt)(rh->arg, NULL, S_ERROR);
654         return;
655     default:
656         break;
657     }
658
659     parse_pkt(&pkt, buf, bufsize);
660     rshprintf(("rsh: received %s packet (%d) from %s, contains:\n\n\"%s\"\n\n",
661         pkt_type2str(pkt.type), pkt.type, rh->hostname, pkt.body));
662     (*rh->fn.recvpkt)(rh->arg, &pkt, S_OK);
663 }
664
665 /*
666  * This is called when a handle times out before receiving a packet.
667  */
668 static void
669 recvpkt_timeout(cookie)
670     void *cookie;
671 {
672     struct rsh_handle *rh = cookie;
673
674     assert(rh != NULL);
675
676     rshprintf(("rsh: recvpkt timeout for %s\n", rh->hostname));
677
678     rsh_recvpkt_cancel(rh);
679     (*rh->fn.recvpkt)(rh->arg, NULL, S_TIMEOUT);
680 }
681
682 /*
683  * Create the server end of a stream.  For rsh, this means setup a stream
684  * object and allocate a new handle for it.
685  */
686 static void *
687 rsh_stream_server(h)
688     void *h;
689 {
690     struct rsh_handle *rh = h;
691     struct rsh_stream *rs;
692
693     assert(rh != NULL);
694
695     rs = alloc(sizeof(*rs));
696     security_streaminit(&rs->secstr, &rsh_security_driver);
697     rs->rc = conn_get(rh->hostname);
698     /*
699      * Stream should already be setup!
700      */
701     if (rs->rc->read < 0) {
702         conn_put(rs->rc);
703         amfree(rs);
704         security_seterror(&rh->sech, "lost connection to %s", rh->hostname);
705         return (NULL);
706     }
707     rh->hostname = rs->rc->hostname;
708     /*
709      * so as not to conflict with the amanda server's handle numbers,
710      * we start at 5000 and work down
711      */
712     rs->handle = 5000 - newhandle++;
713     rs->ev_read = NULL;
714     rshprintf(("rsh: stream_server: created stream %d\n", rs->handle));
715     return (rs);
716 }
717
718 /*
719  * Accept an incoming connection on a stream_server socket
720  * Nothing needed for rsh.
721  */
722 static int
723 rsh_stream_accept(s)
724     void *s;
725 {
726
727     return (0);
728 }
729
730 /*
731  * Return a connected stream.  For rsh, this means setup a stream
732  * with the supplied handle.
733  */
734 static void *
735 rsh_stream_client(h, id)
736     void *h;
737     int id;
738 {
739     struct rsh_handle *rh = h;
740     struct rsh_stream *rs;
741
742     assert(rh != NULL);
743
744     if (id <= 0) {
745         security_seterror(&rh->sech,
746             "%d: invalid security stream id", id);
747         return (NULL);
748     }
749
750     rs = alloc(sizeof(*rs));
751     security_streaminit(&rs->secstr, &rsh_security_driver);
752     rs->handle = id;
753     rs->ev_read = NULL;
754     rs->rc = conn_get(rh->hostname);
755
756     rshprintf(("rsh: stream_client: connected to stream %d\n", id));
757
758     return (rs);
759 }
760
761 /*
762  * Close and unallocate resources for a stream.
763  */
764 static void
765 rsh_stream_close(s)
766     void *s;
767 {
768     struct rsh_stream *rs = s;
769
770     assert(rs != NULL);
771
772     rshprintf(("rsh: stream_close: closing stream %d\n", rs->handle));
773
774     rsh_stream_read_cancel(rs);
775     conn_put(rs->rc);
776     amfree(rs);
777 }
778
779 /*
780  * Authenticate a stream
781  * Nothing needed for rsh.  The connection is authenticated by rshd
782  * on startup.
783  */
784 static int
785 rsh_stream_auth(s)
786     void *s;
787 {
788
789     return (0);
790 }
791
792 /*
793  * Returns the stream id for this stream.  This is just the local
794  * port.
795  */
796 static int
797 rsh_stream_id(s)
798     void *s;
799 {
800     struct rsh_stream *rs = s;
801
802     assert(rs != NULL);
803
804     return (rs->handle);
805 }
806
807 /*
808  * Write a chunk of data to a stream.  Blocks until completion.
809  */
810 static int
811 rsh_stream_write(s, buf, size)
812     void *s;
813     const void *buf;
814     size_t size;
815 {
816     struct rsh_stream *rs = s;
817
818     assert(rs != NULL);
819
820     rshprintf(("rsh: stream_write: writing %d bytes to %s:%d\n", size,
821         rs->rc->hostname, rs->handle));
822
823     if (send_token(rs->rc, rs->handle, buf, size) < 0) {
824         security_stream_seterror(&rs->secstr, rs->rc->errmsg);
825         return (-1);
826     }
827     return (0);
828 }
829
830 /*
831  * Submit a request to read some data.  Calls back with the given
832  * function and arg when completed.
833  */
834 static void
835 rsh_stream_read(s, fn, arg)
836     void *s, *arg;
837     void (*fn) P((void *, void *, ssize_t));
838 {
839     struct rsh_stream *rs = s;
840
841     assert(rs != NULL);
842
843     /*
844      * Only one read request can be active per stream.
845      */
846     if (rs->ev_read == NULL) {
847         rs->ev_read = event_register((event_id_t)rs->rc, EV_WAIT,
848             stream_read_callback, rs);
849         conn_read(rs->rc);
850     }
851     rs->fn = fn;
852     rs->arg = arg;
853 }
854
855 /*
856  * Cancel a previous stream read request.  It's ok if we didn't have a read
857  * scheduled.
858  */
859 static void
860 rsh_stream_read_cancel(s)
861     void *s;
862 {
863     struct rsh_stream *rs = s;
864
865     assert(rs != NULL);
866
867     if (rs->ev_read != NULL) {
868         event_release(rs->ev_read);
869         rs->ev_read = NULL;
870         conn_read_cancel(rs->rc);
871     }
872 }
873
874 /*
875  * Callback for rsh_stream_read
876  */
877 static void
878 stream_read_callback(arg)
879     void *arg;
880 {
881     struct rsh_stream *rs = arg;
882     assert(rs != NULL);
883
884     rshprintf(("rsh: stream_read_callback: handle %d\n", rs->handle));
885
886     /*
887      * Make sure this was for us.  If it was, then blow away the handle
888      * so it doesn't get claimed twice.  Otherwise, leave it alone.
889      *
890      * If the handle is EOF, pass that up to our callback.
891      */
892     if (rs->rc->handle == rs->handle) {
893         rshprintf(("rsh: stream_read_callback: it was for us\n"));
894         rs->rc->handle = H_TAKEN;
895     } else if (rs->rc->handle != H_EOF) {
896         rshprintf(("rsh: stream_read_callback: not for us\n"));
897         return;
898     }
899
900     /*
901      * Remove the event first, and then call the callback.
902      * We remove it first because we don't want to get in their
903      * way if they reschedule it.
904      */
905     rsh_stream_read_cancel(rs);
906
907     if (rs->rc->pktlen == 0) {
908         rshprintf(("rsh: stream_read_callback: EOF\n"));
909         (*rs->fn)(rs->arg, NULL, 0);
910         return;
911     }
912     rshprintf(("rsh: stream_read_callback: read %ld bytes from %s:%d\n",
913         rs->rc->pktlen, rs->rc->hostname, rs->handle));
914     (*rs->fn)(rs->arg, rs->rc->pkt, rs->rc->pktlen);
915 }
916
917 /*
918  * The callback for the netfd for the event handler
919  * Determines if this packet is for this security handle,
920  * and does the real callback if so.
921  */
922 static void
923 conn_read_callback(cookie)
924     void *cookie;
925 {
926     struct rsh_conn *rc = cookie;
927     struct rsh_handle *rh;
928     pkt_t pkt;
929     int rval;
930
931     assert(cookie != NULL);
932
933     rshprintf(("rsh: conn_read_callback\n"));
934
935     /* Read the data off the wire.  If we get errors, shut down. */
936     rval = recv_token(rc, 60);
937     rshprintf(("rsh: conn_read_callback: recv_token returned %d\n", rval));
938     if (rval <= 0) {
939         rc->pktlen = 0;
940         rc->handle = H_EOF;
941         rval = event_wakeup((event_id_t)rc);
942         rshprintf(("rsh: conn_read_callback: event_wakeup return %d\n", rval));
943         /* delete our 'accept' reference */
944         if (accept_fn != NULL)
945             conn_put(rc);
946         accept_fn = NULL;
947         return;
948     }
949
950     /* If there are events waiting on this handle, we're done */
951     rval = event_wakeup((event_id_t)rc);
952     rshprintf(("rsh: conn_read_callback: event_wakeup return %d\n", rval));
953     if (rval > 0)
954         return;
955
956     /* If there is no accept fn registered, then drop the packet */
957     if (accept_fn == NULL)
958         return;
959
960     rh = alloc(sizeof(*rh));
961     security_handleinit(&rh->sech, &rsh_security_driver);
962     rh->hostname = rc->hostname;
963     rh->rs = rsh_stream_client(rh, rc->handle);
964     rh->ev_timeout = NULL;
965
966     rshprintf(("rsh: new connection\n"));
967     parse_pkt(&pkt, rc->pkt, rc->pktlen);
968     rshprintf(("rsh: calling accept_fn\n"));
969     (*accept_fn)(&rh->sech, &pkt);
970 }
971
972 static void
973 parse_pkt(pkt, buf, bufsize)
974     pkt_t *pkt;
975     const void *buf;
976     size_t bufsize;
977 {
978     const unsigned char *bufp = buf;
979
980     rshprintf(("rsh: parse_pkt: parsing buffer of %d bytes\n", bufsize));
981
982     pkt->type = (pktype_t)*bufp++;
983     bufsize--;
984
985     if (bufsize == 0) {
986         pkt->body[0] = '\0';
987     } else {
988         if (bufsize > sizeof(pkt->body) - 1)
989             bufsize = sizeof(pkt->body) - 1;
990         memcpy(pkt->body, bufp, bufsize);
991         pkt->body[sizeof(pkt->body) - 1] = '\0';
992     }
993
994     rshprintf(("rsh: parse_pkt: %s (%d): \"%s\"\n", pkt_type2str(pkt->type),
995         pkt->type, pkt->body));
996 }
997
998
999 /*
1000  * Transmits a chunk of data over a rsh_handle, adding
1001  * the necessary headers to allow the remote end to decode it.
1002  */
1003 static int
1004 send_token(rc, handle, buf, len)
1005     struct rsh_conn *rc;
1006     int handle;
1007     const void *buf;
1008     size_t len;
1009 {
1010     unsigned int netlength, nethandle;
1011     struct iovec iov[3];
1012
1013     rshprintf(("rsh: send_token: writing %d bytes to %s\n", len,
1014         rc->hostname));
1015
1016     assert(sizeof(netlength) == 4);
1017
1018     /*
1019      * Format is:
1020      *   32 bit length (network byte order)
1021      *   32 bit handle (network byte order)
1022      *   data
1023      */
1024     netlength = htonl(len);
1025     iov[0].iov_base = (void *)&netlength;
1026     iov[0].iov_len = sizeof(netlength);
1027
1028     nethandle = htonl(handle);
1029     iov[1].iov_base = (void *)&nethandle;
1030     iov[1].iov_len = sizeof(nethandle);
1031
1032     iov[2].iov_base = (void *)buf;
1033     iov[2].iov_len = len;
1034
1035     if (net_writev(rc->write, iov, 3) < 0) {
1036         rc->errmsg = newvstralloc(rc->errmsg, "rsh write error to ",
1037             rc->hostname, ": ", strerror(errno), NULL);
1038         return (-1);
1039     }
1040     return (0);
1041 }
1042
1043 static int
1044 recv_token(rc, timeout)
1045     struct rsh_conn *rc;
1046     int timeout;
1047 {
1048     unsigned int netint;
1049
1050     assert(sizeof(netint) == 4);
1051
1052     assert(rc->read >= 0);
1053
1054     rshprintf(("rsh: recv_token: reading from %s\n", rc->hostname));
1055
1056     switch (net_read(rc, &netint, sizeof(netint), timeout)) {
1057     case -1:
1058         rc->errmsg = newvstralloc(rc->errmsg, "recv error: ", strerror(errno),
1059             NULL);
1060         return (-1);
1061     case 0:
1062         rc->pktlen = 0;
1063         return (0);
1064     default:
1065         break;
1066     }
1067     rc->pktlen = ntohl(netint);
1068     if (rc->pktlen > sizeof(rc->pkt)) {
1069         rc->errmsg = newstralloc(rc->errmsg, "recv error: huge packet");
1070         return (-1);
1071     }
1072
1073     switch (net_read(rc, &netint, sizeof(netint), timeout)) {
1074     case -1:
1075         rc->errmsg = newvstralloc(rc->errmsg, "recv error: ", strerror(errno),
1076             NULL);
1077         return (-1);
1078     case 0:
1079         rc->pktlen = 0;
1080         return (0);
1081     default:
1082         break;
1083     }
1084     rc->handle = ntohl(netint);
1085
1086     switch (net_read(rc, rc->pkt, rc->pktlen, timeout)) {
1087     case -1:
1088         rc->errmsg = newvstralloc(rc->errmsg, "recv error: ", strerror(errno),
1089             NULL);
1090         return (-1);
1091     case 0:
1092         rc->pktlen = 0;
1093         break;
1094     default:
1095         break;
1096     }
1097
1098     rshprintf(("rsh: recv_token: read %ld bytes from %s\n", rc->pktlen,
1099         rc->hostname));
1100     return (rc->pktlen);
1101 }
1102
1103 /*
1104  * Writes out the entire iovec
1105  */
1106 static int
1107 net_writev(fd, iov, iovcnt)
1108     int fd, iovcnt;
1109     struct iovec *iov;
1110 {
1111     int delta, n, total;
1112
1113     assert(iov != NULL);
1114
1115     total = 0;
1116     while (iovcnt > 0) {
1117         /*
1118          * Write the iovec
1119          */
1120         total += n = writev(fd, iov, iovcnt);
1121         if (n < 0)
1122             return (-1);
1123         if (n == 0) {
1124             errno = EIO;
1125             return (-1);
1126         }
1127         /*
1128          * Iterate through each iov.  Figure out what we still need
1129          * to write out.
1130          */
1131         for (; n > 0; iovcnt--, iov++) {
1132             /* 'delta' is the bytes written from this iovec */
1133             delta = n < iov->iov_len ? n : iov->iov_len;
1134             /* subtract from the total num bytes written */
1135             n -= delta;
1136             assert(n >= 0);
1137             /* subtract from this iovec */
1138             iov->iov_len -= delta;
1139             iov->iov_base = (char *)iov->iov_base + delta;
1140             /* if this iovec isn't empty, run the writev again */
1141             if (iov->iov_len > 0)
1142                 break;
1143         }
1144     }
1145     return (total);
1146 }
1147
1148 /*
1149  * Like read(), but waits until the entire buffer has been filled.
1150  */
1151 static ssize_t
1152 net_read(rc, vbuf, origsize, timeout)
1153     struct rsh_conn *rc;
1154     void *vbuf;
1155     size_t origsize;
1156     int timeout;
1157 {
1158     char *buf = vbuf, *off;     /* ptr arith */
1159     int nread;
1160     size_t size = origsize;
1161
1162     while (size > 0) {
1163         if (rc->readbuf.left == 0) {
1164             if (net_read_fillbuf(rc, timeout, size) < 0)
1165                 return (-1);
1166             if (rc->readbuf.size == 0)
1167                 return (0);
1168         }
1169         nread = min(rc->readbuf.left, size);
1170         off = rc->readbuf.buf + rc->readbuf.size - rc->readbuf.left;
1171         memcpy(buf, off, nread);
1172
1173         buf += nread;
1174         size -= nread;
1175         rc->readbuf.left -= nread;
1176     }
1177     return ((ssize_t)origsize);
1178 }
1179
1180 /*
1181  * net_read likes to do a lot of little reads.  Buffer it.
1182  */
1183 static int
1184 net_read_fillbuf(rc, timeout, size)
1185     struct rsh_conn *rc;
1186     int timeout;
1187     int size;
1188 {
1189     fd_set readfds;
1190     struct timeval tv;
1191     if(size > sizeof(rc->readbuf.buf)) size = sizeof(rc->readbuf.buf);
1192
1193     FD_ZERO(&readfds);
1194     FD_SET(rc->read, &readfds);
1195     tv.tv_sec = timeout;
1196     tv.tv_usec = 0;
1197     switch (select(rc->read + 1, &readfds, NULL, NULL, &tv)) {
1198     case 0:
1199         errno = ETIMEDOUT;
1200         /* FALLTHROUGH */
1201     case -1:
1202         return (-1);
1203     case 1:
1204         assert(FD_ISSET(rc->read, &readfds));
1205         break;
1206     default:
1207         assert(0);
1208         break;
1209     }
1210     rc->readbuf.left = 0;
1211     rc->readbuf.size = read(rc->read, rc->readbuf.buf, size);
1212     if (rc->readbuf.size < 0)
1213         return (-1);
1214     rc->readbuf.left = rc->readbuf.size;
1215     return (0);
1216 }
1217
1218 #endif  /* RSH_SECURITY */