2 * Amanda, The Advanced Maryland Automatic Network Disk Archiver
3 * Copyright (c) 2008-2012 Zmanda, Inc. All Rights Reserved.
5 * This program is free software; you can redistribute it and/or
6 * modify it under the terms of the GNU General Public License
7 * as published by the Free Software Foundation; either version 2
8 * of the License, or (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12 * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19 * Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
20 * Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
24 #include "ipc-binary.h"
/* Protocol description.  Code in this file accesses the members `magic`
 * (compared against / written into every packet header), `n_cmds` and
 * `cmds`; `cmds` is an array indexed by command id that is grown with
 * g_renew as commands are added. */
26 struct ipc_binary_proto_t {
29 ipc_binary_cmd_t *cmds;
32 /* extra flag to indicate that an argument exists */
/* It is OR'd into the flags of every argument when the argument is defined,
 * so a defined argument never has flags equal to 0; an all-zero flags byte
 * therefore means "slot not defined". */
33 #define IPC_BINARY_EXISTS (1 << 7)
/* One command of a protocol.  Code in this file accesses the members
 * `exists` (command has been defined), `n_args` (length of arg_flags) and
 * `arg_flags` (one flags entry per argument id). */
35 struct ipc_binary_cmd_t {
/* Size in bytes of the fixed message header as read and written in this
 * file: 16-bit magic, 16-bit command id, 32-bit total length, 16-bit
 * argument count (2 + 2 + 4 + 2). */
41 #define MSG_HDR_LEN 10
/* Buffer-growth helper (the line carrying the function name is not part of
 * this excerpt; presumably this is the expand_buffer(buf, size) that other
 * functions in this file call).  Makes sure `size` additional bytes fit
 * after the buf->length bytes that currently live at buf->buf + buf->offset.
 */
50 ipc_binary_buf_t *buf,
53 gsize new_len = buf->length + size;
55 /* allocate space in the buffer if necessary */
56 if (buf->offset + new_len > buf->size) {
/* If there is a consumed prefix (offset != 0) and the data would fit in the
 * current allocation, the live bytes are moved rather than reallocating.
 * NOTE(review): the destination and length arguments of the move, and the
 * reset of buf->offset, are not visible in this excerpt -- confirm. */
57 if (buf->offset != 0 && new_len <= buf->size) {
59 buf->buf + buf->offset,
/* Otherwise grow the allocation to exactly offset + new_len bytes. */
63 buf->size = buf->offset + new_len;
64 buf->buf = g_realloc(buf->buf, buf->size);
/* Append helper (function name line not shown; presumably the
 * add_to_buffer(buf, size, data) called by ipc_binary_feed_data).  Makes
 * room for `size` bytes and copies them from `data` to the position just
 * past the bytes already stored (buf->buf + buf->offset + buf->length).
 * NOTE(review): the update of buf->length is not visible in this excerpt --
 * confirm it is advanced by `size` after the copy. */
71 ipc_binary_buf_t *buf,
75 expand_buffer(buf, size);
77 g_memmove(buf->buf + buf->offset + buf->length, data, size);
/* Consume helper (function name line not shown; presumably the
 * consume_from_buffer(buf, size) called elsewhere in this file).  The only
 * visible statement asserts that the caller never consumes more bytes than
 * the buffer currently holds; the bookkeeping itself is outside this
 * excerpt. */
83 ipc_binary_buf_t *buf,
86 g_assert(size <= buf->length);
/* Mandatory-argument check (function name line not shown; presumably the
 * all_args_present(msg) whose result callers in this file test as a
 * boolean).  Walks every argument slot of the message's command. */
97 ipc_binary_message_t *msg)
101 for (i = 0; i < msg->cmd->n_args; i++) {
/* An argument counts as missing when it is defined for the command
 * (IPC_BINARY_EXISTS), is not marked IPC_BINARY_OPTIONAL, and has no data
 * attached to the message. */
102 if (msg->args[i].data == NULL
103 && (msg->cmd->arg_flags[i] & IPC_BINARY_EXISTS)
104 && !(msg->cmd->arg_flags[i] & IPC_BINARY_OPTIONAL)) {
105 g_debug("ipc-binary message missing mandatory arg %d", i);
114 * Creating a protocol
/* Allocate a new protocol object.
 * NOTE(review): g_new (unlike the g_new0 used for channels and messages in
 * this file) does not clear the allocation, so every member must be set
 * explicitly; those assignments are not visible in this excerpt. */
118 ipc_binary_proto_new(
121 ipc_binary_proto_t *prot = g_new(ipc_binary_proto_t, 1);
/* Define command `id` in `proto` and return a pointer to its descriptor.
 * Defining the same id twice trips an assertion.
 *
 * NOTE(review): the returned pointer points into proto->cmds, which is
 * reallocated with g_renew whenever a larger id is added later; a pointer
 * obtained from an earlier call may therefore become stale.  Callers should
 * finish configuring a command before adding the next one. */
131 ipc_binary_proto_add_cmd(
132 ipc_binary_proto_t *proto,
135 g_assert(proto != NULL);
/* grow the command table so that index `id` is valid */
138 if (id >= proto->n_cmds) {
/* NOTE(review): new_len is 16 bits wide; if `id` can be 65535 the sum wraps
 * to 0 -- the declared type of `id` is not visible here, confirm. */
139 guint16 new_len = id+1;
142 proto->cmds = g_renew(ipc_binary_cmd_t, proto->cmds, new_len);
/* initialise only the newly added slots as "not defined, no arguments" */
143 for (i = proto->n_cmds; i < new_len; i++) {
144 proto->cmds[i].n_args = 0;
145 proto->cmds[i].exists = FALSE;
146 proto->cmds[i].arg_flags = NULL;
148 proto->n_cmds = new_len;
151 /* make sure this command hasn't been defined already */
152 g_assert(!proto->cmds[id].exists);
153 proto->cmds[id].exists = TRUE;
155 return &proto->cmds[id];
/* Define argument `id` on command `cmd` with the given `flags`.
 * IPC_BINARY_EXISTS is always added to the stored flags, and defining the
 * same argument id twice trips an assertion. */
159 ipc_binary_cmd_add_arg(
160 ipc_binary_cmd_t *cmd,
164 g_assert(cmd != NULL);
/* guarantees the stored flags are non-zero, which is what the
 * "already defined" assertion below relies on */
166 flags |= IPC_BINARY_EXISTS;
/* grow the flags array so that index `id` is valid */
168 if (id >= cmd->n_args) {
169 guint16 new_len = id+1;
/* the allocation is new_len bytes, i.e. this assumes one byte per flags
 * entry -- presumably arg_flags is a guint8 array; confirm in the struct */
172 cmd->arg_flags = g_realloc(cmd->arg_flags, new_len);
/* new slots start out as 0, meaning "argument not defined" */
173 for (i = cmd->n_args; i < new_len; i++) {
174 cmd->arg_flags[i] = 0;
176 cmd->n_args = new_len;
179 /* make sure this arg hasn't been defined already */
180 g_assert(cmd->arg_flags[id] == 0);
182 cmd->arg_flags[id] = flags;
/* Create a channel that speaks protocol `proto`.  The channel is allocated
 * zero-filled (g_new0), so its `in` and `out` buffers start out empty with
 * NULL storage. */
189 ipc_binary_channel_t *
190 ipc_binary_new_channel(
191 ipc_binary_proto_t *proto)
193 ipc_binary_channel_t *chan;
195 chan = g_new0(ipc_binary_channel_t, 1);
/* Release a channel: the storage of both the incoming and the outgoing
 * buffer is freed here.  Any further cleanup (e.g. of the channel struct
 * itself) is outside this excerpt. */
202 ipc_binary_free_channel(
203 ipc_binary_channel_t *chan)
206 g_free(chan->in.buf);
209 g_free(chan->out.buf);
/* Create an empty message for command `cmd_id` of the channel's protocol.
 * The command id must be non-zero, within the command table and defined;
 * violations trip assertions rather than returning an error. */
214 ipc_binary_message_t *
215 ipc_binary_new_message(
216 ipc_binary_channel_t *chan,
219 ipc_binary_message_t *msg = g_new0(ipc_binary_message_t, 1);
220 ipc_binary_cmd_t *cmd;
222 /* make sure this is a valid command */
223 g_assert(chan != NULL);
/* command id 0 is never accepted as a valid command */
224 g_assert(cmd_id > 0 && cmd_id < chan->proto->n_cmds);
225 g_assert(chan->proto->cmds[cmd_id].exists);
226 cmd = &chan->proto->cmds[cmd_id];
230 msg->cmd_id = cmd_id;
231 msg->n_args = cmd->n_args;
/* one zero-filled slot per argument id, so every arg starts with
 * data == NULL ("not set") */
232 msg->args = g_malloc0(sizeof(*(msg->args)) * cmd->n_args);
/* Attach a value to argument `arg_id` of `msg` (the line carrying the
 * function name is not part of this excerpt).  The argument id must be
 * non-zero, defined for the message's command, and not yet set on this
 * message; `data` must be non-NULL.  Violations trip assertions. */
239 ipc_binary_message_t *msg,
243 gboolean take_memory)
245 /* make sure this arg has not already been set for this message */
246 g_assert(msg != NULL);
247 g_assert(data != NULL);
248 g_assert(arg_id > 0 && arg_id < msg->cmd->n_args);
249 g_assert(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS);
250 g_assert(msg->args[arg_id].data == NULL);
/* for string arguments a size of 0 means "measure it": the length is taken
 * with strlen, so the terminating nul is not counted */
252 if (size == 0 && msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
253 size = strlen((gchar *)data);
/* NOTE(review): the condition guarding this copy is not visible; presumably
 * the data is duplicated only when take_memory is false -- confirm. */
257 data = g_memdup(data, size);
260 msg->args[arg_id].len = size;
261 msg->args[arg_id].data = data;
/* Free a message.  The visible part iterates over every argument slot of
 * the message's command and picks up each slot's data pointer; the actual
 * release calls are outside this excerpt. */
265 ipc_binary_free_message(
266 ipc_binary_message_t *msg)
270 g_assert(msg != NULL);
272 for (i = 0; i < msg->cmd->n_args; i++) {
273 gpointer data = msg->args[i].data;
/* Blocking read of one message from file descriptor `fd`: keeps reading
 * into the channel's incoming buffer until ipc_binary_poll_message yields a
 * message.  Handling of read errors and the return value on EOF are outside
 * this excerpt. */
282 ipc_binary_message_t *
283 ipc_binary_read_message(
284 ipc_binary_channel_t *chan,
287 ipc_binary_message_t *msg;
289 /* read data until we have a whole packet or until EOF */
290 while (!(msg = ipc_binary_poll_message(chan))) {
296 /* read directly into the buffer, instead of using ipc_binary_feed_data */
/* reserve room for up to 32768 more bytes, then read into the free space
 * just past the data already buffered */
297 expand_buffer(&chan->in, 32768);
298 bytes = read(fd, chan->in.buf + chan->in.offset + chan->in.length, 32768);
303 /* got EOF; if there are bytes left over, this is EIO */
304 if (chan->in.length) {
/* NOTE(review): %zd is the signed size conversion; if in.length is a gsize
 * (unsigned), %zu or G_GSIZE_FORMAT would be the matching specifier. */
305 g_warning("got EOF reading ipc-binary channel with %zd bytes un-processed",
312 /* add the data to the buffer */
313 chan->in.length += bytes;
/* Queue `msg` on the channel and immediately write the whole outgoing
 * buffer to `fd` with full_write. */
321 ipc_binary_write_message(
322 ipc_binary_channel_t *chan,
324 ipc_binary_message_t *msg)
328 /* add the message to the queue */
329 ipc_binary_queue_message(chan, msg);
331 /* and write the outgoing buffer */
332 written = full_write(fd, chan->out.buf + chan->out.offset, chan->out.length);
333 consume_from_buffer(&chan->out, written);
/* NOTE(review): this short-write check runs AFTER the consume call above.
 * If consume_from_buffer reduces out.length by `written` (its body is not
 * visible here), the comparison is against the remaining byte count rather
 * than the requested one, so a short write of more than half the buffer
 * would go undetected.  Consider capturing out.length before the write. */
335 if (written < chan->out.length) {
/* Hand `size` bytes of received data to the channel: they are appended to
 * the incoming buffer for later parsing. */
343 ipc_binary_feed_data(
344 ipc_binary_channel_t *chan,
348 add_to_buffer(&chan->in, size, data);
/* Tell the channel that `size` bytes of its outgoing buffer have been sent;
 * they are consumed from the outgoing buffer.  `size` must not exceed the
 * number of bytes currently queued (asserted by the consume helper). */
352 ipc_binary_data_transmitted(
353 ipc_binary_channel_t *chan,
356 consume_from_buffer(&chan->out, size);
/* Read a 16-bit value from the byte stream at *p and advance *p past it.
 * Each step shifts the accumulated value left by 8 bits and ORs in the next
 * byte, so earlier bytes are more significant (big-endian order). */
360 get_guint16(guint8 **p) {
363 v = *((*p)++) | v << 8;
/* Read a 32-bit value from the byte stream at *p and advance *p past it.
 * Bytes are folded in one at a time, most significant first (big-endian
 * order); working byte-wise avoids alignment and host-endianness issues. */
368 get_guint32(guint8 **p) {
371 v = *((*p)++) | v << 8;
372 v = *((*p)++) | v << 8;
373 v = *((*p)++) | v << 8;
/* Try to parse one complete message out of the channel's incoming buffer.
 * Returns NULL when the whole packet has not arrived yet; on success the
 * message's bytes are consumed from the buffer.  What is returned or set on
 * the error paths (bad magic, bad command, bad argument) is outside this
 * excerpt. */
378 ipc_binary_message_t *
379 ipc_binary_poll_message(
380 ipc_binary_channel_t *chan)
383 ipc_binary_message_t *msg;
/* nothing can be parsed until a full fixed-size header is buffered */
389 if (chan->in.length < MSG_HDR_LEN) {
394 /* read out the packet header, using shifts to avoid endian and alignment
395 * problems, and checking each one as we proceed */
396 p = (guint8 *)(chan->in.buf + chan->in.offset);
398 magic = get_guint16(&p);
400 if (magic != chan->proto->magic) {
401 g_debug("ipc-binary got invalid magic 0x%04x", (int)magic);
406 cmd_id = get_guint16(&p);
408 /* make sure this is a valid command */
409 if (cmd_id <= 0 || cmd_id >= chan->proto->n_cmds
410 || !chan->proto->cmds[cmd_id].exists) {
/* `length` is the total packet length as declared by the sender */
415 length = get_guint32(&p);
417 /* see if there's enough data in this buffer for a whole message */
/* NOTE(review): only an upper bound is checked here; a declared length
 * smaller than MSG_HDR_LEN is not visibly rejected -- confirm. */
418 if (length > chan->in.length) {
420 return NULL; /* whole packet isn't here yet */
423 n_args = get_guint16(&p);
425 /* looks legit -- start building a message */
426 msg = ipc_binary_new_message(chan, cmd_id);
428 /* get each of the arguments */
/* Each argument is encoded as a 32-bit length, a 16-bit argument id, then
 * the payload bytes.
 * NOTE(review): arglen comes straight from the wire and is used below as a
 * copy length; no check that the payload lies within the `length` bytes of
 * this packet is visible in this excerpt -- confirm one exists, otherwise a
 * malformed packet can cause a read past the buffered data. */
433 arglen = get_guint32(&p);
434 arg_id = get_guint16(&p);
/* reject argument id 0, ids outside the command's table, ids that were
 * never defined, and ids that already received a value in this packet */
436 if (arg_id <= 0 || arg_id >= msg->cmd->n_args
437 || !(msg->cmd->arg_flags[arg_id] & IPC_BINARY_EXISTS)
438 || msg->args[arg_id].data != NULL) {
439 g_debug("ipc-binary invalid or duplicate arg");
441 ipc_binary_free_message(msg);
445 /* properly terminate string args, but do not include the nul byte in
447 if (msg->cmd->arg_flags[arg_id] & IPC_BINARY_STRING) {
450 /* copy and terminate the string */
/* one extra byte is allocated for the terminator; the stored len stays
 * arglen, i.e. it excludes the nul */
451 data = g_malloc(arglen+1);
452 g_memmove(data, p, arglen);
454 msg->args[arg_id].data = (gpointer)data;
455 msg->args[arg_id].len = arglen;
/* non-string arguments are duplicated byte-for-byte */
457 msg->args[arg_id].data = g_memdup(p, arglen);
458 msg->args[arg_id].len = arglen;
464 /* check that all mandatory args are here */
465 if (!all_args_present(msg)) {
467 ipc_binary_free_message(msg);
/* drop the parsed packet (its declared length) from the incoming buffer */
471 consume_from_buffer(&chan->in, length);
/* Store the 16-bit value `v` at `p`.  Callers in this file assign the
 * return value back to their write pointer, i.e. it is used as the position
 * just past the bytes written.  The body is outside this excerpt. */
477 put_guint16(guint8 *p, guint16 v)
/* Store the 32-bit value `v` at `p`.  Callers in this file assign the
 * return value back to their write pointer, i.e. it is used as the position
 * just past the bytes written.  The body is outside this excerpt. */
485 put_guint32(guint8 *p, guint32 v)
/* Serialise `msg` into the channel's outgoing buffer and free the message
 * (the caller gives up ownership).  All mandatory arguments must be set;
 * this is asserted.
 *
 * NOTE(review): both parameters are marked G_GNUC_UNUSED although the body
 * uses them; the annotation is harmless but misleading. */
495 ipc_binary_queue_message(
496 ipc_binary_channel_t *chan G_GNUC_UNUSED,
497 ipc_binary_message_t *msg G_GNUC_UNUSED)
504 g_assert(all_args_present(msg));
506 /* calculate the length and make enough room in the buffer */
507 msg_len = MSG_HDR_LEN;
/* only arguments that actually carry data are sent; each contributes its
 * payload plus a per-argument header of ARG_HDR_LEN bytes */
508 for (i = 0; i < msg->cmd->n_args; i++) {
509 if (msg->args[i].data) {
511 msg_len += msg->args[i].len + ARG_HDR_LEN;
514 expand_buffer(&chan->out, msg_len);
/* NOTE(review): the write pointer starts at out.buf + out.offset and does
 * not add out.length, while out.length is incremented below.  If bytes are
 * already queued (out.length > 0) this appears to overwrite them instead of
 * appending -- confirm against callers that queue several messages before
 * transmitting. */
515 p = (guint8 *)(chan->out.buf + chan->out.offset);
517 /* write the packet */
/* header layout: magic, command id, total length, argument count */
518 p = put_guint16(p, chan->proto->magic);
519 p = put_guint16(p, msg->cmd_id);
520 p = put_guint32(p, msg_len);
521 p = put_guint16(p, n_args);
523 for (i = 0; i < msg->cmd->n_args; i++) {
524 if (!msg->args[i].data)
/* per-argument layout: 32-bit payload length, 16-bit argument id, payload */
527 p = put_guint32(p, msg->args[i].len);
528 p = put_guint16(p, i);
530 g_memmove(p, msg->args[i].data, msg->args[i].len);
531 p += msg->args[i].len;
533 chan->out.length += msg_len;
535 ipc_binary_free_message(msg);