Imported Upstream version 2.5.1
[debian/amanda] / common-src / protocol.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * All Rights Reserved.
5  *
6  * Permission to use, copy, modify, distribute, and sell this software and its
7  * documentation for any purpose is hereby granted without fee, provided that
8  * the above copyright notice appear in all copies and that both that
9  * copyright notice and this permission notice appear in supporting
10  * documentation, and that the name of U.M. not be used in advertising or
11  * publicity pertaining to distribution of the software without specific,
12  * written prior permission.  U.M. makes no representations about the
13  * suitability of this software for any purpose.  It is provided "as is"
14  * without express or implied warranty.
15  *
16  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
18  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
19  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
20  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
21  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
22  *
23  * Authors: the Amanda Development Team.  Its members are listed in a
24  * file named AUTHORS, in the root directory of this distribution.
25  */
26 /*
27  * $Id: protocol.c,v 1.45 2006/05/25 17:07:31 martinea Exp $
28  *
29  * implements amanda protocol
30  */
31 #include "amanda.h"
32 #include "event.h"
33 #include "packet.h"
34 #include "security.h"
35 #include "protocol.h"
36
37 /*#define       PROTO_DEBUG*/
38
39 /*
40  * Valid actions that can be passed to the state machine
41  */
42 typedef enum {
43         PA_START,
44         PA_TIMEOUT,
45         PA_ERROR,
46         PA_RCVDATA,
47         PA_CONTPEND,
48         PA_PENDING,
49         PA_CONTINUE,
50         PA_FINISH,
51         PA_ABORT
52 } p_action_t;
53
54 /*
55  * The current state type.  States are represented as function
56  * vectors.
57  */
58 struct proto;
59 typedef p_action_t (*pstate_t)(struct proto *, p_action_t, pkt_t *);
60
61 /*
62  * This is a request structure that is wrapped around a packet while it
63  * is being passed through amanda.  It holds the timeouts, state, and handles
64  * for each request.
65  */
66 typedef struct proto {
67     pstate_t state;                     /* current state of the request */
68     char *hostname;                     /* remote host */
69     const security_driver_t *security_driver;   /* for connect retries */
70     security_handle_t *security_handle; /* network stream for this req */
71     time_t timeout;                     /* seconds for this timeout */
72     time_t repwait;                     /* seconds to wait for reply */
73     time_t origtime;                    /* orig start time of this request */
74     time_t curtime;                     /* time when this attempt started */
75     int connecttries;                   /* times we'll retry a connect */
76     int reqtries;                       /* times we'll resend a REQ */
77     int acktries;                       /* times we'll wait for an a ACK */
78     pkt_t req;                          /* the actual wire request */
79     protocol_sendreq_callback continuation; /* call when req dies/finishes */
80     void *datap;                        /* opaque cookie passed to above */
81     char *(*conf_fn)(char *, void *);   /* configuration function */
82 } proto_t;
83
84 #define CONNECT_TRIES   3       /* num retries after connect errors */
85 #define CONNECT_WAIT    5       /* secs between connect attempts */
86 #define ACK_WAIT        10      /* time (secs) to wait for ACK - keep short */
87 #define ACK_TRIES       3       /* num retries after ACK_WAIT timeout */
88 #define REQ_TRIES       2       /* num restarts (reboot/crash) */
89 #define CURTIME (time(0) - proto_init_time) /* time relative to start */
90
91 /* if no reply in an hour, just forget it */
92 #define DROP_DEAD_TIME(t)       (CURTIME - (t) > (60 * 60))
93
94 /* get the size of an array */
95 #define ASIZE(arr)      (int)(sizeof(arr) / sizeof((arr)[0]))
96
97 /*
98  * Initialization time
99  */
100 static time_t proto_init_time;
101
102 /* local functions */
103
104 #ifdef PROTO_DEBUG
105 static const char *action2str(p_action_t);
106 static const char *pstate2str(pstate_t);
107 #endif
108
109 static void connect_callback(void *, security_handle_t *, security_status_t);
110 static void connect_wait_callback(void *);
111 static void recvpkt_callback(void *, pkt_t *, security_status_t);
112
113 static p_action_t s_sendreq(proto_t *, p_action_t, pkt_t *);
114 static p_action_t s_ackwait(proto_t *, p_action_t, pkt_t *);
115 static p_action_t s_repwait(proto_t *, p_action_t, pkt_t *);
116 static void state_machine(proto_t *, p_action_t, pkt_t *);
117
118 /*
119  * -------------------
120  * Interface functions
121  */
122
123 /*
124  * Initialize globals.
125  */
126 void
127 protocol_init(void)
128 {
129
130     proto_init_time = time(NULL);
131 }
132
133 /*
134  * Generate a request packet, and submit it to the state machine
135  * for transmission.
136  */
137 void
138 protocol_sendreq(
139     const char *                hostname,
140     const security_driver_t *   security_driver,
141     char *                      (*conf_fn)(char *, void *),
142     const char *                req,
143     time_t                      repwait,
144     protocol_sendreq_callback   continuation,
145     void *                      datap)
146 {
147     proto_t *p;
148
149     p = alloc(SIZEOF(proto_t));
150     p->state = s_sendreq;
151     p->hostname = stralloc(hostname);
152     p->security_driver = security_driver;
153     /* p->security_handle set in connect_callback */
154     p->repwait = repwait;
155     p->origtime = CURTIME;
156     /* p->curtime set in the sendreq state */
157     p->connecttries = CONNECT_TRIES;
158     p->reqtries = REQ_TRIES;
159     p->acktries = ACK_TRIES;
160     p->conf_fn = conf_fn;
161     pkt_init(&p->req, P_REQ, req);
162
163     /*
164      * These are here for the caller
165      * We call the continuation function after processing is complete.
166      * We pass the datap on through untouched.  It is here so the caller
167      * has a way to keep state with each request.
168      */
169     p->continuation = continuation;
170     p->datap = datap;
171
172 #ifdef PROTO_DEBUG
173     dbprintf(("%s: security_connect: host %s -> p %p\n", 
174               debug_prefix_time(": protocol"), hostname, p));
175 #endif
176
177     security_connect(p->security_driver, p->hostname, conf_fn, connect_callback,
178                          p, p->datap);
179 }
180
181 /*
182  * This is a callback for security_connect.  After the security layer
183  * has initiated a connection to the given host, this will be called
184  * with a security_handle_t.
185  *
186  * On error, the security_status_t arg will reflect errors which can
187  * be had via security_geterror on the handle.
188  */
189 static void
190 connect_callback(
191     void *              cookie,
192     security_handle_t * security_handle,
193     security_status_t   status)
194 {
195     proto_t *p = cookie;
196
197     assert(p != NULL);
198     p->security_handle = security_handle;
199
200 #ifdef PROTO_DEBUG
201     dbprintf(("%s: connect_callback: p %p\n",
202               debug_prefix_time(": protocol"), p));
203 #endif
204
205     switch (status) {
206     case S_OK:
207         state_machine(p, PA_START, NULL);
208         break;
209
210     case S_TIMEOUT:
211         security_seterror(p->security_handle, "timeout during connect");
212         /* FALLTHROUGH */
213
214     case S_ERROR:
215         /*
216          * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
217          * seconds between each attempt.  If they all fail, just return
218          * an error back to the caller.
219          */
220         if (--p->connecttries == 0) {
221             state_machine(p, PA_ABORT, NULL);
222         } else {
223 #ifdef PROTO_DEBUG
224     dbprintf(("%s: connect_callback: p %p: retrying %s\n",
225               debug_prefix_time(": protocol"), p, p->hostname));
226 #endif
227             security_close(p->security_handle);
228             /* XXX overload p->security handle to hold the event handle */
229             p->security_handle =
230                 (security_handle_t *)event_register(CONNECT_WAIT, EV_TIME,
231                 connect_wait_callback, p);
232         }
233         break;
234
235     default:
236         assert(0);
237         break;
238     }
239 }
240
241 /*
242  * This gets called when a host has been put on a wait queue because
243  * initial connection attempts failed.
244  */
245 static void
246 connect_wait_callback(
247     void *      cookie)
248 {
249     proto_t *p = cookie;
250
251     event_release((event_handle_t *)p->security_handle);
252     security_connect(p->security_driver, p->hostname, p->conf_fn,
253         connect_callback, p, p->datap);
254 }
255
256
257 /*
258  * Does a one pass protocol sweep.  Handles any incoming packets that 
259  * are waiting to be processed, and then deals with any pending
260  * requests that have timed out.
261  *
262  * Callers should periodically call this after they have submitted
263  * requests if they plan on doing a lot of work.
264  */
265 void
266 protocol_check(void)
267 {
268
269     /* arg == 1 means don't block */
270     event_loop(1);
271 }
272
273
274 /*
275  * Does an infinite pass protocol sweep.  This doesn't return until all
276  * requests have been satisfied or have timed out.
277  *
278  * Callers should call this after they have finished submitting requests
279  * and are just waiting for all of the answers to come back.
280  */
281 void
282 protocol_run(void)
283 {
284
285     /* arg == 0 means block forever until no more events are left */
286     event_loop(0);
287 }
288
289
290 /*
291  * ------------------
292  * Internal functions
293  */
294
295 /*
296  * The guts of the protocol.  This handles the many paths a request can
297  * make, including retrying the request and acknowledgements, and dealing
298  * with timeouts and successfull replies.
299  */
300 static void
301 state_machine(
302     proto_t *   p,
303     p_action_t  action,
304     pkt_t *     pkt)
305 {
306     pstate_t curstate;
307     p_action_t retaction;
308
309 #ifdef PROTO_DEBUG
310         dbprintf(("%s: state_machine: initial: p %p action %s pkt %p\n",
311                 debug_prefix_time(": protocol"),
312                 p, action2str(action), NULL));
313 #endif
314
315     assert(p != NULL);
316     assert(action == PA_RCVDATA || pkt == NULL);
317     assert(p->state != NULL);
318
319     for (;;) {
320 #ifdef PROTO_DEBUG
321         dbprintf(("%s: state_machine: p %p state %s action %s\n",
322                   debug_prefix_time(": protocol"),
323                   p, pstate2str(p->state), action2str(action)));
324         if (pkt != NULL) {
325             dbprintf(("%s: pkt: %s (t %d) orig REQ (t %d cur %d)\n",
326                       debug_prefix(": protocol"),
327                       pkt_type2str(pkt->type), (int)CURTIME,
328                       (int)p->origtime, (int)p->curtime));
329             dbprintf(("%s: pkt contents:\n-----\n%s-----\n",
330                       debug_prefix(": protocol"), pkt->body));
331         }
332 #endif
333
334         /*
335          * p->state is a function pointer to the current state a request
336          * is in.
337          *
338          * We keep track of the last state we were in so we can make
339          * sure states which return PA_CONTINUE really have transitioned
340          * the request to a new state.
341          */
342         curstate = p->state;
343
344         if (action == PA_ABORT)
345             /*
346              * If the passed action indicates a terminal error, then we
347              * need to move to abort right away.
348              */
349             retaction = PA_ABORT;
350         else
351             /*
352              * Else we run the state and perform the action it
353              * requests.
354              */
355             retaction = (*curstate)(p, action, pkt);
356
357 #ifdef PROTO_DEBUG
358         dbprintf(("%s: state_machine: p %p state %s returned %s\n",
359                   debug_prefix_time(": protocol"),
360                   p, pstate2str(p->state), action2str(retaction)));
361 #endif
362
363         /*
364          * The state function is expected to return one of the following
365          * p_action_t's.
366          */
367         switch (retaction) {
368
369         /*
370          * Request is still waiting for more data off of the network.
371          * Setup to receive another pkt, and wait for the recv event
372          * to occur.
373          */
374         case PA_CONTPEND:
375             (*p->continuation)(p->datap, pkt, p->security_handle);
376             /* FALLTHROUGH */
377
378         case PA_PENDING:
379 #ifdef PROTO_DEBUG
380             dbprintf(("%s: state_machine: p %p state %s: timeout %d\n",
381                       debug_prefix_time(": protocol"),
382                       p, pstate2str(p->state), (int)p->timeout));
383 #endif
384             /*
385              * Get the security layer to register a receive event for this
386              * security handle on our behalf.  Have it timeout in p->timeout
387              * seconds.
388              */
389             security_recvpkt(p->security_handle, recvpkt_callback, p,
390                 (int)p->timeout);
391
392             return;
393
394         /*
395          * Request has moved to another state.  Loop and run it again.
396          */
397         case PA_CONTINUE:
398             assert(p->state != curstate);
399 #ifdef PROTO_DEBUG
400             dbprintf(("%s: state_machine: p %p: moved from %s to %s\n",
401                       debug_prefix_time(": protocol"),
402                       p, pstate2str(curstate),
403                       pstate2str(p->state)));
404 #endif
405             continue;
406
407         /*
408          * Request has failed in some way locally.  The security_handle will
409          * contain an appropriate error message via security_geterror().  Set
410          * pkt to NULL to indicate failure to the callback, and then
411          * fall through to the common finish code.
412          *
413          * Note that remote failures finish via PA_FINISH, because they did
414          * complete successfully locally.
415          */
416         case PA_ABORT:
417             pkt = NULL;
418             /* FALLTHROUGH */
419
420         /*
421          * Request has completed successfully.
422          * Free up resources the request has used, call the continuation
423          * function specified by the caller and quit.
424          */
425         case PA_FINISH:
426             (*p->continuation)(p->datap, pkt, p->security_handle);
427             security_close(p->security_handle);
428             amfree(p->hostname);
429             amfree(p->req.body);
430             amfree(p);
431             return;
432
433         default:
434             assert(0);
435             break;      /* in case asserts are turned off */
436         }
437         /*NOTREACHED*/
438     }
439     /*NOTREACHED*/
440 }
441
442 /*
443  * The request send state.  Here, the packet is actually transmitted
444  * across the network.  After setting up timeouts, the request
445  * moves to the acknowledgement wait state.  We return from the state
446  * machine at this point, and let the request be received from the network.
447  */
448 static p_action_t
449 s_sendreq(
450     proto_t *   p,
451     p_action_t  action,
452     pkt_t *     pkt)
453 {
454
455     assert(p != NULL);
456     (void)action;       /* Quiet unused parameter warning */
457     (void)pkt;          /* Quiet unused parameter warning */
458
459     if (security_sendpkt(p->security_handle, &p->req) < 0) {
460         /* XXX should retry */
461         security_seterror(p->security_handle, "error sending REQ: %s",
462             security_geterror(p->security_handle));
463         return (PA_ABORT);
464     }
465
466     /*
467      * Remember when this request was first sent
468      */
469     p->curtime = CURTIME;
470
471     /*
472      * Move to the ackwait state
473      */
474     p->state = s_ackwait;
475     p->timeout = ACK_WAIT;
476     return (PA_PENDING);
477 }
478
479 /*
480  * The acknowledge wait state.  We can enter here two ways:
481  *
482  *  - the caller has received a packet, located the request for
483  *    that packet, and called us with an action of PA_RCVDATA.
484  *    
485  *  - the caller has determined that a request has timed out,
486  *    and has called us with PA_TIMEOUT.
487  *
488  * Here we process the acknowledgment, which usually means that
489  * the client has agreed to our request and is working on it.
490  * It will later send a reply when finished.
491  */
492 static p_action_t
493 s_ackwait(
494     proto_t *   p,
495     p_action_t  action,
496     pkt_t *     pkt)
497 {
498
499     assert(p != NULL);
500
501     /*
502      * The timeout case.  If our retry count has gone to zero
503      * fail this request.  Otherwise, move to the send state
504      * to retry the request.
505      */
506     if (action == PA_TIMEOUT) {
507         assert(pkt == NULL);
508
509         if (--p->acktries == 0) {
510             security_seterror(p->security_handle, "timeout waiting for ACK");
511             return (PA_ABORT);
512         }
513
514         p->state = s_sendreq;
515         return (PA_CONTINUE);
516     }
517
518     assert(action == PA_RCVDATA);
519     assert(pkt != NULL);
520
521     /*
522      * The packet-received state.  Determine what kind of
523      * packet we received, and act based on the reply type.
524      */
525     switch (pkt->type) {
526
527     /*
528      * Received an ACK.  Everything's good.  The client is
529      * now working on the request.  We queue up again and
530      * wait for the reply.
531      */
532     case P_ACK:
533         p->state = s_repwait;
534         p->timeout = p->repwait;
535         return (PA_PENDING);
536
537     /*
538      * Received a NAK.  The request failed, so free up the
539      * resources associated with it and return.
540      *
541      * This should NOT return PA_ABORT because it is not a local failure.
542      */
543     case P_NAK:
544         return (PA_FINISH);
545
546     /*
547      * The client skipped the ACK, and replied right away.
548      * Move to the reply state to handle it.
549      */
550     case P_REP:
551     case P_PREP:
552         p->state = s_repwait;
553         return (PA_CONTINUE);
554
555     /*
556      * Unexpected packet.  Requeue this request and hope
557      * we get what we want later.
558      */
559     default:
560         return (PA_PENDING);
561     }
562 }
563
564 /*
565  * The reply wait state.  We enter here much like we do with s_ackwait.
566  */
567 static p_action_t
568 s_repwait(
569     proto_t *   p,
570     p_action_t  action,
571     pkt_t *     pkt)
572 {
573     pkt_t ack;
574
575     /*
576      * Timeout waiting for a reply.
577      */
578     if (action == PA_TIMEOUT) {
579         assert(pkt == NULL);
580
581         /*
582          * If we've blown our timeout limit, free up this packet and
583          * return.
584          */
585         if (p->reqtries == 0 || DROP_DEAD_TIME(p->origtime)) {
586             security_seterror(p->security_handle, "timeout waiting for REP");
587             return (PA_ABORT);
588         }
589
590         /*
591          * We still have some tries left.  Resend the request.
592          */
593         p->reqtries--;
594         p->state = s_sendreq;
595         p->acktries = ACK_TRIES;
596         return (PA_CONTINUE);
597     }
598
599     assert(action == PA_RCVDATA);
600
601     /*
602      * We've received some data.  If we didn't get a reply,
603      * requeue the packet and retry.  Otherwise, acknowledge
604      * the reply, cleanup this packet, and return.
605      */
606     if (pkt->type != P_REP && pkt->type != P_PREP)
607         return (PA_PENDING);
608
609     if(pkt->type == P_REP) {
610         pkt_init(&ack, P_ACK, "");
611         if (security_sendpkt(p->security_handle, &ack) < 0) {
612             /* XXX should retry */
613             amfree(ack.body);
614             security_seterror(p->security_handle, "error sending ACK: %s",
615                 security_geterror(p->security_handle));
616             return (PA_ABORT);
617         }
618         amfree(ack.body);
619         return (PA_FINISH);
620     }
621     else if(pkt->type == P_PREP) {
622         p->timeout = p->repwait - CURTIME + p->curtime + 1;
623         return (PA_CONTPEND);
624     }
625
626     /* should never go here, shut up compiler warning */
627     return (PA_FINISH);
628 }
629
630 /*
631  * event callback that receives a packet
632  */
633 static void
634 recvpkt_callback(
635     void *              cookie,
636     pkt_t *             pkt,
637     security_status_t   status)
638 {
639     proto_t *p = cookie;
640
641     assert(p != NULL);
642
643     switch (status) {
644     case S_OK:
645         state_machine(p, PA_RCVDATA, pkt);
646         break;
647     case S_TIMEOUT:
648         state_machine(p, PA_TIMEOUT, NULL);
649         break;
650     case S_ERROR:
651         state_machine(p, PA_ABORT, NULL);
652         break;
653     default:
654         assert(0);
655         break;
656     }
657 }
658
659 /*
660  * --------------
661  * Misc functions
662  */
663
664 #ifdef PROTO_DEBUG
665 /*
666  * Convert a pstate_t into a printable form.
667  */
668 static const char *
669 pstate2str(
670     pstate_t    pstate)
671 {
672     static const struct {
673         pstate_t type;
674         const char name[12];
675     } pstates[] = {
676 #define X(s)    { s, stringize(s) }
677         X(s_sendreq),
678         X(s_ackwait),
679         X(s_repwait),
680 #undef X
681     };
682     int i;
683
684     for (i = 0; i < ASIZE(pstates); i++)
685         if (pstate == pstates[i].type)
686             return (pstates[i].name);
687     return ("BOGUS PSTATE");
688 }
689
690 /*
691  * Convert an p_action_t into a printable form
692  */
693 static const char *
694 action2str(
695     p_action_t  action)
696 {
697     static const struct {
698         p_action_t type;
699         const char name[12];
700     } actions[] = {
701 #define X(s)    { s, stringize(s) }
702         X(PA_START),
703         X(PA_TIMEOUT),
704         X(PA_ERROR),
705         X(PA_RCVDATA),
706         X(PA_CONTPEND),
707         X(PA_PENDING),
708         X(PA_CONTINUE),
709         X(PA_FINISH),
710         X(PA_ABORT),
711 #undef X
712     };
713     int i;
714
715     for (i = 0; i < ASIZE(actions); i++)
716         if (action == actions[i].type)
717             return (actions[i].name);
718     return ("BOGUS ACTION");
719 }
720 #endif  /* PROTO_DEBUG */