Imported Upstream version 3.3.3
[debian/amanda] / common-src / ipc-binary.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU General Public License
7  * as published by the Free Software Foundation; either version 2
8  * of the License, or (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * for more details.
14  *
15  * You should have received a copy of the GNU General Public License along
16  * with this program; if not, write to the Free Software Foundation, Inc.,
17  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
18  *
19  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
21  */
22
23 #include "amanda.h"
24 #include "ipc-binary.h"
25
26 struct ipc_binary_proto_t {
27     guint16 magic;
28     guint16 n_cmds;
29     ipc_binary_cmd_t *cmds;
30 };
31
32 /* extra flag to indicate that an argument exists */
33 #define IPC_BINARY_EXISTS (1 << 7)
34
35 struct ipc_binary_cmd_t {
36     gboolean exists;
37     guint8 *arg_flags;
38     guint16 n_args;
39 };
40
41 #define MSG_HDR_LEN 10
42 #define ARG_HDR_LEN 6
43
44 /*
45  * Utilities
46  */
47
48 static void
49 expand_buffer(
50     ipc_binary_buf_t *buf,
51     gsize size)
52 {
53     gsize new_len = buf->length + size;
54
55     /* allocate space in the buffer if necessary */
56     if (buf->offset + new_len > buf->size) {
57         if (buf->offset != 0 && new_len <= buf->size) {
58             g_memmove(buf->buf,
59                       buf->buf + buf->offset,
60                       buf->length);
61             buf->offset = 0;
62         } else {
63             buf->size = buf->offset + new_len;
64             buf->buf = g_realloc(buf->buf, buf->size);
65         }
66     }
67 }
68
69 static void
70 add_to_buffer(
71     ipc_binary_buf_t *buf,
72     gsize size,
73     gpointer data)
74 {
75     expand_buffer(buf, size);
76
77     g_memmove(buf->buf + buf->offset + buf->length, data, size);
78     buf->length += size;
79 }
80
81 static void
82 consume_from_buffer(
83     ipc_binary_buf_t *buf,
84     gsize size)
85 {
86     g_assert(size <= buf->length);
87
88     buf->length -= size;
89     if (buf->length == 0)
90         buf->offset = 0;
91     else
92         buf->offset += size;
93 }
94
95 static gboolean
96 all_args_present(
97     ipc_binary_message_t *msg)
98 {
99     int i;
100
101     for (i = 0; i < msg->cmd->n_args; i++) {
102         if (msg->args[i].data == NULL
103                 && (msg->cmd->arg_flags[i] & IPC_BINARY_EXISTS)
104                 && !(msg->cmd->arg_flags[i] & IPC_BINARY_OPTIONAL)) {
105             g_debug("ipc-binary message missing mandatory arg %d", i);
106             return FALSE;
107         }
108     }
109
110     return TRUE;
111 }
112
113 /*
114  * Creating a protocol
115  */
116
117 ipc_binary_proto_t *
118 ipc_binary_proto_new(
119     guint16 magic)
120 {
121     ipc_binary_proto_t *prot = g_new(ipc_binary_proto_t, 1);
122
123     prot->magic = magic;
124     prot->n_cmds = 0;
125     prot->cmds = NULL;
126
127     return prot;
128 }
129
130 ipc_binary_cmd_t *
131 ipc_binary_proto_add_cmd(
132     ipc_binary_proto_t *proto,
133     guint16 id)
134 {
135     g_assert(proto != NULL);
136     g_assert(id != 0);
137
138     if (id >= proto->n_cmds) {
139         guint16 new_len = id+1;
140         int i;
141
142         proto->cmds = g_renew(ipc_binary_cmd_t, proto->cmds, new_len);
143         for (i = proto->n_cmds; i < new_len; i++) {
144             proto->cmds[i].n_args = 0;
145             proto->cmds[i].exists = FALSE;
146             proto->cmds[i].arg_flags = NULL;
147         }
148         proto->n_cmds = new_len;
149     }
150
151     /* make sure this command hasn't been defined already */
152     g_assert(!proto->cmds[id].exists);
153     proto->cmds[id].exists = TRUE;
154
155     return &proto->cmds[id];
156 }
157
158 void
159 ipc_binary_cmd_add_arg(
160     ipc_binary_cmd_t *cmd,
161     guint16 id,
162     guint8 flags)
163 {
164     g_assert(cmd != NULL);
165     g_assert(id != 0);
166     flags |= IPC_BINARY_EXISTS;
167
168     if (id >= cmd->n_args) {
169         guint16 new_len = id+1;
170         int i;
171
172         cmd->arg_flags = g_realloc(cmd->arg_flags, new_len);
173         for (i = cmd->n_args; i < new_len; i++) {
174             cmd->arg_flags[i] = 0;
175         }
176         cmd->n_args = new_len;
177     }
178
179     /* make sure this arg hasn't been defined already */
180     g_assert(cmd->arg_flags[id] == 0);
181
182     cmd->arg_flags[id] = flags;
183 }
184
185 /*
186  * Using a protocol
187  */
188
189 ipc_binary_channel_t *
190 ipc_binary_new_channel(
191     ipc_binary_proto_t *proto)
192 {
193     ipc_binary_channel_t *chan;
194
195     chan = g_new0(ipc_binary_channel_t, 1);
196     chan->proto = proto;
197
198     return chan;
199 }
200
201 void
202 ipc_binary_free_channel(
203     ipc_binary_channel_t *chan)
204 {
205     if (chan->in.buf)
206         g_free(chan->in.buf);
207
208     if (chan->out.buf)
209         g_free(chan->out.buf);
210
211     g_free(chan);
212 }
213
214 ipc_binary_message_t *
215 ipc_binary_new_message(
216     ipc_binary_channel_t *chan,
217     guint16 cmd_id)
218 {
219     ipc_binary_message_t *msg = g_new0(ipc_binary_message_t, 1);
220     ipc_binary_cmd_t *cmd;
221
222     /* make sure this is a valid command */
223     g_assert(chan != NULL);
224     g_assert(cmd_id > 0 && cmd_id < chan->proto->n_cmds);
225     g_assert(chan->proto->cmds[cmd_id].exists);
226     cmd = &chan->proto->cmds[cmd_id];
227
228     msg->chan = chan;
229     msg->cmd = cmd;
230     msg->cmd_id = cmd_id;
231     msg->n_args = cmd->n_args;
232     msg->args = g_malloc0(sizeof(*(msg->args)) * cmd->n_args);
233
234     return msg;
235 }
236
237 void
238 ipc_binary_add_arg(
239     ipc_binary_message_t *msg,
240     guint16 arg_id,
241     gsize size,
242     gpointer data,
243     gboolean take_memory)
244 {
245     /* make sure this arg has not already been set for this message */
246     g_assert(msg != NULL);
247     g_assert(data != NULL);
248     g_assert(arg_id > 0 && arg_id < msg->cmd->n_args);
249     g_assert(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS);
250     g_assert(msg->args[arg_id].data == NULL);
251
252     if (size == 0 && msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
253         size = strlen((gchar *)data);
254     }
255
256     if (!take_memory) {
257         data = g_memdup(data, size);
258     }
259
260     msg->args[arg_id].len = size;
261     msg->args[arg_id].data = data;
262 }
263
264 void
265 ipc_binary_free_message(
266     ipc_binary_message_t *msg)
267 {
268     int i;
269
270     g_assert(msg != NULL);
271
272     for (i = 0; i < msg->cmd->n_args; i++) {
273         gpointer data = msg->args[i].data;
274         if (data)
275             g_free(data);
276     }
277
278     g_free(msg->args);
279     g_free(msg);
280 }
281
282 ipc_binary_message_t *
283 ipc_binary_read_message(
284     ipc_binary_channel_t *chan,
285     int fd)
286 {
287     ipc_binary_message_t *msg;
288
289     /* read data until we have a whole packet or until EOF */
290     while (!(msg = ipc_binary_poll_message(chan))) {
291         gssize bytes;
292
293         if (errno)
294             return NULL;
295
296         /* read directly into the buffer, instead of using ipc_binary_feed_data */
297         expand_buffer(&chan->in, 32768);
298         bytes = read(fd, chan->in.buf + chan->in.offset + chan->in.length, 32768);
299         if (bytes < 0) {
300             /* error on read */
301             return NULL;
302         } else if (!bytes) {
303             /* got EOF; if there are bytes left over, this is EIO */
304             if (chan->in.length) {
305                 g_warning("got EOF reading ipc-binary channel with %zd bytes un-processed",
306                           chan->in.length);
307                 errno = EIO;
308             }
309
310             return NULL;
311         } else {
312             /* add the data to the buffer */
313             chan->in.length += bytes;
314         }
315     }
316
317     return msg;
318 }
319
320 int
321 ipc_binary_write_message(
322     ipc_binary_channel_t *chan,
323     int fd,
324     ipc_binary_message_t *msg)
325 {
326     gsize written;
327
328     /* add the message to the queue */
329     ipc_binary_queue_message(chan, msg);
330
331     /* and write the outgoing buffer */
332     written = full_write(fd, chan->out.buf + chan->out.offset, chan->out.length);
333     consume_from_buffer(&chan->out, written);
334
335     if (written < chan->out.length) {
336         return -1;
337     }
338
339     return 0;
340 }
341
342 void
343 ipc_binary_feed_data(
344     ipc_binary_channel_t *chan,
345     gsize size,
346     gpointer data)
347 {
348     add_to_buffer(&chan->in, size, data);
349 }
350
351 void
352 ipc_binary_data_transmitted(
353     ipc_binary_channel_t *chan,
354     gsize size)
355 {
356     consume_from_buffer(&chan->out, size);
357 }
358
359 static guint16
360 get_guint16(guint8 **p) {
361     guint16 v = 0;
362     v = *((*p)++);
363     v = *((*p)++) | v << 8;
364     return v;
365 }
366
367 static guint32
368 get_guint32(guint8 **p) {
369     guint32 v = 0;
370     v = *((*p)++);
371     v = *((*p)++) | v << 8;
372     v = *((*p)++) | v << 8;
373     v = *((*p)++) | v << 8;
374     return v;
375 }
376
377
378 ipc_binary_message_t *
379 ipc_binary_poll_message(
380     ipc_binary_channel_t *chan)
381 {
382     guint8 *p;
383     ipc_binary_message_t *msg;
384     guint16 magic;
385     guint16 cmd_id;
386     guint32 length;
387     guint16 n_args;
388
389     if (chan->in.length < MSG_HDR_LEN) {
390         errno = 0;
391         return NULL;
392     }
393
394     /* read out the pocket header, using shifts to avoid endian and alignment
395      * problems, and checking each one as we proceed */
396     p = (guint8 *)(chan->in.buf + chan->in.offset);
397
398     magic = get_guint16(&p);
399
400     if (magic != chan->proto->magic) {
401         g_debug("ipc-binary got invalid magic 0x%04x", (int)magic);
402         errno = EINVAL;
403         return NULL;
404     }
405
406     cmd_id = get_guint16(&p);
407
408     /* make sure this is a valid command */
409     if (cmd_id <= 0 || cmd_id >= chan->proto->n_cmds
410                             || !chan->proto->cmds[cmd_id].exists) {
411         errno = EINVAL;
412         return NULL;
413     }
414
415     length = get_guint32(&p);
416
417     /* see if there's enough data in this buffer for a whole message */
418     if (length > chan->in.length) {
419         errno = 0;
420         return NULL; /* whole packet isn't here yet */
421     }
422
423     n_args = get_guint16(&p);
424
425     /* looks legit -- start building a message */
426     msg = ipc_binary_new_message(chan, cmd_id);
427
428     /* get each of the arguments */
429     while (n_args--) {
430         guint16 arg_id;
431         guint32 arglen;
432
433         arglen = get_guint32(&p);
434         arg_id = get_guint16(&p);
435
436         if (arg_id <= 0 || arg_id >= msg->cmd->n_args
437                 || !(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS)
438                 || msg->args[arg_id].data != NULL) {
439             g_debug("ipc-binary invalid or duplicate arg");
440             errno = EINVAL;
441             ipc_binary_free_message(msg);
442             return NULL;
443         }
444
445         /* properly terminate string args, but do not include the nul byte in
446          * the arglen */
447         if (msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
448             gchar *data;
449
450             /* copy and terminate the string */
451             data = g_malloc(arglen+1);
452             g_memmove(data, p, arglen);
453             data[arglen] = '\0';
454             msg->args[arg_id].data = (gpointer)data;
455             msg->args[arg_id].len = arglen;
456         } else {
457             msg->args[arg_id].data = g_memdup(p, arglen);
458             msg->args[arg_id].len = arglen;
459         }
460
461         p += arglen;
462     }
463
464     /* check that all mandatory args are here */
465     if (!all_args_present(msg)) {
466         errno = EINVAL;
467         ipc_binary_free_message(msg);
468         return NULL;
469     }
470
471     consume_from_buffer(&chan->in, length);
472
473     return msg;
474 }
475
476 static guint8 *
477 put_guint16(guint8 *p, guint16 v)
478 {
479     *(p++) = v >> 8;
480     *(p++) = v;
481     return p;
482 }
483
484 static guint8 *
485 put_guint32(guint8 *p, guint32 v)
486 {
487     *(p++) = v >> 24;
488     *(p++) = v >> 16;
489     *(p++) = v >> 8;
490     *(p++) = v;
491     return p;
492 }
493
494 void
495 ipc_binary_queue_message(
496     ipc_binary_channel_t *chan G_GNUC_UNUSED,
497     ipc_binary_message_t *msg G_GNUC_UNUSED)
498 {
499     gsize msg_len;
500     guint8 *p;
501     int i;
502     guint16 n_args = 0;
503
504     g_assert(all_args_present(msg));
505
506     /* calculate the length and make enough room in the buffer */
507     msg_len = MSG_HDR_LEN;
508     for (i = 0; i < msg->cmd->n_args; i++) {
509         if (msg->args[i].data) {
510             n_args++;
511             msg_len += msg->args[i].len + ARG_HDR_LEN;
512         }
513     }
514     expand_buffer(&chan->out, msg_len);
515     p = (guint8 *)(chan->out.buf + chan->out.offset);
516
517     /* write the packet */
518     p = put_guint16(p, chan->proto->magic);
519     p = put_guint16(p, msg->cmd_id);
520     p = put_guint32(p, msg_len);
521     p = put_guint16(p, n_args);
522
523     for (i = 0; i < msg->cmd->n_args; i++) {
524         if (!msg->args[i].data)
525             continue;
526
527         p = put_guint32(p, msg->args[i].len);
528         p = put_guint16(p, i);
529
530         g_memmove(p, msg->args[i].data, msg->args[i].len);
531         p += msg->args[i].len;
532     }
533     chan->out.length += msg_len;
534
535     ipc_binary_free_message(msg);
536 }