Imported Upstream version 3.3.3
[debian/amanda] / common-src / protocol.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 1991-1999 University of Maryland at College Park
4  * Copyright (c) 2007-2012 Zmanda, Inc.  All Rights Reserved.
5  * All Rights Reserved.
6  *
7  * Permission to use, copy, modify, distribute, and sell this software and its
8  * documentation for any purpose is hereby granted without fee, provided that
9  * the above copyright notice appear in all copies and that both that
10  * copyright notice and this permission notice appear in supporting
11  * documentation, and that the name of U.M. not be used in advertising or
12  * publicity pertaining to distribution of the software without specific,
13  * written prior permission.  U.M. makes no representations about the
14  * suitability of this software for any purpose.  It is provided "as is"
15  * without express or implied warranty.
16  *
17  * U.M. DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING ALL
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN NO EVENT SHALL U.M.
19  * BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
20  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
21  * OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
22  * CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
23  *
24  * Authors: the Amanda Development Team.  Its members are listed in a
25  * file named AUTHORS, in the root directory of this distribution.
26  */
27 /*
28  * $Id: protocol.c,v 1.45 2006/05/25 17:07:31 martinea Exp $
29  *
30  * implements amanda protocol
31  */
32 #include "amanda.h"
33 #include "conffile.h"
34 #include "event.h"
35 #include "packet.h"
36 #include "security.h"
37 #include "protocol.h"
38
39 #define proto_debug(i, ...) do {        \
40        if ((i) <= debug_protocol) {     \
41            dbprintf(__VA_ARGS__);       \
42        }                                \
43 } while (0)
44
45 /*
46  * Valid actions that can be passed to the state machine
47  */
48 typedef enum {
49         PA_START,
50         PA_TIMEOUT,
51         PA_ERROR,
52         PA_RCVDATA,
53         PA_CONTPEND,
54         PA_PENDING,
55         PA_CONTINUE,
56         PA_FINISH,
57         PA_ABORT
58 } p_action_t;
59
60 /*
61  * The current state type.  States are represented as function
62  * vectors.
63  */
64 struct proto;
65 typedef p_action_t (*pstate_t)(struct proto *, p_action_t, pkt_t *);
66
67 /*
68  * This is a request structure that is wrapped around a packet while it
69  * is being passed through amanda.  It holds the timeouts, state, and handles
70  * for each request.
71  */
72 typedef struct proto {
73     pstate_t state;                     /* current state of the request */
74     char *hostname;                     /* remote host */
75     const security_driver_t *security_driver;   /* for connect retries */
76     security_handle_t *security_handle; /* network stream for this req */
77     time_t timeout;                     /* seconds for this timeout */
78     time_t repwait;                     /* seconds to wait for reply */
79     time_t origtime;                    /* orig start time of this request */
80     time_t curtime;                     /* time when this attempt started */
81     int connecttries;                   /* times we'll retry a connect */
82     int resettries;                     /* times we'll resend a REQ */
83     int reqtries;                       /* times we'll wait for an a ACK */
84     pkt_t req;                          /* the actual wire request */
85     protocol_sendreq_callback continuation; /* call when req dies/finishes */
86     void *datap;                        /* opaque cookie passed to above */
87     char *(*conf_fn)(char *, void *);   /* configuration function */
88 } proto_t;
89
90 #define CONNECT_WAIT    5       /* secs between connect attempts */
91 #define ACK_WAIT        10      /* time (secs) to wait for ACK - keep short */
92 #define RESET_TRIES     2       /* num restarts (reboot/crash) */
93 #define CURTIME (time(0) - proto_init_time) /* time relative to start */
94
95 /* if no reply in an hour, just forget it */
96 #define DROP_DEAD_TIME(t)       (CURTIME - (t) > (60 * 60))
97
98 /* get the size of an array */
99 #define ASIZE(arr)      (int)(sizeof(arr) / sizeof((arr)[0]))
100
101 /*
102  * Initialization time
103  */
104 static time_t proto_init_time;
105
106 /* local functions */
107
108 static const char *action2str(p_action_t);
109 static const char *pstate2str(pstate_t);
110
111 static void connect_callback(void *, security_handle_t *, security_status_t);
112 static void connect_wait_callback(void *);
113 static void recvpkt_callback(void *, pkt_t *, security_status_t);
114
115 static p_action_t s_sendreq(proto_t *, p_action_t, pkt_t *);
116 static p_action_t s_ackwait(proto_t *, p_action_t, pkt_t *);
117 static p_action_t s_repwait(proto_t *, p_action_t, pkt_t *);
118 static void state_machine(proto_t *, p_action_t, pkt_t *);
119
120 /*
121  * -------------------
122  * Interface functions
123  */
124
125 /*
126  * Initialize globals.
127  */
128 void
129 protocol_init(void)
130 {
131
132     proto_init_time = time(NULL);
133 }
134
135 /*
136  * Generate a request packet, and submit it to the state machine
137  * for transmission.
138  */
139 void
140 protocol_sendreq(
141     const char *                hostname,
142     const security_driver_t *   security_driver,
143     char *                      (*conf_fn)(char *, void *),
144     const char *                req,
145     time_t                      repwait,
146     protocol_sendreq_callback   continuation,
147     void *                      datap)
148 {
149     proto_t *p;
150
151     p = alloc(SIZEOF(proto_t));
152     p->state = s_sendreq;
153     p->hostname = stralloc(hostname);
154     p->security_driver = security_driver;
155     /* p->security_handle set in connect_callback */
156     p->repwait = repwait;
157     p->origtime = CURTIME;
158     /* p->curtime set in the sendreq state */
159     p->connecttries = getconf_int(CNF_CONNECT_TRIES);
160     p->resettries = RESET_TRIES;
161     p->reqtries = getconf_int(CNF_REQ_TRIES);
162     p->conf_fn = conf_fn;
163     pkt_init(&p->req, P_REQ, "%s", req);
164
165     /*
166      * These are here for the caller
167      * We call the continuation function after processing is complete.
168      * We pass the datap on through untouched.  It is here so the caller
169      * has a way to keep state with each request.
170      */
171     p->continuation = continuation;
172     p->datap = datap;
173
174     proto_debug(1, _("protocol: security_connect: host %s -> p %p\n"), 
175                     hostname, p);
176
177     security_connect(p->security_driver, p->hostname, conf_fn, connect_callback,
178                          p, p->datap);
179 }
180
181 /*
182  * This is a callback for security_connect.  After the security layer
183  * has initiated a connection to the given host, this will be called
184  * with a security_handle_t.
185  *
186  * On error, the security_status_t arg will reflect errors which can
187  * be had via security_geterror on the handle.
188  */
189 static void
190 connect_callback(
191     void *              cookie,
192     security_handle_t * security_handle,
193     security_status_t   status)
194 {
195     proto_t *p = cookie;
196
197     assert(p != NULL);
198     p->security_handle = security_handle;
199
200     proto_debug(1, _("protocol: connect_callback: p %p\n"), p);
201
202     switch (status) {
203     case S_OK:
204         state_machine(p, PA_START, NULL);
205         break;
206
207     case S_TIMEOUT:
208         security_seterror(p->security_handle, _("timeout during connect"));
209         /* FALLTHROUGH */
210
211     case S_ERROR:
212         /*
213          * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
214          * seconds between each attempt.  If they all fail, just return
215          * an error back to the caller.
216          */
217         if (--p->connecttries == 0) {
218             state_machine(p, PA_ABORT, NULL);
219         } else {
220             proto_debug(1, _("protocol: connect_callback: p %p: retrying %s\n"),
221                             p, p->hostname);
222             security_close(p->security_handle);
223             /* XXX overload p->security handle to hold the event handle */
224             p->security_handle =
225                 (security_handle_t *)event_register(CONNECT_WAIT, EV_TIME,
226                 connect_wait_callback, p);
227         }
228         break;
229
230     default:
231         assert(0);
232         break;
233     }
234 }
235
236 /*
237  * This gets called when a host has been put on a wait queue because
238  * initial connection attempts failed.
239  */
240 static void
241 connect_wait_callback(
242     void *      cookie)
243 {
244     proto_t *p = cookie;
245
246     event_release((event_handle_t *)p->security_handle);
247     security_connect(p->security_driver, p->hostname, p->conf_fn,
248         connect_callback, p, p->datap);
249 }
250
251
252 /*
253  * Does a one pass protocol sweep.  Handles any incoming packets that 
254  * are waiting to be processed, and then deals with any pending
255  * requests that have timed out.
256  *
257  * Callers should periodically call this after they have submitted
258  * requests if they plan on doing a lot of work.
259  */
260 void
261 protocol_check(void)
262 {
263
264     /* arg == 1 means don't block */
265     event_loop(1);
266 }
267
268
269 /*
270  * Does an infinite pass protocol sweep.  This doesn't return until all
271  * requests have been satisfied or have timed out.
272  *
273  * Callers should call this after they have finished submitting requests
274  * and are just waiting for all of the answers to come back.
275  */
276 void
277 protocol_run(void)
278 {
279
280     /* arg == 0 means block forever until no more events are left */
281     event_loop(0);
282 }
283
284
285 /*
286  * ------------------
287  * Internal functions
288  */
289
290 /*
291  * The guts of the protocol.  This handles the many paths a request can
292  * make, including retrying the request and acknowledgements, and dealing
293  * with timeouts and successfull replies.
294  */
295 static void
296 state_machine(
297     proto_t *   p,
298     p_action_t  action,
299     pkt_t *     pkt)
300 {
301     pstate_t curstate;
302     p_action_t retaction;
303
304     proto_debug(1, _("protocol: state_machine: initial: p %p action %s pkt %p\n"),
305                     p, action2str(action), (void *)NULL);
306
307     assert(p != NULL);
308     assert(action == PA_RCVDATA || pkt == NULL);
309     assert(p->state != NULL);
310
311     for (;;) {
312         proto_debug(1, _("protocol: state_machine: p %p state %s action %s\n"),
313                         p, pstate2str(p->state), action2str(action));
314         if (pkt != NULL) {
315             proto_debug(1, _("protocol: pkt: %s (t %d) orig REQ (t %d cur %d)\n"),
316                             pkt_type2str(pkt->type), (int)CURTIME,
317                             (int)p->origtime, (int)p->curtime);
318             proto_debug(1, _("protocol: pkt contents:\n-----\n%s-----\n"),
319                             pkt->body);
320         }
321
322         /*
323          * p->state is a function pointer to the current state a request
324          * is in.
325          *
326          * We keep track of the last state we were in so we can make
327          * sure states which return PA_CONTINUE really have transitioned
328          * the request to a new state.
329          */
330         curstate = p->state;
331
332         if (action == PA_ABORT)
333             /*
334              * If the passed action indicates a terminal error, then we
335              * need to move to abort right away.
336              */
337             retaction = PA_ABORT;
338         else
339             /*
340              * Else we run the state and perform the action it
341              * requests.
342              */
343             retaction = (*curstate)(p, action, pkt);
344
345         proto_debug(1, _("protocol: state_machine: p %p state %s returned %s\n"),
346                         p, pstate2str(p->state), action2str(retaction));
347
348         /*
349          * The state function is expected to return one of the following
350          * p_action_t's.
351          */
352         switch (retaction) {
353
354         /*
355          * Request is still waiting for more data off of the network.
356          * Setup to receive another pkt, and wait for the recv event
357          * to occur.
358          */
359         case PA_CONTPEND:
360             (*p->continuation)(p->datap, pkt, p->security_handle);
361             /* FALLTHROUGH */
362
363         case PA_PENDING:
364             proto_debug(1, _("protocol: state_machine: p %p state %s: timeout %d\n"),
365                             p, pstate2str(p->state), (int)p->timeout);
366             /*
367              * Get the security layer to register a receive event for this
368              * security handle on our behalf.  Have it timeout in p->timeout
369              * seconds.
370              */
371             security_recvpkt(p->security_handle, recvpkt_callback, p,
372                 (int)p->timeout);
373
374             return;
375
376         /*
377          * Request has moved to another state.  Loop and run it again.
378          */
379         case PA_CONTINUE:
380             assert(p->state != curstate);
381             proto_debug(1, _("protocol: state_machine: p %p: moved from %s to %s\n"),
382                             p, pstate2str(curstate),
383                             pstate2str(p->state));
384             continue;
385
386         /*
387          * Request has failed in some way locally.  The security_handle will
388          * contain an appropriate error message via security_geterror().  Set
389          * pkt to NULL to indicate failure to the callback, and then
390          * fall through to the common finish code.
391          *
392          * Note that remote failures finish via PA_FINISH, because they did
393          * complete successfully locally.
394          */
395         case PA_ABORT:
396             pkt = NULL;
397             /* FALLTHROUGH */
398
399         /*
400          * Request has completed successfully.
401          * Free up resources the request has used, call the continuation
402          * function specified by the caller and quit.
403          */
404         case PA_FINISH:
405             (*p->continuation)(p->datap, pkt, p->security_handle);
406             security_close(p->security_handle);
407             amfree(p->hostname);
408             amfree(p->req.body);
409             amfree(p);
410             return;
411
412         default:
413             assert(0);
414             break;      /* in case asserts are turned off */
415         }
416         /*NOTREACHED*/
417     }
418     /*NOTREACHED*/
419 }
420
421 /*
422  * The request send state.  Here, the packet is actually transmitted
423  * across the network.  After setting up timeouts, the request
424  * moves to the acknowledgement wait state.  We return from the state
425  * machine at this point, and let the request be received from the network.
426  */
427 static p_action_t
428 s_sendreq(
429     proto_t *   p,
430     p_action_t  action,
431     pkt_t *     pkt)
432 {
433
434     assert(p != NULL);
435     (void)action;       /* Quiet unused parameter warning */
436     (void)pkt;          /* Quiet unused parameter warning */
437
438     if (security_sendpkt(p->security_handle, &p->req) < 0) {
439         /* XXX should retry */
440         security_seterror(p->security_handle, _("error sending REQ: %s"),
441             security_geterror(p->security_handle));
442         return (PA_ABORT);
443     }
444
445     /*
446      * Remember when this request was first sent
447      */
448     p->curtime = CURTIME;
449
450     /*
451      * Move to the ackwait state
452      */
453     p->state = s_ackwait;
454     p->timeout = ACK_WAIT;
455     return (PA_PENDING);
456 }
457
458 /*
459  * The acknowledge wait state.  We can enter here two ways:
460  *
461  *  - the caller has received a packet, located the request for
462  *    that packet, and called us with an action of PA_RCVDATA.
463  *    
464  *  - the caller has determined that a request has timed out,
465  *    and has called us with PA_TIMEOUT.
466  *
467  * Here we process the acknowledgment, which usually means that
468  * the client has agreed to our request and is working on it.
469  * It will later send a reply when finished.
470  */
471 static p_action_t
472 s_ackwait(
473     proto_t *   p,
474     p_action_t  action,
475     pkt_t *     pkt)
476 {
477
478     assert(p != NULL);
479
480     /*
481      * The timeout case.  If our retry count has gone to zero
482      * fail this request.  Otherwise, move to the send state
483      * to retry the request.
484      */
485     if (action == PA_TIMEOUT) {
486         assert(pkt == NULL);
487
488         if (--p->reqtries == 0) {
489             security_seterror(p->security_handle, _("timeout waiting for ACK"));
490             return (PA_ABORT);
491         }
492
493         p->state = s_sendreq;
494         return (PA_CONTINUE);
495     }
496
497     assert(action == PA_RCVDATA);
498     assert(pkt != NULL);
499
500     /*
501      * The packet-received state.  Determine what kind of
502      * packet we received, and act based on the reply type.
503      */
504     switch (pkt->type) {
505
506     /*
507      * Received an ACK.  Everything's good.  The client is
508      * now working on the request.  We queue up again and
509      * wait for the reply.
510      */
511     case P_ACK:
512         p->state = s_repwait;
513         p->timeout = p->repwait;
514         return (PA_PENDING);
515
516     /*
517      * Received a NAK.  The request failed, so free up the
518      * resources associated with it and return.
519      *
520      * This should NOT return PA_ABORT because it is not a local failure.
521      */
522     case P_NAK:
523         return (PA_FINISH);
524
525     /*
526      * The client skipped the ACK, and replied right away.
527      * Move to the reply state to handle it.
528      */
529     case P_REP:
530     case P_PREP:
531         p->state = s_repwait;
532         return (PA_CONTINUE);
533
534     /*
535      * Unexpected packet.  Requeue this request and hope
536      * we get what we want later.
537      */
538     default:
539         return (PA_PENDING);
540     }
541 }
542
543 /*
544  * The reply wait state.  We enter here much like we do with s_ackwait.
545  */
546 static p_action_t
547 s_repwait(
548     proto_t *   p,
549     p_action_t  action,
550     pkt_t *     pkt)
551 {
552     pkt_t ack;
553
554     /*
555      * Timeout waiting for a reply.
556      */
557     if (action == PA_TIMEOUT) {
558         assert(pkt == NULL);
559
560         /*
561          * If we've blown our timeout limit, free up this packet and
562          * return.
563          */
564         if (p->resettries == 0 || DROP_DEAD_TIME(p->origtime)) {
565             security_seterror(p->security_handle, _("timeout waiting for REP"));
566             return (PA_ABORT);
567         }
568
569         /*
570          * We still have some tries left.  Resend the request.
571          */
572         p->resettries--;
573         p->state = s_sendreq;
574         p->reqtries = getconf_int(CNF_REQ_TRIES);
575         return (PA_CONTINUE);
576     }
577
578     assert(action == PA_RCVDATA);
579
580     /* Finish if we get a NAK */
581     if (pkt->type == P_NAK)
582         return (PA_FINISH);
583
584     /*
585      * We've received some data.  If we didn't get a reply,
586      * requeue the packet and retry.  Otherwise, acknowledge
587      * the reply, cleanup this packet, and return.
588      */
589     if (pkt->type != P_REP && pkt->type != P_PREP)
590         return (PA_PENDING);
591
592     if(pkt->type == P_REP) {
593         pkt_init_empty(&ack, P_ACK);
594         if (security_sendpkt(p->security_handle, &ack) < 0) {
595             /* XXX should retry */
596             amfree(ack.body);
597             security_seterror(p->security_handle, _("error sending ACK: %s"),
598                 security_geterror(p->security_handle));
599             return (PA_ABORT);
600         }
601         amfree(ack.body);
602         return (PA_FINISH);
603     }
604     else if(pkt->type == P_PREP) {
605         p->timeout = p->repwait - CURTIME + p->curtime + 1;
606         if (p->timeout <= 0)
607             p->timeout = 1;
608         return (PA_CONTPEND);
609     }
610
611     /* should never go here, shut up compiler warning */
612     return (PA_FINISH);
613 }
614
615 /*
616  * event callback that receives a packet
617  */
618 static void
619 recvpkt_callback(
620     void *              cookie,
621     pkt_t *             pkt,
622     security_status_t   status)
623 {
624     proto_t *p = cookie;
625
626     assert(p != NULL);
627
628     switch (status) {
629     case S_OK:
630         state_machine(p, PA_RCVDATA, pkt);
631         break;
632     case S_TIMEOUT:
633         state_machine(p, PA_TIMEOUT, NULL);
634         break;
635     case S_ERROR:
636         state_machine(p, PA_ABORT, NULL);
637         break;
638     default:
639         assert(0);
640         break;
641     }
642 }
643
644 /*
645  * --------------
646  * Misc functions
647  */
648
649 /*
650  * Convert a pstate_t into a printable form.
651  */
652 static const char *
653 pstate2str(
654     pstate_t    pstate)
655 {
656     static const struct {
657         pstate_t type;
658         const char name[12];
659     } pstates[] = {
660 #define X(s)    { s, stringize(s) }
661         X(s_sendreq),
662         X(s_ackwait),
663         X(s_repwait),
664 #undef X
665     };
666     int i;
667
668     for (i = 0; i < ASIZE(pstates); i++)
669         if (pstate == pstates[i].type)
670             return (pstates[i].name);
671     return (_("BOGUS PSTATE"));
672 }
673
674 /*
675  * Convert an p_action_t into a printable form
676  */
677 static const char *
678 action2str(
679     p_action_t  action)
680 {
681     static const struct {
682         p_action_t type;
683         const char name[12];
684     } actions[] = {
685 #define X(s)    { s, stringize(s) }
686         X(PA_START),
687         X(PA_TIMEOUT),
688         X(PA_ERROR),
689         X(PA_RCVDATA),
690         X(PA_CONTPEND),
691         X(PA_PENDING),
692         X(PA_CONTINUE),
693         X(PA_FINISH),
694         X(PA_ABORT),
695 #undef X
696     };
697     int i;
698
699     for (i = 0; i < ASIZE(actions); i++)
700         if (action == actions[i].type)
701             return (actions[i].name);
702     return (_("BOGUS ACTION"));
703 }