4274e70b3a7a50244bbc1be700b4b014417f1cf9
[debian/amanda] / common-src / protocol.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26 /*
27  * $Id: protocol.c,v 1.39 2006/02/28 16:36:13 martinea Exp $
28  *
29  * implements amanda protocol
30  */
31 #include "amanda.h"
32 #include "event.h"
33 #include "packet.h"
34 #include "security.h"
35 #include "protocol.h"
36
37 /*#define       PROTO_DEBUG*/
38
39 /*
40  * Valid actions that can be passed to the state machine
41  */
42 typedef enum {
43     A_START, A_TIMEOUT, A_ERROR, A_RCVDATA, A_CONTPEND, A_PENDING,
44     A_CONTINUE, A_FINISH, A_ABORT
45 } action_t;
46
47 /*
48  * The current state type.  States are represented as function
49  * vectors.
50  */
51 struct proto;
52 typedef action_t (*pstate_t) P((struct proto *, action_t, pkt_t *));
53
54 /*
55  * This is a request structure that is wrapped around a packet while it
56  * is being passed through amanda.  It holds the timeouts, state, and handles
57  * for each request.
58  */
59 typedef struct proto {
60     pstate_t state;                     /* current state of the request */
61     char *hostname;                     /* remote host */
62     const security_driver_t *security_driver;   /* for connect retries */
63     security_handle_t *security_handle; /* network stream for this req */
64     time_t timeout;                     /* seconds for this timeout */
65     time_t repwait;                     /* seconds to wait for reply */
66     time_t origtime;                    /* orig start time of this request */
67     time_t curtime;                     /* time when this attempt started */
68     int connecttries;                   /* times we'll retry a connect */
69     int reqtries;                       /* times we'll resend a REQ */
70     int acktries;                       /* times we'll wait for an a ACK */
71     pkt_t req;                          /* the actual wire request */
72     protocol_sendreq_callback continuation; /* call when req dies/finishes */
73     void *datap;                        /* opaque cookie passed to above */
74     char *(*conf_fn) P((char *, void *));/* configuration function */
75 } proto_t;
76
77 #define CONNECT_TRIES   3       /* num retries after connect errors */
78 #define CONNECT_WAIT    5       /* secs between connect attempts */
79 #define ACK_WAIT        10      /* time (secs) to wait for ACK - keep short */
80 #define ACK_TRIES       3       /* num retries after ACK_WAIT timeout */
81 #define REQ_TRIES       2       /* num restarts (reboot/crash) */
82 #define CURTIME (time(0) - proto_init_time) /* time relative to start */
83
84 /* if no reply in an hour, just forget it */
85 #define DROP_DEAD_TIME(t)       (CURTIME - (t) > (60 * 60))
86
87 /* get the size of an array */
88 #define ASIZE(arr)      (sizeof(arr) / sizeof((arr)[0]))
89
90 /*
91  * Initialization time
92  */
93 static time_t proto_init_time;
94
95 /* local functions */
96
97 #ifdef PROTO_DEBUG
98 static const char *action2str P((action_t));
99 static const char *pstate2str P((pstate_t));
100 #endif
101
102 static void connect_callback P((void *, security_handle_t *,
103     security_status_t));
104 static void connect_wait_callback P((void *));
105 static void recvpkt_callback P((void *, pkt_t *, security_status_t));
106
107 static action_t s_sendreq P((proto_t *, action_t, pkt_t *));
108 static action_t s_ackwait P((proto_t *, action_t, pkt_t *));
109 static action_t s_repwait P((proto_t *, action_t, pkt_t *));
110 static void state_machine P((proto_t *, action_t, pkt_t *));
111
112 /*
113  * -------------------
114  * Interface functions
115  */
116
117 /*
118  * Initialize globals.
119  */
120 void
121 protocol_init()
122 {
123
124     proto_init_time = time(NULL);
125 }
126
127 /*
128  * Generate a request packet, and submit it to the state machine
129  * for transmission.
130  */
131 void
132 protocol_sendreq(hostname, security_driver, conf_fn, req, repwait, continuation, datap)
133     const char *hostname;
134     const security_driver_t *security_driver;
135     char *(*conf_fn) P((char *, void *));
136     const char *req;
137     time_t repwait;
138     protocol_sendreq_callback continuation;
139     void *datap;
140 {
141     proto_t *p;
142
143     p = alloc(sizeof(proto_t));
144     p->state = s_sendreq;
145     p->hostname = stralloc(hostname);
146     p->security_driver = security_driver;
147     /* p->security_handle set in connect_callback */
148     p->repwait = repwait;
149     p->origtime = CURTIME;
150     /* p->curtime set in the sendreq state */
151     p->connecttries = CONNECT_TRIES;
152     p->reqtries = REQ_TRIES;
153     p->acktries = ACK_TRIES;
154     p->conf_fn = conf_fn;
155     pkt_init(&p->req, P_REQ, req);
156
157     /*
158      * These are here for the caller
159      * We call the continuation function after processing is complete.
160      * We pass the datap on through untouched.  It is here so the caller
161      * has a way to keep state with each request.
162      */
163     p->continuation = continuation;
164     p->datap = datap;
165
166 #ifdef PROTO_DEBUG
167     dbprintf(("%s: security_connect: host %s -> p %X\n", 
168               debug_prefix_time(": protocol"), hostname, (int)p));
169 #endif
170
171     security_connect(p->security_driver, p->hostname, conf_fn, connect_callback, p);
172 }
173
174 /*
175  * This is a callback for security_connect.  After the security layer
176  * has initiated a connection to the given host, this will be called
177  * with a security_handle_t.
178  *
179  * On error, the security_status_t arg will reflect errors which can
180  * be had via security_geterror on the handle.
181  */
182 static void
183 connect_callback(cookie, security_handle, status)
184     void *cookie;
185     security_handle_t *security_handle;
186     security_status_t status;
187 {
188     proto_t *p = cookie;
189
190     assert(p != NULL);
191     p->security_handle = security_handle;
192
193 #ifdef PROTO_DEBUG
194     dbprintf(("%s: connect_callback: p %X\n",
195               debug_prefix_time(": protocol"), (int)p));
196 #endif
197
198     switch (status) {
199     case S_OK:
200         state_machine(p, A_START, NULL);
201         break;
202
203     case S_TIMEOUT:
204         security_seterror(p->security_handle, "timeout during connect");
205         /* FALLTHROUGH */
206
207     case S_ERROR:
208         /*
209          * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
210          * seconds between each attempt.  If they all fail, just return
211          * an error back to the caller.
212          */
213         if (--p->connecttries == 0) {
214             state_machine(p, A_ABORT, NULL);
215         } else {
216 #ifdef PROTO_DEBUG
217     dbprintf(("%s: connect_callback: p %X: retrying %s\n",
218               debug_prefix_time(": protocol"), (int)p, p->hostname));
219 #endif
220             security_close(p->security_handle);
221             /* XXX overload p->security handle to hold the event handle */
222             p->security_handle =
223                 (security_handle_t *)event_register(CONNECT_WAIT, EV_TIME,
224                 connect_wait_callback, p);
225         }
226         break;
227
228     default:
229         assert(0);
230         break;
231     }
232 }
233
234 /*
235  * This gets called when a host has been put on a wait queue because
236  * initial connection attempts failed.
237  */
238 static void
239 connect_wait_callback(cookie)
240     void *cookie;
241 {
242     proto_t *p = cookie;
243
244     event_release((event_handle_t *)p->security_handle);
245     security_connect(p->security_driver, p->hostname, p->conf_fn,
246         connect_callback, p);
247 }
248
249
250 /*
251  * Does a one pass protocol sweep.  Handles any incoming packets that 
252  * are waiting to be processed, and then deals with any pending
253  * requests that have timed out.
254  *
255  * Callers should periodically call this after they have submitted
256  * requests if they plan on doing a lot of work.
257  */
258 void
259 protocol_check()
260 {
261
262     /* arg == 1 means don't block */
263     event_loop(1);
264 }
265
266
267 /*
268  * Does an infinite pass protocol sweep.  This doesn't return until all
269  * requests have been satisfied or have timed out.
270  *
271  * Callers should call this after they have finished submitting requests
272  * and are just waiting for all of the answers to come back.
273  */
274 void
275 protocol_run()
276 {
277
278     /* arg == 0 means block forever until no more events are left */
279     event_loop(0);
280 }
281
282
283 /*
284  * ------------------
285  * Internal functions
286  */
287
288 /*
289  * The guts of the protocol.  This handles the many paths a request can
290  * make, including retrying the request and acknowledgements, and dealing
291  * with timeouts and successfull replies.
292  */
293 static void
294 state_machine(p, action, pkt)
295     proto_t *p;
296     action_t action;
297     pkt_t *pkt;
298 {
299     pstate_t curstate;
300     action_t retaction;
301
302 #ifdef PROTO_DEBUG
303         dbprintf(("%s: state_machine: initial: p %X action %s pkt %X\n",
304                 debug_prefix_time(": protocol"),
305                 (int)p, action2str(action), NULL));
306 #endif
307
308     assert(p != NULL);
309     assert(action == A_RCVDATA || pkt == NULL);
310     assert(p->state != NULL);
311
312     for (;;) {
313 #ifdef PROTO_DEBUG
314         dbprintf(("%s: state_machine: p %X state %s action %s\n",
315                   debug_prefix_time(": protocol"),
316                   (int)p, pstate2str(p->state), action2str(action)));
317         if (pkt != NULL) {
318             dbprintf(("%s: pkt: %s (t %d) orig REQ (t %d cur %d)\n",
319                       debug_prefix(": protocol"),
320                       pkt_type2str(pkt->type), (int)CURTIME,
321                       (int)p->origtime, (int)p->curtime));
322             dbprintf(("%s: pkt contents:\n-----\n%s-----\n",
323                       debug_prefix(": protocol"), pkt->body));
324         }
325 #endif
326
327         /*
328          * p->state is a function pointer to the current state a request
329          * is in.
330          *
331          * We keep track of the last state we were in so we can make
332          * sure states which return A_CONTINUE really have transitioned
333          * the request to a new state.
334          */
335         curstate = p->state;
336
337         if (action == A_ABORT)
338             /*
339              * If the passed action indicates a terminal error, then we
340              * need to move to abort right away.
341              */
342             retaction = A_ABORT;
343         else
344             /*
345              * Else we run the state and perform the action it
346              * requests.
347              */
348             retaction = (*curstate)(p, action, pkt);
349
350 #ifdef PROTO_DEBUG
351         dbprintf(("%s: state_machine: p %X state %s returned %s\n",
352                   debug_prefix_time(": protocol"),
353                   (int)p, pstate2str(p->state), action2str(retaction)));
354 #endif
355
356         /*
357          * The state function is expected to return one of the following
358          * action_t's.
359          */
360         switch (retaction) {
361
362         /*
363          * Request is still waiting for more data off of the network.
364          * Setup to receive another pkt, and wait for the recv event
365          * to occur.
366          */
367         case A_CONTPEND:
368             (*p->continuation)(p->datap, pkt, p->security_handle);
369             /* FALLTHROUGH */
370
371         case A_PENDING:
372 #ifdef PROTO_DEBUG
373             dbprintf(("%s: state_machine: p %X state %s: timeout %d\n",
374                       debug_prefix_time(": protocol"),
375                       (int)p, pstate2str(p->state), (int)p->timeout));
376 #endif
377             /*
378              * Get the security layer to register a receive event for this
379              * security handle on our behalf.  Have it timeout in p->timeout
380              * seconds.
381              */
382             security_recvpkt(p->security_handle, recvpkt_callback, p,
383                 p->timeout);
384
385             return;
386
387         /*
388          * Request has moved to another state.  Loop and run it again.
389          */
390         case A_CONTINUE:
391             assert(p->state != curstate);
392 #ifdef PROTO_DEBUG
393             dbprintf(("%s: state_machine: p %X: moved from %s to %s\n",
394                       debug_prefix_time(": protocol"),
395                       (unsigned int)p, pstate2str(curstate),
396                       pstate2str(p->state)));
397 #endif
398             continue;
399
400         /*
401          * Request has failed in some way locally.  The security_handle will
402          * contain an appropriate error message via security_geterror().  Set
403          * pkt to NULL to indicate failure to the callback, and then
404          * fall through to the common finish code.
405          *
406          * Note that remote failures finish via A_FINISH, because they did
407          * complete successfully locally.
408          */
409         case A_ABORT:
410             pkt = NULL;
411             /* FALLTHROUGH */
412
413         /*
414          * Request has completed successfully.
415          * Free up resources the request has used, call the continuation
416          * function specified by the caller and quit.
417          */
418         case A_FINISH:
419             (*p->continuation)(p->datap, pkt, p->security_handle);
420             security_close(p->security_handle);
421             amfree(p->hostname);
422             amfree(p);
423             return;
424
425         default:
426             assert(0);
427             break;      /* in case asserts are turned off */
428         }
429         /* NOTREACHED */
430     }
431     /* NOTREACHED */
432 }
433
434 /*
435  * The request send state.  Here, the packet is actually transmitted
436  * across the network.  After setting up timeouts, the request
437  * moves to the acknowledgement wait state.  We return from the state
438  * machine at this point, and let the request be received from the network.
439  */
440 static action_t
441 s_sendreq(p, action, pkt)
442     proto_t *p;
443     action_t action;
444     pkt_t *pkt;
445 {
446
447     assert(p != NULL);
448
449     if (security_sendpkt(p->security_handle, &p->req) < 0) {
450         /* XXX should retry */
451         security_seterror(p->security_handle, "error sending REQ: %s",
452             security_geterror(p->security_handle));
453         return (A_ABORT);
454     }
455
456     /*
457      * Remember when this request was first sent
458      */
459     p->curtime = CURTIME;
460
461     /*
462      * Move to the ackwait state
463      */
464     p->state = s_ackwait;
465     p->timeout = ACK_WAIT;
466     return (A_PENDING);
467 }
468
469 /*
470  * The acknowledge wait state.  We can enter here two ways:
471  *
472  *  - the caller has received a packet, located the request for
473  *    that packet, and called us with an action of A_RCVDATA.
474  *    
475  *  - the caller has determined that a request has timed out,
476  *    and has called us with A_TIMEOUT.
477  *
478  * Here we process the acknowledgment, which usually means that
479  * the client has agreed to our request and is working on it.
480  * It will later send a reply when finished.
481  */
482 static action_t
483 s_ackwait(p, action, pkt)
484     proto_t *p;
485     action_t action;
486     pkt_t *pkt;
487 {
488
489     assert(p != NULL);
490
491     /*
492      * The timeout case.  If our retry count has gone to zero
493      * fail this request.  Otherwise, move to the send state
494      * to retry the request.
495      */
496     if (action == A_TIMEOUT) {
497         assert(pkt == NULL);
498
499         if (--p->acktries == 0) {
500             security_seterror(p->security_handle, "timeout waiting for ACK");
501             return (A_ABORT);
502         }
503
504         p->state = s_sendreq;
505         return (A_CONTINUE);
506     }
507
508     assert(action == A_RCVDATA);
509     assert(pkt != NULL);
510
511     /*
512      * The packet-received state.  Determine what kind of
513      * packet we received, and act based on the reply type.
514      */
515     switch (pkt->type) {
516
517     /*
518      * Received an ACK.  Everything's good.  The client is
519      * now working on the request.  We queue up again and
520      * wait for the reply.
521      */
522     case P_ACK:
523         p->state = s_repwait;
524         p->timeout = p->repwait;
525         return (A_PENDING);
526
527     /*
528      * Received a NAK.  The request failed, so free up the
529      * resources associated with it and return.
530      *
531      * This should NOT return A_ABORT because it is not a local failure.
532      */
533     case P_NAK:
534         return (A_FINISH);
535
536     /*
537      * The client skipped the ACK, and replied right away.
538      * Move to the reply state to handle it.
539      */
540     case P_REP:
541     case P_PREP:
542         p->state = s_repwait;
543         return (A_CONTINUE);
544
545     /*
546      * Unexpected packet.  Requeue this request and hope
547      * we get what we want later.
548      */
549     default:
550         return (A_PENDING);
551     }
552 }
553
554 /*
555  * The reply wait state.  We enter here much like we do with s_ackwait.
556  */
557 static action_t
558 s_repwait(p, action, pkt)
559     proto_t *p;
560     action_t action;
561     pkt_t *pkt;
562 {
563     pkt_t ack;
564
565     /*
566      * Timeout waiting for a reply.
567      */
568     if (action == A_TIMEOUT) {
569         assert(pkt == NULL);
570
571         /*
572          * If we've blown our timeout limit, free up this packet and
573          * return.
574          */
575         if (p->reqtries == 0 || DROP_DEAD_TIME(p->origtime)) {
576             security_seterror(p->security_handle, "timeout waiting for REP");
577             return (A_ABORT);
578         }
579
580         /*
581          * We still have some tries left.  Resend the request.
582          */
583         p->reqtries--;
584         p->state = s_sendreq;
585         p->acktries = ACK_TRIES;
586         return (A_CONTINUE);
587     }
588
589     assert(action == A_RCVDATA);
590
591     /*
592      * We've received some data.  If we didn't get a reply,
593      * requeue the packet and retry.  Otherwise, acknowledge
594      * the reply, cleanup this packet, and return.
595      */
596     if (pkt->type != P_REP && pkt->type != P_PREP)
597         return (A_PENDING);
598
599     if(pkt->type == P_REP) {
600         pkt_init(&ack, P_ACK, "");
601         if (security_sendpkt(p->security_handle, &ack) < 0) {
602             /* XXX should retry */
603             security_seterror(p->security_handle, "error sending ACK: %s",
604                 security_geterror(p->security_handle));
605             return (A_ABORT);
606         }
607         return (A_FINISH);
608     }
609     else if(pkt->type == P_PREP) {
610         p->timeout = p->repwait - CURTIME + p->curtime + 1;
611         return (A_CONTPEND);
612     }
613
614     /* should never go here, shut up compiler warning */
615     return (A_FINISH);
616 }
617
618 /*
619  * event callback that receives a packet
620  */
621 static void
622 recvpkt_callback(cookie, pkt, status)
623     void *cookie;
624     pkt_t *pkt;
625     security_status_t status;
626 {
627     proto_t *p = cookie;
628
629     assert(p != NULL);
630
631     switch (status) {
632     case S_OK:
633         state_machine(p, A_RCVDATA, pkt);
634         break;
635     case S_TIMEOUT:
636         state_machine(p, A_TIMEOUT, NULL);
637         break;
638     case S_ERROR:
639         state_machine(p, A_ABORT, NULL);
640         break;
641     default:
642         assert(0);
643         break;
644     }
645 }
646
647 /*
648  * --------------
649  * Misc functions
650  */
651
652 #ifdef PROTO_DEBUG
653 /*
654  * Convert a pstate_t into a printable form.
655  */
656 static const char *
657 pstate2str(pstate)
658     pstate_t pstate;
659 {
660     static const struct {
661         pstate_t type;
662         const char name[12];
663     } pstates[] = {
664 #define X(s)    { s, stringize(s) }
665         X(s_sendreq),
666         X(s_ackwait),
667         X(s_repwait),
668 #undef X
669     };
670     int i;
671
672     for (i = 0; i < ASIZE(pstates); i++)
673         if (pstate == pstates[i].type)
674             return (pstates[i].name);
675     return ("BOGUS PSTATE");
676 }
677
678 /*
679  * Convert an action_t into a printable form
680  */
681 static const char *
682 action2str(action)
683     action_t action;
684 {
685     static const struct {
686         action_t type;
687         const char name[12];
688     } actions[] = {
689 #define X(s)    { s, stringize(s) }
690         X(A_START),
691         X(A_TIMEOUT),
692         X(A_ERROR),
693         X(A_RCVDATA),
694         X(A_CONTPEND),
695         X(A_PENDING),
696         X(A_CONTINUE),
697         X(A_FINISH),
698         X(A_ABORT),
699 #undef X
700     };
701     int i;
702
703     for (i = 0; i < ASIZE(actions); i++)
704         if (action == actions[i].type)
705             return (actions[i].name);
706     return ("BOGUS ACTION");
707 }
708 #endif  /* PROTO_DEBUG */