/*
* Amanda, The Advanced Maryland Automatic Network Disk Archiver
- * Copyright (c) 1991-1998 University of Maryland at College Park
+ * Copyright (c) 1991-1999 University of Maryland at College Park
* All Rights Reserved.
*
* Permission to use, copy, modify, distribute, and sell this software and its
* file named AUTHORS, in the root directory of this distribution.
*/
/*
- * $Id: protocol.c,v 1.27.2.1.6.3 2004/04/14 13:24:35 martinea Exp $
+ * $Id: protocol.c,v 1.45 2006/05/25 17:07:31 martinea Exp $
*
* implements amanda protocol
*/
#include "amanda.h"
+#include "conffile.h"
+#include "event.h"
+#include "packet.h"
+#include "security.h"
#include "protocol.h"
-#include "version.h"
-#ifdef KRB4_SECURITY
-# include "krb4-security.h"
-#endif
-#define ACK_WAIT 10 /* time (secs) to wait for ACK - keep short */
-#define ACK_TRIES 3 /* # times we'll retry after ACK_WAIT timeout */
-#define REQ_TRIES 2 /* # times client can start over (reboot/crash) */
+#define proto_debug(i, ...) do { \
+ if ((i) <= debug_protocol) { \
+ dbprintf(__VA_ARGS__); \
+ } \
+} while (0)
-#define DROP_DEAD_TIME (60*60) /* If no reply in an hour, just forget it */
-
-#define MAX_HANDLES 4096
-#define OFS_DIGITS 3 /* log2(MAX_HANDLES)/4 */
-
-proto_t *pending_head = NULL;
-proto_t *pending_tail = NULL;
-int pending_qlength = 0;
+/*
+ * Valid actions that can be passed to the state machine
+ */
+typedef enum {
+ PA_START,
+ PA_TIMEOUT,
+ PA_ERROR,
+ PA_RCVDATA,
+ PA_CONTPEND,
+ PA_PENDING,
+ PA_CONTINUE,
+ PA_FINISH,
+ PA_ABORT
+} p_action_t;
-int proto_socket = -1;
-int proto_global_seq = 0;
-#define relseq(s) (s-proto_global_seq)
+/*
+ * The current state type. States are represented as function
+ * vectors.
+ */
+struct proto;
+typedef p_action_t (*pstate_t)(struct proto *, p_action_t, pkt_t *);
-proto_t **proto_handle_table;
-proto_t **proto_next_handle;
-int proto_handles;
+/*
+ * This is a request structure that is wrapped around a packet while it
+ * is being passed through amanda. It holds the timeouts, state, and handles
+ * for each request.
+ */
+typedef struct proto {
+ pstate_t state; /* current state of the request */
+ char *hostname; /* remote host */
+ const security_driver_t *security_driver; /* for connect retries */
+ security_handle_t *security_handle; /* network stream for this req */
+ time_t timeout; /* seconds for this timeout */
+ time_t repwait; /* seconds to wait for reply */
+ time_t origtime; /* orig start time of this request */
+ time_t curtime; /* time when this attempt started */
+ int connecttries; /* times we'll retry a connect */
+ int resettries; /* times we'll resend a REQ */
+ int reqtries; /* times we'll wait for an a ACK */
+ pkt_t req; /* the actual wire request */
+ protocol_sendreq_callback continuation; /* call when req dies/finishes */
+ void *datap; /* opaque cookie passed to above */
+ char *(*conf_fn)(char *, void *); /* configuration function */
+} proto_t;
+
+#define CONNECT_WAIT 5 /* secs between connect attempts */
+#define ACK_WAIT 10 /* time (secs) to wait for ACK - keep short */
+#define RESET_TRIES 2 /* num restarts (reboot/crash) */
+#define CURTIME (time(0) - proto_init_time) /* time relative to start */
+
+/* if no reply in an hour, just forget it */
+#define DROP_DEAD_TIME(t) (CURTIME - (t) > (60 * 60))
+
+/* get the size of an array */
+#define ASIZE(arr) (int)(sizeof(arr) / sizeof((arr)[0]))
-time_t proto_init_time;
-#define CURTIME (time(0)-proto_init_time)
+/*
+ * Initialization time
+ */
+static time_t proto_init_time;
/* local functions */
-static char *prnpstate P((pstate_t s));
-static char *prnaction P((action_t s));
-#ifdef PROTO_DEBUG
-static char *prnpktype P((pktype_t s));
-#endif
-static void pending_enqueue P((proto_t *newp));
-static proto_t *pending_dequeue P((void));
-static void pending_remove P((proto_t *p));
-static void alloc_handle P((proto_t *p));
-static void free_handle P((proto_t *p));
-static void hex P((char *str, int digits, unsigned int v));
-static int unhex P((char *str, int digits));
-static proto_t *handle2ptr P((char *str));
-static char *ptr2handle P((proto_t *p));
-static void eat_string P((dgram_t *msg, char *str));
-static int parse_integer P((dgram_t *msg));
-static char *parse_string P((dgram_t *msg));
-static char *parse_line P((dgram_t *msg));
-void parse_pkt_header P((pkt_t *pkt));
-static void setup_dgram P((proto_t *p, dgram_t *msg,
- char *security, char *typestr));
-static void send_req P((proto_t *p));
-static void send_ack P((proto_t *p));
-static void send_ack_repl P((pkt_t *pkt));
-static void state_machine P((proto_t *p, action_t action, pkt_t *pkt));
-static void add_bsd_security P((proto_t *p));
-static int select_til P((time_t waketime));
-static int packet_arrived P((void));
-static void handle_incoming_packet P((void));
-
-
-/* -------------- */
-
-
-static char *prnpstate(s)
-pstate_t s;
-{
- static char str[80];
-
- switch(s) {
- case S_BOGUS: return "S_BOGUS";
- case S_STARTUP: return "S_STARTUP";
- case S_SENDREQ: return "S_SENDREQ";
- case S_ACKWAIT: return "S_ACKWAIT";
- case S_REPWAIT: return "S_REPWAIT";
- case S_SUCCEEDED: return "S_SUCCEEDED";
- case S_FAILED: return "S_FAILED";
- default:
- ap_snprintf(str, sizeof(str), "<bad state %d>", s);
- return str;
- }
-}
-
-static char *prnaction(s)
-action_t s;
-{
- static char str[80];
-
- switch(s) {
- case A_BOGUS: return "A_BOGUS";
- case A_START: return "A_START";
- case A_TIMEOUT: return "A_TIMEOUT";
- case A_RCVDATA: return "A_RCVDATA";
- default:
- ap_snprintf(str, sizeof(str), "<bad action %d>", s);
- return str;
- }
-}
-
-#ifdef PROTO_DEBUG
-
-static char *prnpktype(s)
-pktype_t s;
-{
- static char str[80];
-
- switch(s) {
- case P_BOGUS: return "P_BOGUS";
- case P_REQ: return "P_REQ";
- case P_REP: return "P_REP";
- case P_ACK: return "P_ACK";
- case P_NAK: return "P_NAK";
- default:
- ap_snprintf(str, sizeof(str), "<bad pktype %d>", s);
- return str;
- }
-}
-
-#endif
-
-
-void proto_init(socket, startseq, handles)
-int socket, startseq, handles;
-{
- int i;
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: proto_init(socket %d, startseq %d, handles %d)\n",
- debug_prefix_time(": protocol"),
- socket,
- startseq,
- handles));
-#endif
- if(socket < 0 || socket >= FD_SETSIZE) {
- error("proto_init: socket %d out of range (0 .. %d)\n",
- socket, FD_SETSIZE-1);
- }
- proto_socket = socket;
- proto_global_seq = startseq;
- proto_handles = handles;
-
- proto_handle_table = alloc(proto_handles * sizeof(proto_t *));
- malloc_mark(proto_handle_table);
- proto_next_handle = proto_handle_table;
- for(i = 0; i < proto_handles; i++)
- proto_handle_table[i] = NULL;
- proto_init_time = time(0);
-}
+static const char *action2str(p_action_t);
+static const char *pstate2str(pstate_t);
-static void pending_enqueue(newp)
-proto_t *newp;
-{
- proto_t *curp;
-
- /* common case shortcut: check if adding to end of list */
+static void connect_callback(void *, security_handle_t *, security_status_t);
+static void connect_wait_callback(void *);
+static void recvpkt_callback(void *, pkt_t *, security_status_t);
- if(pending_tail && pending_tail->timeout <= newp->timeout)
- curp = NULL;
- else {
- /* scan list for insert-sort */
- curp = pending_head;
- while(curp && curp->timeout <= newp->timeout)
- curp = curp->next;
- }
+static p_action_t s_sendreq(proto_t *, p_action_t, pkt_t *);
+static p_action_t s_ackwait(proto_t *, p_action_t, pkt_t *);
+static p_action_t s_repwait(proto_t *, p_action_t, pkt_t *);
+static void state_machine(proto_t *, p_action_t, pkt_t *);
- newp->next = curp;
- if(curp) {
- newp->prev = curp->prev;
- curp->prev = newp;
- }
- else {
- newp->prev = pending_tail;
- pending_tail = newp;
- }
-
- if(newp->prev) newp->prev->next = newp;
- else pending_head = newp;
-
- pending_qlength++;
-}
+/*
+ * -------------------
+ * Interface functions
+ */
-static proto_t *pending_dequeue()
+/*
+ * Initialize globals.
+ */
+void
+protocol_init(void)
{
- proto_t *p;
-
- p = pending_head;
- if(p) {
- pending_head = p->next;
- p->next = NULL;
- if(pending_head)
- pending_head->prev = NULL;
- else
- pending_tail = NULL;
- pending_qlength--;
- }
- return p;
+ proto_init_time = time(NULL);
}
-static void pending_remove(p)
-proto_t *p;
+/*
+ * Generate a request packet, and submit it to the state machine
+ * for transmission.
+ */
+void
+protocol_sendreq(
+ const char * hostname,
+ const security_driver_t * security_driver,
+ char * (*conf_fn)(char *, void *),
+ const char * req,
+ time_t repwait,
+ protocol_sendreq_callback continuation,
+ void * datap)
{
- if(p->next) p->next->prev = p->prev;
- else pending_tail = p->prev;
-
- if(p->prev) p->prev->next = p->next;
- else pending_head = p->next;
-
- p->prev = p->next = NULL;
- pending_qlength--;
-}
-
-/* -------- */
-
-#define PTR_CHARS sizeof(proto_t *) /* chars in a pointer */
-#define CHAR_DIGITS 2 /* hex digits in a char */
-#define HANDLE_CHARS (OFS_DIGITS+1+PTR_CHARS*CHAR_DIGITS) /* "xxx-yyyyyyyy" */
-union handle_u {
- unsigned char c[PTR_CHARS];
proto_t *p;
-} hu;
-static void alloc_handle(p)
-proto_t *p;
-{
- int i;
- proto_t **hp;
-
- hp = proto_next_handle;
- for(i = 0; i < proto_handles; i++) {
- if(*hp == NULL) break;
- hp++;
- if(hp >= proto_handle_table + proto_handles)
- hp = proto_handle_table;
- }
- if(i == proto_handles)
- error("protocol out of handles");
- p->handleofs = hp-proto_handle_table;
- *hp = p;
-}
-
-static void free_handle(p)
-proto_t *p;
-{
- if(proto_handle_table[p->handleofs] == p)
- proto_handle_table[p->handleofs] = NULL;
- p->handleofs = -1;
-}
-
-static void hex(str, digits, v)
-char *str;
-int digits;
-unsigned int v;
-{
- str = str + digits - 1;
-
- while(digits--) {
- *str-- = "0123456789ABCDEF"[v % 16];
- v /= 16;
- }
-}
-
-static int unhex(str, digits)
-char *str;
-int digits;
-{
- int d, v = 0;
-
- while(*str && digits--) {
- d = *str >= 'A'? *str - 'A' + 10 : *str - '0';
- v = v * 16 + d;
- str++;
- }
- return v;
-}
-
-
-static proto_t *handle2ptr(str)
-char *str;
-{
- int ofs, i;
-
- if(strlen(str) != HANDLE_CHARS)
- return NULL;
-
- ofs = unhex(str, OFS_DIGITS);
- str += OFS_DIGITS;
- if(ofs < 0 || ofs >= proto_handles)
- return NULL;
-
- if(*str++ != '-')
- return NULL;
-
- for(i=0; i < PTR_CHARS; i++) {
- hu.c[i] = unhex(str, CHAR_DIGITS);
- str += CHAR_DIGITS;
- }
-
- if(proto_handle_table[ofs] != hu.p)
- return NULL;
-
- return hu.p;
-}
-
-
-static char *ptr2handle(p)
-proto_t *p;
-{
- int i;
- char *s;
- static char hstr[HANDLE_CHARS+1];
-
- assert(p->handleofs != -1 && proto_handle_table[p->handleofs] == p);
-
- hu.p = p;
-
- hex(hstr, OFS_DIGITS, p->handleofs);
- s = &hstr[OFS_DIGITS];
- *s++ = '-';
-
- for(i=0;i<PTR_CHARS;i++) {
- hex(s, CHAR_DIGITS, hu.c[i]);
- s += CHAR_DIGITS;
- }
- *s = '\0';
- return hstr;
-}
-
-/* -------- */
+ p = alloc(SIZEOF(proto_t));
+ p->state = s_sendreq;
+ p->hostname = stralloc(hostname);
+ p->security_driver = security_driver;
+ /* p->security_handle set in connect_callback */
+ p->repwait = repwait;
+ p->origtime = CURTIME;
+ /* p->curtime set in the sendreq state */
+ p->connecttries = getconf_int(CNF_CONNECT_TRIES);
+ p->resettries = RESET_TRIES;
+ p->reqtries = getconf_int(CNF_REQ_TRIES);
+ p->conf_fn = conf_fn;
+ pkt_init(&p->req, P_REQ, "%s", req);
+
+ /*
+ * These are here for the caller
+ * We call the continuation function after processing is complete.
+ * We pass the datap on through untouched. It is here so the caller
+ * has a way to keep state with each request.
+ */
+ p->continuation = continuation;
+ p->datap = datap;
-jmp_buf parse_failed;
-char *parse_errmsg = NULL;
+ proto_debug(1, _("protocol: security_connect: host %s -> p %p\n"),
+ hostname, p);
-static void eat_string(msg, str)
-dgram_t *msg;
-char *str;
-{
- char *saved_str, *saved_msg;
-
- /* eat leading whitespace */
- while(isspace((int)(*msg->cur))) msg->cur++;
-
- saved_msg = msg->cur;
- saved_str = str;
-
- /* eat any characters that match str */
- while(*str && *msg->cur++ == *str++);
-
- /* if we didn't eat all of str, we've failed */
- if(*str) {
- int len = strlen(saved_str);
- char *tmp = NULL;
-
- tmp = alloc(len+1);
- strncpy(tmp, saved_msg, len);
- tmp[len] = '\0';
- parse_errmsg = newvstralloc(parse_errmsg,
- "expected \"", saved_str, "\",",
- " got \"", tmp, "\"",
- NULL);
- amfree(tmp);
- longjmp(parse_failed,1);
- }
+ security_connect(p->security_driver, p->hostname, conf_fn, connect_callback,
+ p, p->datap);
}
-static int parse_integer(msg)
-dgram_t *msg;
-{
- int i = 0;
- int sign = 1;
-
- /* eat leading whitespace */
- while(isspace((int)(*msg->cur))) msg->cur++;
-
- /* handle negative values */
- if(*msg->cur == '-') {
- sign = -1;
- msg->cur++;
- }
-
- /* must have at least one digit */
- if(*msg->cur < '0' || *msg->cur > '9') {
- char non_digit[2];
-
- non_digit[0] = *msg->cur;
- non_digit[1] = '\0';
- parse_errmsg = newvstralloc(parse_errmsg,
- "expected digit, got \"", non_digit, "\"",
- NULL);
- longjmp(parse_failed,1);
- }
+/*
+ * This is a callback for security_connect. After the security layer
+ * has initiated a connection to the given host, this will be called
+ * with a security_handle_t.
+ *
+ * On error, the security_status_t arg will reflect errors which can
+ * be had via security_geterror on the handle.
+ */
+static void
+connect_callback(
+ void * cookie,
+ security_handle_t * security_handle,
+ security_status_t status)
+{
+ proto_t *p = cookie;
+
+ assert(p != NULL);
+ p->security_handle = security_handle;
+
+ proto_debug(1, _("protocol: connect_callback: p %p\n"), p);
+
+ switch (status) {
+ case S_OK:
+ state_machine(p, PA_START, NULL);
+ break;
+
+ case S_TIMEOUT:
+ security_seterror(p->security_handle, _("timeout during connect"));
+ /* FALLTHROUGH */
+
+ case S_ERROR:
+ /*
+ * For timeouts or errors, retry a few times, waiting CONNECT_WAIT
+ * seconds between each attempt. If they all fail, just return
+ * an error back to the caller.
+ */
+ if (--p->connecttries == 0) {
+ state_machine(p, PA_ABORT, NULL);
+ } else {
+ proto_debug(1, _("protocol: connect_callback: p %p: retrying %s\n"),
+ p, p->hostname);
+ security_close(p->security_handle);
+ /* XXX overload p->security handle to hold the event handle */
+ p->security_handle =
+ (security_handle_t *)event_register(CONNECT_WAIT, EV_TIME,
+ connect_wait_callback, p);
+ }
+ break;
- while(*msg->cur >= '0' && *msg->cur <= '9') {
- i = i * 10 + (*msg->cur - '0');
- msg->cur++;
+ default:
+ assert(0);
+ break;
}
- return sign * i;
}
-static char *parse_string(msg)
-dgram_t *msg;
+/*
+ * This gets called when a host has been put on a wait queue because
+ * initial connection attempts failed.
+ */
+static void
+connect_wait_callback(
+ void * cookie)
{
- char *str;
-
- /* eat leading whitespace */
- while(isspace((int)(*msg->cur))) msg->cur++;
-
- /* mark start of string */
- str = msg->cur;
-
- /* stop at whitespace (including newlines) or end-of-packet */
- while(*msg->cur && !isspace((int)(*msg->cur))) msg->cur++;
-
- /* empty fields not allowed */
- if(msg->cur == str) {
- parse_errmsg = newstralloc(parse_errmsg,
- "expected string, got empty field");
- longjmp(parse_failed,1);
- }
-
- /* mark end of string in the packet, but don't fall off the end of it */
- if(*msg->cur) *msg->cur++ = '\0';
+ proto_t *p = cookie;
- return str;
+ event_release((event_handle_t *)p->security_handle);
+ security_connect(p->security_driver, p->hostname, p->conf_fn,
+ connect_callback, p, p->datap);
}
-static char *parse_line(msg)
-dgram_t *msg;
-{
- char *str;
-
- /* eat leading whitespace */
- while(isspace((int)(*msg->cur))) msg->cur++;
-
- /* mark start of string */
- str = msg->cur;
-
- /* stop at end of line or end-of-packet */
- while(*msg->cur && *msg->cur != '\n') msg->cur++;
-
- /* empty fields not allowed */
- if(msg->cur == str) {
- parse_errmsg = newstralloc(parse_errmsg,
- "expected string, got empty field");
- longjmp(parse_failed,1);
- }
- /* mark end of string in the packet, but don't fall off the end of it */
- if(*msg->cur) *msg->cur++ = '\0';
-
- return str;
-}
-
-void parse_pkt_header(pkt)
-pkt_t *pkt;
+/*
+ * Does a one pass protocol sweep. Handles any incoming packets that
+ * are waiting to be processed, and then deals with any pending
+ * requests that have timed out.
+ *
+ * Callers should periodically call this after they have submitted
+ * requests if they plan on doing a lot of work.
+ */
+void
+protocol_check(void)
{
- dgram_t *msg;
- char *typestr;
-
- if(setjmp(parse_failed)) {
-/* dbprintf(("%s: leftover:\n----\n%s----\n\n", errmsg, msg->cur)); */
- pkt->type = P_BOGUS;
- return;
- }
-
- msg = &pkt->dgram;
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: parsing packet:\n-------\n%s-------\n\n",
- debug_prefix_time(": protocol"),
- msg->cur));
-#endif
-
- eat_string(msg, "Amanda"); pkt->version_major = parse_integer(msg);
- eat_string(msg, "."); pkt->version_minor = parse_integer(msg);
- typestr = parse_string(msg);
-
- if(strcmp(typestr, "REQ") == 0) pkt->type = P_REQ;
- else if(strcmp(typestr, "REP") == 0) pkt->type = P_REP;
- else if(strcmp(typestr, "ACK") == 0) pkt->type = P_ACK;
- else if(strcmp(typestr, "NAK") == 0) pkt->type = P_NAK;
- else pkt->type = P_BOGUS;
-
- eat_string(msg, "HANDLE"); pkt->handle = parse_string(msg);
- eat_string(msg, "SEQ"); pkt->sequence = parse_integer(msg);
-
- eat_string(msg, "");
-#define sc "SECURITY "
- if(strncmp(msg->cur, sc, sizeof(sc)-1) == 0) {
- /* got security tag */
- eat_string(msg, sc);
-#undef sc
- pkt->security = parse_line(msg);
- }
- else pkt->security = NULL;
-
- if(pkt->type == P_REQ) {
-
-#ifdef KRB4_SECURITY
- eat_string(msg, "");
- pkt->cksum = kerberos_cksum(msg->cur);
-#ifdef PROTO_DEBUG
- dbprintf(("%s: parse_pkt/cksum %ld over \'%s\'\n\n",
- debug_prefix_time(": protocol"),
- pkt->cksum,
- msg->cur));
-#endif
- fflush(stdout);
-#endif
- eat_string(msg, "SERVICE"); pkt->service = parse_string(msg);
- }
- eat_string(msg, "");
- pkt->body = msg->cur;
+ /* arg == 1 means don't block */
+ event_loop(1);
}
-static void setup_dgram(p, msg, security, typestr)
-proto_t *p;
-dgram_t *msg;
-char *security, *typestr;
-{
- char *linebuf = NULL;
- char major_str[NUM_STR_SIZE];
- char minor_str[NUM_STR_SIZE];
- char seq_str[NUM_STR_SIZE];
-
- ap_snprintf(major_str, sizeof(major_str), "%d", VERSION_MAJOR);
- ap_snprintf(minor_str, sizeof(minor_str), "%d", VERSION_MINOR);
- ap_snprintf(seq_str, sizeof(seq_str), "%d", p->curseq);
-
- dgram_zero(msg);
- dgram_socket(msg,proto_socket);
- linebuf = vstralloc("Amanda ", major_str, ".", minor_str,
- " ", typestr,
- " HANDLE ", ptr2handle(p),
- " SEQ ", seq_str,
- "\n",
- security ? security : "",
- security ? "\n" : "",
- NULL);
- dgram_cat(msg, linebuf);
- amfree(linebuf);
-}
-
-static void send_req(p)
-proto_t *p;
-{
- dgram_t outmsg;
- setup_dgram(p, &outmsg, p->security, "REQ");
- dgram_cat(&outmsg, p->req);
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: send_req: len %d: packet:\n----\n%s----\n\n",
- debug_prefix_time(": protocol"),
- outmsg.len,
- outmsg.data));
-#endif
-
- if(dgram_send_addr(p->peer, &outmsg))
- fprintf(stderr,"send req failed: %s\n", strerror(errno));
-}
-
-static void send_ack(p)
-proto_t *p;
+/*
+ * Does an infinite pass protocol sweep. This doesn't return until all
+ * requests have been satisfied or have timed out.
+ *
+ * Callers should call this after they have finished submitting requests
+ * and are just waiting for all of the answers to come back.
+ */
+void
+protocol_run(void)
{
- dgram_t outmsg;
- setup_dgram(p, &outmsg, NULL, "ACK");
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: send_ack: len %d: packet:\n----\n%s----\n\n",
- debug_prefix_time(": protocol"),
- outmsg.len,
- outmsg.data));
-#endif
-
- if(dgram_send_addr(p->peer, &outmsg))
- error("send ack failed: %s", strerror(errno));
+ /* arg == 0 means block forever until no more events are left */
+ event_loop(0);
}
-static void send_ack_repl(pkt)
-pkt_t *pkt;
-{
- dgram_t outmsg;
- char *linebuf = NULL;
- char major_str[NUM_STR_SIZE];
- char minor_str[NUM_STR_SIZE];
- char seq_str[NUM_STR_SIZE];
-
- ap_snprintf(major_str, sizeof(major_str), "%d", VERSION_MAJOR);
- ap_snprintf(minor_str, sizeof(minor_str), "%d", VERSION_MINOR);
- ap_snprintf(seq_str, sizeof(seq_str), "%d", pkt->sequence);
-
- dgram_zero(&outmsg);
- dgram_socket(&outmsg,proto_socket);
-
- linebuf = vstralloc("Amanda ", major_str, ".", minor_str,
- " ACK HANDLE ", pkt->handle,
- " SEQ ", seq_str,
- "\n", NULL);
-
- dgram_cat(&outmsg, linebuf);
- amfree(linebuf);
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: send_ack_repl: len %d: packet:\n----\n%s----\n\n",
- debug_prefix_time(": protocol"),
- outmsg.len,
- outmsg.data));
-#endif
-
- if(dgram_send_addr(pkt->peer, &outmsg))
- error("send ack failed: %s", strerror(errno));
-}
+/*
+ * ------------------
+ * Internal functions
+ */
-static void state_machine(p, action, pkt)
-proto_t *p;
-action_t action;
-pkt_t *pkt;
-{
+/*
+ * The guts of the protocol. This handles the many paths a request can
+ * make, including retrying the request and acknowledgements, and dealing
+ * with timeouts and successfull replies.
+ */
+static void
+state_machine(
+ proto_t * p,
+ p_action_t action,
+ pkt_t * pkt)
+{
+ pstate_t curstate;
+ p_action_t retaction;
+
+ proto_debug(1, _("protocol: state_machine: initial: p %p action %s pkt %p\n"),
+ p, action2str(action), (void *)NULL);
+
+ assert(p != NULL);
+ assert(action == PA_RCVDATA || pkt == NULL);
+ assert(p->state != NULL);
+
+ for (;;) {
+ proto_debug(1, _("protocol: state_machine: p %p state %s action %s\n"),
+ p, pstate2str(p->state), action2str(action));
+ if (pkt != NULL) {
+ proto_debug(1, _("protocol: pkt: %s (t %d) orig REQ (t %d cur %d)\n"),
+ pkt_type2str(pkt->type), (int)CURTIME,
+ (int)p->origtime, (int)p->curtime);
+ proto_debug(1, _("protocol: pkt contents:\n-----\n%s-----\n"),
+ pkt->body);
+ }
-#ifdef PROTO_DEBUG
- dbprintf(("%s: state_machine: p %X state %s action %s%s%s\n",
- debug_prefix_time(": protocol"),
- (int)p,
- prnpstate(p->state),
- prnaction(action),
- pkt == NULL? "" : " pktype ",
- pkt == NULL? "" : prnpktype(pkt->type)));
-#endif
-
- while(1) {
- p->prevstate = p->state;
- switch(p->state) {
- case S_STARTUP:
- if(action != A_START) goto badaction;
- p->origseq = p->curseq = proto_global_seq++;
- p->reqtries = REQ_TRIES;
- p->state = S_SENDREQ;
- p->acktries = ACK_TRIES;
- alloc_handle(p);
- break;
-
- case S_SENDREQ:
- send_req(p);
- p->curtime = CURTIME;
- if(p->curseq == p->origseq) p->origtime = p->curtime;
- p->timeout = time(0) + ACK_WAIT;
- p->state = S_ACKWAIT;
- pending_enqueue(p);
- return;
+ /*
+ * p->state is a function pointer to the current state a request
+ * is in.
+ *
+ * We keep track of the last state we were in so we can make
+ * sure states which return PA_CONTINUE really have transitioned
+ * the request to a new state.
+ */
+ curstate = p->state;
+
+ if (action == PA_ABORT)
+ /*
+ * If the passed action indicates a terminal error, then we
+ * need to move to abort right away.
+ */
+ retaction = PA_ABORT;
+ else
+ /*
+ * Else we run the state and perform the action it
+ * requests.
+ */
+ retaction = (*curstate)(p, action, pkt);
+
+ proto_debug(1, _("protocol: state_machine: p %p state %s returned %s\n"),
+ p, pstate2str(p->state), action2str(retaction));
+
+ /*
+ * The state function is expected to return one of the following
+ * p_action_t's.
+ */
+ switch (retaction) {
+
+ /*
+ * Request is still waiting for more data off of the network.
+ * Setup to receive another pkt, and wait for the recv event
+ * to occur.
+ */
+ case PA_CONTPEND:
+ (*p->continuation)(p->datap, pkt, p->security_handle);
+ /* FALLTHROUGH */
+
+ case PA_PENDING:
+ proto_debug(1, _("protocol: state_machine: p %p state %s: timeout %d\n"),
+ p, pstate2str(p->state), (int)p->timeout);
+ /*
+ * Get the security layer to register a receive event for this
+ * security handle on our behalf. Have it timeout in p->timeout
+ * seconds.
+ */
+ security_recvpkt(p->security_handle, recvpkt_callback, p,
+ (int)p->timeout);
- case S_ACKWAIT:
- if(action == A_TIMEOUT) {
- if(--p->acktries == 0) {
- p->state = S_FAILED;
- free_handle(p);
- p->continuation(p, NULL);
- amfree(p->req);
- amfree(p->security);
- amfree(p);
- return;
- }
- else {
- p->state = S_SENDREQ;
- break;
- }
- }
- else if(action != A_RCVDATA)
- goto badaction;
-
- /* got the packet with the right handle, now check it */
-
-#ifdef PROTO_DEBUG
- dbprintf((
- "%s: RESPTIME p %X pkt %s (t %d s %d) orig (t %d s %d) cur (t %d s %d)\n",
- debug_prefix_time(": protocol"),
- (int)p, prnpktype(pkt->type),
- (int)CURTIME, relseq(pkt->sequence),
- (int)p->origtime, relseq(p->origseq),
- (int)p->curtime, relseq(p->curseq)));
-#endif
-
- if(pkt->type == P_ACK) {
- if(pkt->sequence != p->origseq)
- p->reqtries--;
- p->state = S_REPWAIT;
- p->timeout = time(0) + p->repwait;
- pending_enqueue(p);
- return;
- }
- else if(pkt->type == P_NAK) {
- p->state = S_FAILED;
- free_handle(p);
- p->continuation(p, pkt);
- amfree(p->req);
- amfree(p->security);
- amfree(p);
- return;
- }
- else if(pkt->type == P_REP) {
- /* no ack, just rep */
- p->state = S_REPWAIT;
- break;
- }
- /* else unexpected packet, put back on queue */
- pending_enqueue(p);
return;
- case S_REPWAIT:
- if(action == A_TIMEOUT) {
- if(p->reqtries == 0 ||
- (CURTIME - p->origtime > DROP_DEAD_TIME)) {
- p->state = S_FAILED;
- free_handle(p);
- p->continuation(p, NULL);
- amfree(p->req);
- amfree(p->security);
- amfree(p);
- return;
- }
- else {
- p->reqtries--;
- p->state = S_SENDREQ;
- p->acktries = ACK_TRIES;
- break;
- }
- }
- else if(action != A_RCVDATA)
- goto badaction;
- /* got the packet with the right handle, now check it */
- if(pkt->type != P_REP) {
- pending_enqueue(p);
- return;
- }
- send_ack(p);
- p->state = S_SUCCEEDED;
- free_handle(p);
- p->continuation(p, pkt);
- amfree(p->req);
- amfree(p->security);
+ /*
+ * Request has moved to another state. Loop and run it again.
+ */
+ case PA_CONTINUE:
+ assert(p->state != curstate);
+ proto_debug(1, _("protocol: state_machine: p %p: moved from %s to %s\n"),
+ p, pstate2str(curstate),
+ pstate2str(p->state));
+ continue;
+
+ /*
+ * Request has failed in some way locally. The security_handle will
+ * contain an appropriate error message via security_geterror(). Set
+ * pkt to NULL to indicate failure to the callback, and then
+ * fall through to the common finish code.
+ *
+ * Note that remote failures finish via PA_FINISH, because they did
+ * complete successfully locally.
+ */
+ case PA_ABORT:
+ pkt = NULL;
+ /* FALLTHROUGH */
+
+ /*
+ * Request has completed successfully.
+ * Free up resources the request has used, call the continuation
+ * function specified by the caller and quit.
+ */
+ case PA_FINISH:
+ (*p->continuation)(p->datap, pkt, p->security_handle);
+ security_close(p->security_handle);
+ amfree(p->hostname);
+ amfree(p->req.body);
amfree(p);
return;
default:
- badaction:
- error("protocol error: no handler for state %s action %s\n",
- prnpstate(p->state), prnaction(action));
+ assert(0);
+ break; /* in case asserts are turned off */
}
+ /*NOTREACHED*/
}
+ /*NOTREACHED*/
}
-static void add_bsd_security(p)
-proto_t *p;
-{
- p->security = get_bsd_security();
-}
-
-int make_request(hostname, port, req, datap, repwait, continuation)
-char *hostname;
-int port;
-char *req;
-void *datap;
-time_t repwait;
-void (*continuation) P((proto_t *p, pkt_t *pkt));
-{
- proto_t *p;
- struct hostent *hp;
-
-
- p = alloc(sizeof(proto_t));
- p->state = S_STARTUP;
- p->prevstate = S_STARTUP;
- p->continuation = continuation;
- p->req = req;
- p->repwait = repwait;
- p->datap = datap;
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: make_request: host %s -> p %X\n",
- debug_prefix_time(": protocol"),
- hostname,
- (int)p));
-#endif
-
- if((hp = gethostbyname(hostname)) == 0) return -1;
- memcpy(&p->peer.sin_addr, hp->h_addr, hp->h_length);
- p->peer.sin_family = AF_INET;
- p->peer.sin_port = htons(port);
-
- add_bsd_security(p);
-
- state_machine(p, A_START, NULL);
- return 0;
-}
-
-#ifdef KRB4_SECURITY
-
-static int add_krb_security P((proto_t *p, char *host_inst, char *realm));
-
-static int add_krb_security(p, host_inst, realm)
-proto_t *p;
-char *host_inst, *realm;
-{
- p->security = get_krb_security(p->req, host_inst, realm, &p->auth_cksum);
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: add_krb_security() cksum: %lu: \'%s\'\n",
- debug_prefix_time(": protocol"),
- p->auth_cksum,
- p->req));
-#endif
-
- return p->security == NULL;
-}
-
-int make_krb_request(hostname, port, req, datap, repwait, continuation)
-char *hostname;
-int port;
-char *req;
-void *datap;
-time_t repwait;
-void (*continuation) P((proto_t *p, pkt_t *pkt));
-{
- proto_t *p;
- struct hostent *hp;
- char inst[256], realm[256];
- int rc;
-
- p = alloc(sizeof(proto_t));
- p->state = S_STARTUP;
- p->prevstate = S_STARTUP;
- p->continuation = continuation;
- p->req = req;
- p->repwait = repwait;
- p->datap = datap;
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: make_krb_request: host %s -> p %X\n",
- debug_prefix_time(": protocol"),
- hostname,
- req, (int)p));
-#endif
-
- if((hp = host2krbname(hostname, inst, realm)) == 0)
- return -1;
- memcpy(&p->peer.sin_addr, hp->h_addr, hp->h_length);
- p->peer.sin_family = AF_INET;
- p->peer.sin_port = htons(port);
+/*
+ * The request send state. Here, the packet is actually transmitted
+ * across the network. After setting up timeouts, the request
+ * moves to the acknowledgement wait state. We return from the state
+ * machine at this point, and let the request be received from the network.
+ */
+static p_action_t
+s_sendreq(
+ proto_t * p,
+ p_action_t action,
+ pkt_t * pkt)
+{
+
+ assert(p != NULL);
+ (void)action; /* Quiet unused parameter warning */
+ (void)pkt; /* Quiet unused parameter warning */
+
+ if (security_sendpkt(p->security_handle, &p->req) < 0) {
+ /* XXX should retry */
+ security_seterror(p->security_handle, _("error sending REQ: %s"),
+ security_geterror(p->security_handle));
+ return (PA_ABORT);
+ }
- if((rc = add_krb_security(p, inst, realm)))
- return rc;
+ /*
+ * Remember when this request was first sent
+ */
+ p->curtime = CURTIME;
- state_machine(p, A_START, NULL);
- return 0;
+ /*
+ * Move to the ackwait state
+ */
+ p->state = s_ackwait;
+ p->timeout = ACK_WAIT;
+ return (PA_PENDING);
}
-#endif
+/*
+ * The acknowledge wait state. We can enter here two ways:
+ *
+ * - the caller has received a packet, located the request for
+ * that packet, and called us with an action of PA_RCVDATA.
+ *
+ * - the caller has determined that a request has timed out,
+ * and has called us with PA_TIMEOUT.
+ *
+ * Here we process the acknowledgment, which usually means that
+ * the client has agreed to our request and is working on it.
+ * It will later send a reply when finished.
+ */
+static p_action_t
+s_ackwait(
+ proto_t * p,
+ p_action_t action,
+ pkt_t * pkt)
+{
+
+ assert(p != NULL);
+
+ /*
+ * The timeout case. If our retry count has gone to zero
+ * fail this request. Otherwise, move to the send state
+ * to retry the request.
+ */
+ if (action == PA_TIMEOUT) {
+ assert(pkt == NULL);
+
+ if (--p->reqtries == 0) {
+ security_seterror(p->security_handle, _("timeout waiting for ACK"));
+ return (PA_ABORT);
+ }
-static int select_til(waketime)
-time_t waketime;
-{
- fd_set ready;
- struct timeval to;
- time_t waittime;
- int rc;
-
- waittime = waketime - time(0);
- if(waittime < 0) waittime = 0; /* just poll */
-
- FD_ZERO(&ready);
- FD_SET(proto_socket, &ready);
- to.tv_sec = waittime;
- to.tv_usec = 0;
-
- rc = select(proto_socket+1, (SELECT_ARG_TYPE *)&ready, NULL, NULL, &to);
- if(rc == -1) {
- error("protocol socket select: %s", strerror(errno));
+ p->state = s_sendreq;
+ return (PA_CONTINUE);
}
- return rc;
-}
-static int packet_arrived()
-{
- return select_til(0);
+ assert(action == PA_RCVDATA);
+ assert(pkt != NULL);
+
+ /*
+ * The packet-received state. Determine what kind of
+ * packet we received, and act based on the reply type.
+ */
+ switch (pkt->type) {
+
+ /*
+ * Received an ACK. Everything's good. The client is
+ * now working on the request. We queue up again and
+ * wait for the reply.
+ */
+ case P_ACK:
+ p->state = s_repwait;
+ p->timeout = p->repwait;
+ return (PA_PENDING);
+
+ /*
+ * Received a NAK. The request failed, so free up the
+ * resources associated with it and return.
+ *
+ * This should NOT return PA_ABORT because it is not a local failure.
+ */
+ case P_NAK:
+ return (PA_FINISH);
+
+ /*
+ * The client skipped the ACK, and replied right away.
+ * Move to the reply state to handle it.
+ */
+ case P_REP:
+ case P_PREP:
+ p->state = s_repwait;
+ return (PA_CONTINUE);
+
+ /*
+ * Unexpected packet. Requeue this request and hope
+ * we get what we want later.
+ */
+ default:
+ return (PA_PENDING);
+ }
}
-static void handle_incoming_packet()
-{
- pkt_t inpkt;
- proto_t *p;
-
- dgram_zero(&inpkt.dgram);
- dgram_socket(&inpkt.dgram, proto_socket);
- if(dgram_recv(&inpkt.dgram, 0, &inpkt.peer) == -1) {
-#ifdef ECONNREFUSED
- if(errno == ECONNREFUSED)
- return;
-#endif
-#ifdef EAGAIN
- if(errno == EAGAIN)
- return;
-#endif
+/*
+ * The reply wait state. We enter here much like we do with s_ackwait.
+ */
+static p_action_t
+s_repwait(
+ proto_t * p,
+ p_action_t action,
+ pkt_t * pkt)
+{
+ pkt_t ack;
+
+ /*
+ * Timeout waiting for a reply.
+ */
+ if (action == PA_TIMEOUT) {
+ assert(pkt == NULL);
+
+ /*
+ * If we've blown our timeout limit, free up this packet and
+ * return.
+ */
+ if (p->resettries == 0 || DROP_DEAD_TIME(p->origtime)) {
+ security_seterror(p->security_handle, _("timeout waiting for REP"));
+ return (PA_ABORT);
+ }
- fprintf(stderr,"protocol packet receive: %s\n", strerror(errno));
+ /*
+ * We still have some tries left. Resend the request.
+ */
+ p->resettries--;
+ p->state = s_sendreq;
+ p->reqtries = getconf_int(CNF_REQ_TRIES);
+ return (PA_CONTINUE);
}
-#ifdef PROTO_DEBUG
- dbprintf(("%s: got packet:\n----\n%s----\n\n",
- debug_prefix_time(": protocol"),
- inpkt.dgram.data));
-#endif
-
- parse_pkt_header(&inpkt);
- if(inpkt.type == P_BOGUS)
- return;
- if((p = handle2ptr(inpkt.handle)) == NULL) {
- /* ack succeeded reps */
- if(inpkt.type == P_REP)
- send_ack_repl(&inpkt);
- return;
+ assert(action == PA_RCVDATA);
+
+ /* Finish if we get a NAK */
+ if (pkt->type == P_NAK)
+ return (PA_FINISH);
+
+ /*
+ * We've received some data. If we didn't get a reply,
+ * requeue the packet and retry. Otherwise, acknowledge
+ * the reply, cleanup this packet, and return.
+ */
+ if (pkt->type != P_REP && pkt->type != P_PREP)
+ return (PA_PENDING);
+
+ if(pkt->type == P_REP) {
+ pkt_init_empty(&ack, P_ACK);
+ if (security_sendpkt(p->security_handle, &ack) < 0) {
+ /* XXX should retry */
+ amfree(ack.body);
+ security_seterror(p->security_handle, _("error sending ACK: %s"),
+ security_geterror(p->security_handle));
+ return (PA_ABORT);
+ }
+ amfree(ack.body);
+ return (PA_FINISH);
+ }
+ else if(pkt->type == P_PREP) {
+ p->timeout = p->repwait - CURTIME + p->curtime + 1;
+ if (p->timeout <= 0)
+ p->timeout = 1;
+ return (PA_CONTPEND);
}
-#ifdef PROTO_DEBUG
- dbprintf(("%s: handle %s p %X got packet type %s\n",
- debug_prefix_time(": protocol"),
- inpkt.handle,
- (int)p,
- prnpktype(inpkt.type)));
-#endif
-
- pending_remove(p);
- state_machine(p, A_RCVDATA, &inpkt);
+ /* should never go here, shut up compiler warning */
+ return (PA_FINISH);
}
+/*
+ * event callback that receives a packet
+ */
+static void
+recvpkt_callback(
+ void * cookie,
+ pkt_t * pkt,
+ security_status_t status)
+{
+ proto_t *p = cookie;
+
+ assert(p != NULL);
+
+ switch (status) {
+ case S_OK:
+ state_machine(p, PA_RCVDATA, pkt);
+ break;
+ case S_TIMEOUT:
+ state_machine(p, PA_TIMEOUT, NULL);
+ break;
+ case S_ERROR:
+ state_machine(p, PA_ABORT, NULL);
+ break;
+ default:
+ assert(0);
+ break;
+ }
+}
+/*
+ * --------------
+ * Misc functions
+ */
-void check_protocol()
-{
- time_t curtime;
- proto_t *p;
-
- while(packet_arrived())
- handle_incoming_packet();
+/*
+ * Convert a pstate_t into a printable form.
+ */
+static const char *
+pstate2str(
+ pstate_t pstate)
+{
+ static const struct {
+ pstate_t type;
+ const char name[12];
+ } pstates[] = {
+#define X(s) { s, stringize(s) }
+ X(s_sendreq),
+ X(s_ackwait),
+ X(s_repwait),
+#undef X
+ };
+ int i;
- curtime = time(0);
- while(pending_head && curtime >= pending_head->timeout) {
- p = pending_dequeue();
- state_machine(p, A_TIMEOUT, NULL);
- }
+ for (i = 0; i < ASIZE(pstates); i++)
+ if (pstate == pstates[i].type)
+ return (pstates[i].name);
+ return (_("BOGUS PSTATE"));
}
+/*
+ * Convert an p_action_t into a printable form
+ */
+static const char *
+action2str(
+ p_action_t action)
+{
+ static const struct {
+ p_action_t type;
+ const char name[12];
+ } actions[] = {
+#define X(s) { s, stringize(s) }
+ X(PA_START),
+ X(PA_TIMEOUT),
+ X(PA_ERROR),
+ X(PA_RCVDATA),
+ X(PA_CONTPEND),
+ X(PA_PENDING),
+ X(PA_CONTINUE),
+ X(PA_FINISH),
+ X(PA_ABORT),
+#undef X
+ };
+ int i;
-void run_protocol()
-{
- time_t wakeup_time;
- proto_t *p;
-
- while(pending_head) {
- wakeup_time = pending_head->timeout;
-
-#ifdef PROTO_DEBUG
- dbprintf(("%s: run_protocol: waiting %d secs for %d pending reqs\n",
- debug_prefix_time(": protocol"),
- (int)(wakeup_time - time(0)),
- pending_qlength));
-#endif
-
- if(select_til(wakeup_time))
- handle_incoming_packet();
- else {
- p = pending_dequeue();
- state_machine(p, A_TIMEOUT, NULL);
- }
- }
+ for (i = 0; i < ASIZE(actions); i++)
+ if (action == actions[i].type)
+ return (actions[i].name);
+ return (_("BOGUS ACTION"));
}