Imported Upstream version 3.3.2
[debian/amanda] / common-src / ipc-binary.c
1 /*
2  * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3  * Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
4  *
5  * This program is free software; you can redistribute it and/or modify it
6  * under the terms of the GNU General Public License version 2 as published
7  * by the Free Software Foundation.
8  *
9  * This program is distributed in the hope that it will be useful, but
10  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
11  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
12  * for more details.
13  *
14  * You should have received a copy of the GNU General Public License along
15  * with this program; if not, write to the Free Software Foundation, Inc.,
16  * 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
17  *
18  * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
19  * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20  */
21
22 #include "amanda.h"
23 #include "ipc-binary.h"
24
25 struct ipc_binary_proto_t {
26     guint16 magic;
27     guint16 n_cmds;
28     ipc_binary_cmd_t *cmds;
29 };
30
31 /* extra flag to indicate that an argument exists */
32 #define IPC_BINARY_EXISTS (1 << 7)
33
34 struct ipc_binary_cmd_t {
35     gboolean exists;
36     guint8 *arg_flags;
37     guint16 n_args;
38 };
39
40 #define MSG_HDR_LEN 10
41 #define ARG_HDR_LEN 6
42
43 /*
44  * Utilities
45  */
46
47 static void
48 expand_buffer(
49     ipc_binary_buf_t *buf,
50     gsize size)
51 {
52     gsize new_len = buf->length + size;
53
54     /* allocate space in the buffer if necessary */
55     if (buf->offset + new_len > buf->size) {
56         if (buf->offset != 0 && new_len <= buf->size) {
57             g_memmove(buf->buf,
58                       buf->buf + buf->offset,
59                       buf->length);
60             buf->offset = 0;
61         } else {
62             buf->size = buf->offset + new_len;
63             buf->buf = g_realloc(buf->buf, buf->size);
64         }
65     }
66 }
67
68 static void
69 add_to_buffer(
70     ipc_binary_buf_t *buf,
71     gsize size,
72     gpointer data)
73 {
74     expand_buffer(buf, size);
75
76     g_memmove(buf->buf + buf->offset + buf->length, data, size);
77     buf->length += size;
78 }
79
80 static void
81 consume_from_buffer(
82     ipc_binary_buf_t *buf,
83     gsize size)
84 {
85     g_assert(size <= buf->length);
86
87     buf->length -= size;
88     if (buf->length == 0)
89         buf->offset = 0;
90     else
91         buf->offset += size;
92 }
93
94 static gboolean
95 all_args_present(
96     ipc_binary_message_t *msg)
97 {
98     int i;
99
100     for (i = 0; i < msg->cmd->n_args; i++) {
101         if (msg->args[i].data == NULL
102                 && (msg->cmd->arg_flags[i] & IPC_BINARY_EXISTS)
103                 && !(msg->cmd->arg_flags[i] & IPC_BINARY_OPTIONAL)) {
104             g_debug("ipc-binary message missing mandatory arg %d", i);
105             return FALSE;
106         }
107     }
108
109     return TRUE;
110 }
111
112 /*
113  * Creating a protocol
114  */
115
116 ipc_binary_proto_t *
117 ipc_binary_proto_new(
118     guint16 magic)
119 {
120     ipc_binary_proto_t *prot = g_new(ipc_binary_proto_t, 1);
121
122     prot->magic = magic;
123     prot->n_cmds = 0;
124     prot->cmds = NULL;
125
126     return prot;
127 }
128
129 ipc_binary_cmd_t *
130 ipc_binary_proto_add_cmd(
131     ipc_binary_proto_t *proto,
132     guint16 id)
133 {
134     g_assert(proto != NULL);
135     g_assert(id != 0);
136
137     if (id >= proto->n_cmds) {
138         guint16 new_len = id+1;
139         int i;
140
141         proto->cmds = g_renew(ipc_binary_cmd_t, proto->cmds, new_len);
142         for (i = proto->n_cmds; i < new_len; i++) {
143             proto->cmds[i].n_args = 0;
144             proto->cmds[i].exists = FALSE;
145             proto->cmds[i].arg_flags = NULL;
146         }
147         proto->n_cmds = new_len;
148     }
149
150     /* make sure this command hasn't been defined already */
151     g_assert(!proto->cmds[id].exists);
152     proto->cmds[id].exists = TRUE;
153
154     return &proto->cmds[id];
155 }
156
157 void
158 ipc_binary_cmd_add_arg(
159     ipc_binary_cmd_t *cmd,
160     guint16 id,
161     guint8 flags)
162 {
163     g_assert(cmd != NULL);
164     g_assert(id != 0);
165     flags |= IPC_BINARY_EXISTS;
166
167     if (id >= cmd->n_args) {
168         guint16 new_len = id+1;
169         int i;
170
171         cmd->arg_flags = g_realloc(cmd->arg_flags, new_len);
172         for (i = cmd->n_args; i < new_len; i++) {
173             cmd->arg_flags[i] = 0;
174         }
175         cmd->n_args = new_len;
176     }
177
178     /* make sure this arg hasn't been defined already */
179     g_assert(cmd->arg_flags[id] == 0);
180
181     cmd->arg_flags[id] = flags;
182 }
183
184 /*
185  * Using a protocol
186  */
187
188 ipc_binary_channel_t *
189 ipc_binary_new_channel(
190     ipc_binary_proto_t *proto)
191 {
192     ipc_binary_channel_t *chan;
193
194     chan = g_new0(ipc_binary_channel_t, 1);
195     chan->proto = proto;
196
197     return chan;
198 }
199
200 void
201 ipc_binary_free_channel(
202     ipc_binary_channel_t *chan)
203 {
204     if (chan->in.buf)
205         g_free(chan->in.buf);
206
207     if (chan->out.buf)
208         g_free(chan->out.buf);
209
210     g_free(chan);
211 }
212
213 ipc_binary_message_t *
214 ipc_binary_new_message(
215     ipc_binary_channel_t *chan,
216     guint16 cmd_id)
217 {
218     ipc_binary_message_t *msg = g_new0(ipc_binary_message_t, 1);
219     ipc_binary_cmd_t *cmd;
220
221     /* make sure this is a valid command */
222     g_assert(chan != NULL);
223     g_assert(cmd_id > 0 && cmd_id < chan->proto->n_cmds);
224     g_assert(chan->proto->cmds[cmd_id].exists);
225     cmd = &chan->proto->cmds[cmd_id];
226
227     msg->chan = chan;
228     msg->cmd = cmd;
229     msg->cmd_id = cmd_id;
230     msg->n_args = cmd->n_args;
231     msg->args = g_malloc0(sizeof(*(msg->args)) * cmd->n_args);
232
233     return msg;
234 }
235
236 void
237 ipc_binary_add_arg(
238     ipc_binary_message_t *msg,
239     guint16 arg_id,
240     gsize size,
241     gpointer data,
242     gboolean take_memory)
243 {
244     /* make sure this arg has not already been set for this message */
245     g_assert(msg != NULL);
246     g_assert(data != NULL);
247     g_assert(arg_id > 0 && arg_id < msg->cmd->n_args);
248     g_assert(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS);
249     g_assert(msg->args[arg_id].data == NULL);
250
251     if (size == 0 && msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
252         size = strlen((gchar *)data);
253     }
254
255     if (!take_memory) {
256         data = g_memdup(data, size);
257     }
258
259     msg->args[arg_id].len = size;
260     msg->args[arg_id].data = data;
261 }
262
263 void
264 ipc_binary_free_message(
265     ipc_binary_message_t *msg)
266 {
267     int i;
268
269     g_assert(msg != NULL);
270
271     for (i = 0; i < msg->cmd->n_args; i++) {
272         gpointer data = msg->args[i].data;
273         if (data)
274             g_free(data);
275     }
276
277     g_free(msg->args);
278     g_free(msg);
279 }
280
281 ipc_binary_message_t *
282 ipc_binary_read_message(
283     ipc_binary_channel_t *chan,
284     int fd)
285 {
286     ipc_binary_message_t *msg;
287
288     /* read data until we have a whole packet or until EOF */
289     while (!(msg = ipc_binary_poll_message(chan))) {
290         gssize bytes;
291
292         if (errno)
293             return NULL;
294
295         /* read directly into the buffer, instead of using ipc_binary_feed_data */
296         expand_buffer(&chan->in, 32768);
297         bytes = read(fd, chan->in.buf + chan->in.offset + chan->in.length, 32768);
298         if (bytes < 0) {
299             /* error on read */
300             return NULL;
301         } else if (!bytes) {
302             /* got EOF; if there are bytes left over, this is EIO */
303             if (chan->in.length) {
304                 g_warning("got EOF reading ipc-binary channel with %zd bytes un-processed",
305                           chan->in.length);
306                 errno = EIO;
307             }
308
309             return NULL;
310         } else {
311             /* add the data to the buffer */
312             chan->in.length += bytes;
313         }
314     }
315
316     return msg;
317 }
318
319 int
320 ipc_binary_write_message(
321     ipc_binary_channel_t *chan,
322     int fd,
323     ipc_binary_message_t *msg)
324 {
325     gsize written;
326
327     /* add the message to the queue */
328     ipc_binary_queue_message(chan, msg);
329
330     /* and write the outgoing buffer */
331     written = full_write(fd, chan->out.buf + chan->out.offset, chan->out.length);
332     consume_from_buffer(&chan->out, written);
333
334     if (written < chan->out.length) {
335         return -1;
336     }
337
338     return 0;
339 }
340
341 void
342 ipc_binary_feed_data(
343     ipc_binary_channel_t *chan,
344     gsize size,
345     gpointer data)
346 {
347     add_to_buffer(&chan->in, size, data);
348 }
349
350 void
351 ipc_binary_data_transmitted(
352     ipc_binary_channel_t *chan,
353     gsize size)
354 {
355     consume_from_buffer(&chan->out, size);
356 }
357
358 static guint16
359 get_guint16(guint8 **p) {
360     guint16 v = 0;
361     v = *((*p)++);
362     v = *((*p)++) | v << 8;
363     return v;
364 }
365
366 static guint32
367 get_guint32(guint8 **p) {
368     guint32 v = 0;
369     v = *((*p)++);
370     v = *((*p)++) | v << 8;
371     v = *((*p)++) | v << 8;
372     v = *((*p)++) | v << 8;
373     return v;
374 }
375
376
377 ipc_binary_message_t *
378 ipc_binary_poll_message(
379     ipc_binary_channel_t *chan)
380 {
381     guint8 *p;
382     ipc_binary_message_t *msg;
383     guint16 magic;
384     guint16 cmd_id;
385     guint32 length;
386     guint16 n_args;
387
388     if (chan->in.length < MSG_HDR_LEN) {
389         errno = 0;
390         return NULL;
391     }
392
393     /* read out the pocket header, using shifts to avoid endian and alignment
394      * problems, and checking each one as we proceed */
395     p = (guint8 *)(chan->in.buf + chan->in.offset);
396
397     magic = get_guint16(&p);
398
399     if (magic != chan->proto->magic) {
400         g_debug("ipc-binary got invalid magic 0x%04x", (int)magic);
401         errno = EINVAL;
402         return NULL;
403     }
404
405     cmd_id = get_guint16(&p);
406
407     /* make sure this is a valid command */
408     if (cmd_id <= 0 || cmd_id >= chan->proto->n_cmds
409                             || !chan->proto->cmds[cmd_id].exists) {
410         errno = EINVAL;
411         return NULL;
412     }
413
414     length = get_guint32(&p);
415
416     /* see if there's enough data in this buffer for a whole message */
417     if (length > chan->in.length) {
418         errno = 0;
419         return NULL; /* whole packet isn't here yet */
420     }
421
422     n_args = get_guint16(&p);
423
424     /* looks legit -- start building a message */
425     msg = ipc_binary_new_message(chan, cmd_id);
426
427     /* get each of the arguments */
428     while (n_args--) {
429         guint16 arg_id;
430         guint32 arglen;
431
432         arglen = get_guint32(&p);
433         arg_id = get_guint16(&p);
434
435         if (arg_id <= 0 || arg_id >= msg->cmd->n_args
436                 || !(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS)
437                 || msg->args[arg_id].data != NULL) {
438             g_debug("ipc-binary invalid or duplicate arg");
439             errno = EINVAL;
440             ipc_binary_free_message(msg);
441             return NULL;
442         }
443
444         /* properly terminate string args, but do not include the nul byte in
445          * the arglen */
446         if (msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
447             gchar *data;
448
449             /* copy and terminate the string */
450             data = g_malloc(arglen+1);
451             g_memmove(data, p, arglen);
452             data[arglen] = '\0';
453             msg->args[arg_id].data = (gpointer)data;
454             msg->args[arg_id].len = arglen;
455         } else {
456             msg->args[arg_id].data = g_memdup(p, arglen);
457             msg->args[arg_id].len = arglen;
458         }
459
460         p += arglen;
461     }
462
463     /* check that all mandatory args are here */
464     if (!all_args_present(msg)) {
465         errno = EINVAL;
466         ipc_binary_free_message(msg);
467         return NULL;
468     }
469
470     consume_from_buffer(&chan->in, length);
471
472     return msg;
473 }
474
475 static guint8 *
476 put_guint16(guint8 *p, guint16 v)
477 {
478     *(p++) = v >> 8;
479     *(p++) = v;
480     return p;
481 }
482
483 static guint8 *
484 put_guint32(guint8 *p, guint32 v)
485 {
486     *(p++) = v >> 24;
487     *(p++) = v >> 16;
488     *(p++) = v >> 8;
489     *(p++) = v;
490     return p;
491 }
492
493 void
494 ipc_binary_queue_message(
495     ipc_binary_channel_t *chan G_GNUC_UNUSED,
496     ipc_binary_message_t *msg G_GNUC_UNUSED)
497 {
498     gsize msg_len;
499     guint8 *p;
500     int i;
501     guint16 n_args = 0;
502
503     g_assert(all_args_present(msg));
504
505     /* calculate the length and make enough room in the buffer */
506     msg_len = MSG_HDR_LEN;
507     for (i = 0; i < msg->cmd->n_args; i++) {
508         if (msg->args[i].data) {
509             n_args++;
510             msg_len += msg->args[i].len + ARG_HDR_LEN;
511         }
512     }
513     expand_buffer(&chan->out, msg_len);
514     p = (guint8 *)(chan->out.buf + chan->out.offset);
515
516     /* write the packet */
517     p = put_guint16(p, chan->proto->magic);
518     p = put_guint16(p, msg->cmd_id);
519     p = put_guint32(p, msg_len);
520     p = put_guint16(p, n_args);
521
522     for (i = 0; i < msg->cmd->n_args; i++) {
523         if (!msg->args[i].data)
524             continue;
525
526         p = put_guint32(p, msg->args[i].len);
527         p = put_guint16(p, i);
528
529         g_memmove(p, msg->args[i].data, msg->args[i].len);
530         p += msg->args[i].len;
531     }
532     chan->out.length += msg_len;
533
534     ipc_binary_free_message(msg);
535 }