1 # Copyright (c) 2009-2012 Zmanda, Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
20 package Amanda::IPC::LineProtocol;
23 Amanda::IPC::LineProtocol -- parent class for line-based protocols
30 use Amanda::IPC::LineProtocol;
31 use base "Amanda::IPC::LineProtocol";
33 use constant SETSTATUS => message("SETSTATUS",
35 format => [ qw( param1 param2 optional? list* ) ],
37 use constant PING => message("PING",
39 format => [ qw( id ) ],
41 use constant PONG => message("PONG",
43 format => [ qw( id ) ],
47 # And use the protocol
49 my $input_fh = IO::Handle->new(...);
50 my $output_fh = IO::Handle->new(...);
53 my $ping_cb = make_cb(ping_cb => sub {
54 my ($msg, %args) = @_;
55 $proto->send(MyProtocol::PONG, id => $args{'id'});
58 my $message_cb = make_cb(message_cb => sub {
59 my ($msg, %args) = @_;
65 $proto = MyProtocol->new(
68 message_cb => $message_cb);
71 $proto->set_message_cb(MyProtocol::PING, $ping_cb);
72 $proto->set_message_cb(MyProtocol::PONG, $pong_cb);
74 # or send messages to an object, with method names based on
78 my ($msg, %args) = @_;
81 $proto = MyProtocol->new( # ..
82 message_obj => $self);
85 $proto->send(MyProtocol::SETSTATUS,
90 # shut down the protocol, flushing any messages waiting to
92 my $finished_cb = make_cb(finished_cb => sub {
96 $proto->stop(finished_cb => $finished_cb);
100 This library is used to implement communications between Amanda processes.
101 Amanda has historically implemented a number of distinct text-based protocols
102 for communications between various components, and this library servces to
103 abstract and centralize the implementation of those protocols.
105 The package supports point-to-point, message-based, symmetric protocols. Two
106 communicating processes exchange discrete messages, and in principle either
107 process can send a message at any time, although this is limited by the (often
108 unwritten) rules of the protocol.
110 In protocols based on this package, each message is a single text line,
111 terminated with a newline and consisting of a sequence of quoted strings. The
112 first string determines the type of message. For example:
114 SEND-MORE-MONEY $150.00 "Books and pencils"
115 ORDER-PIZZA Large Mushrooms Olives Onions "Green Peppers"
117 The package is asynchronous (see L<Amanda::MainLoop>), triggering callbacks for
118 incoming messages rather than implementing a C<get_message> method or the like.
119 If necessary, outgoing messages are queued for later transmission, thus
120 avoiding deadlocks from full pipe buffers. This allows processing to continue
121 unhindered in both processes while messages are in transit in either direction.
123 =head2 DEFINING A PROTOCOL
125 There are two parts to any use of this package. First, define the protocol by
126 creating a subclass and populating it using the C<message> package method.
127 This begins with something like
129 package CollegeProtocol;
130 use base "Amanda::IPC::LineProtocol";
131 use Amanda::IPC::LineProtocol;
133 The usual trick for specifying messages is to simultaneously define a series of
134 constants, using the following idiom:
136 use constant ORDER_PIZZA => message("ORDER-PIZZA",
137 match => qr/^ORDER-PIZZA$/,
138 format => [ qw( size toppings* ) ],
141 The first argument to C<message> is the word with which this particular message
142 type will be sent. The C<match> parameter gives a regular expression which
143 will be used to recognize incoming messages of this type. If this parameter
144 is not specified, the default is to match the first argument with a
145 case-insensitive regexp.
147 The C<format> parameter describes the format of the arguments for this message
148 type. A format parameter with the C<*> suffix gathers all remaining arguments
149 into a list. A C<?> suffix indicates an optional parameter. Note that it is
150 quite possible to specify ambiguous formats which will not work like you
151 expect. The default format is an empty list (taking no arguments).
153 The optional C<on_eof> parameter will cause a a message of this type to be
154 generated on EOF. For example, with:
156 use constant DROP_OUT => message("DROP-OUT",
160 when an EOF is detected, a C<DROP_OUT> message will be generated.
162 The protocol class should contain, in POD, a full description of the syntax of
163 the protcol -- which messages may be sent when, and what they mean. No
164 facility is provided to encode this description in perl.
166 In general, protocols are expected to be symmetrical -- any message can either
167 be sent or received. However, some existing protocols use different formats in
168 different directions. In this case, specify C<format> as a hashref with keys
169 C<in> and C<out> pointing to the two different formats:
171 use constant ERROR => message("ERROR",
172 match => qr/^ERROR$/,
173 format => { in => [ qw( message severity ) ],
174 out => [ qw( component severity message ) ] },
177 =head2 USING A PROTOCOL
179 Once a protocol is defined, it forms a class which can be used to run the
180 protocol. Multiple instances of this class can be created to handle
181 simultaneous uses of the protocol over different channels.
183 The constructor, C<new>, takes two C<IO::Handle> objects -- one to read from
184 (C<rx_fh>) and one to write to (C<tx_fh>). In some cases (e.g., a socket),
185 these may be the same handle. It takes an optional callback, C<message_cb>,
186 which will be called for any received messages not handled by a more specific
187 callback. Any other parameters are considered message-type-specific callbacks.
189 For example, given a socket handle C<$sockh>, the following will start the
190 C<CollegeProtocol> running on that socket:
192 my $proto = CollegeProtocol->new(
196 $proto->set_message_cb(CollegeProtocol::PIZZA_DELIVERY, $pizza_delivery_cb);
198 For protocols with a lot of message types, it may be useful to have the
199 protocol call methods on an object. This is done with the C<message_obj>
200 argument to the protocol constructor:
202 $proto = CollegeProtocol->new( # ..
203 message_obj => $obj);
205 The methods are named C<msg_$msgname>, where $msgname has all non-identifier
206 characters translated to an underscore (C<_>). For situations where the meaning
207 of a message can change dynamically, it may be useful to set a callback after
208 the object has been crated:
210 $proto->set_message_cb(CollegeProtocol::MIDTERM,
213 The constructor also takes a 'debug' argument; if given, then all incoming and
214 outgoing messages will be written to the debug log with this argument as
217 All message callbacks have the same signature:
219 my $pizza_delivery_cb = make_cb(pizza_delivery_cb => sub {
220 # (note that object methods will get the usual $self)
221 my ($msgtype, %params) = @_;
224 where C<%params> contains all of the arguments to the message, keyed by the
225 argument names given in the message's C<format>. Note that parameters
226 specified with the C<*> suffix will appear as arrayrefs.
228 Callbacks specified with C<set_message_cb> take precedence over other
229 specifications; next are message-specific callbacks given to the constructor,
230 followed by C<message_obj>, and finally C<message_cb>.
232 In case of an error, the C<message_cb> (if specified) is called with
233 C<$msgtype> undefined and with a single parameter named C<error> giving the
234 error message. This generally indicates either an unknown or badly-formatted
237 To send a message, use the C<send> method, which takes the same arguments as a
240 $proto->send(CollegeProtocol::SEND_MORE_MONEY,
241 how_much => "$150.00",
242 what_for => "Books and pencils");
247 our @ISA = qw( Exporter );
248 our @EXPORT = qw( message new );
251 use POSIX qw( :errno_h );
256 use Amanda::Debug qw( debug );
257 use Amanda::MainLoop qw( :GIOCondition make_cb );
261 # Package methods to support protocol definition
263 my %msgspecs_by_protocol;
265 my ($name, @params) = @_;
267 my $msgspec = $msgspecs_by_protocol{caller()}->{$name} = { @params };
269 # do some parameter sanity checks
271 my @allowed_params = qw( match format on_eof );
272 for $param (keys %$msgspec) {
273 die "invalid message() parameter '$param'"
274 unless grep { $_ eq $param } @allowed_params;
277 # normalize the results a little bit
278 $msgspec->{'name'} = $name;
280 if (!exists $msgspec->{'match'}) {
281 $msgspec->{'match'} = qr/^$msgspec->{'name'}$/i;
283 if (!exists $msgspec->{'format'}) {
284 $msgspec->{'format'} = [];
287 # calculate a method name
288 my $methname = "msg_$name";
289 $methname =~ tr/a-zA-Z0-9/_/c;
290 $msgspec->{'methname'} = $methname;
304 debug => $params{'debug'},
306 rx_fh => $params{'rx_fh'},
311 tx_fh => $params{'tx_fh'},
314 tx_finished_cb => undef,
315 tx_outstanding_writes => 0,
318 message_obj => $params{'message_obj'},
319 default_cb => $params{'message_cb'},
321 # a ref to the existing structure
322 msgspecs => $msgspecs_by_protocol{$class},
325 # set nonblocking mode on both file descriptor, but only for non-tty
326 # handles -- non-blocking tty's don't work well at all.
327 if (POSIX::isatty($self->{'rx_fh'}->fileno())) {
328 $self->{'rx_fh_tty'} = 1;
330 if (!defined($self->{'rx_fh'}->blocking(0))) {
331 die("Could not make protocol filehandle non-blocking");
335 if (POSIX::isatty($self->{'tx_fh'}->fileno())) {
336 $self->{'tx_fh_tty'} = 1;
338 if (!defined($self->{'tx_fh'}->blocking(0))) {
339 die("Could not make protocol filehandle non-blocking");
344 $self->{'rx_source'} = Amanda::MainLoop::async_read(
345 fd => $self->{'rx_fh'}->fileno(),
346 async_read_cb => sub { $self->_async_read_cb(@_); });
353 my ($name, $message_cb) = @_;
355 $self->{'cmd_cbs'}->{$name} = $message_cb;
362 $self->{'stopped'} = 1;
364 # abort listening for incoming data
365 if (defined $self->{'rx_source'}) {
366 $self->{'rx_source'}->remove();
369 # and flush any outgoing messages
370 if ($self->{'tx_outstanding_writes'} > 0) {
371 $self->{'tx_finished_cb'} = $params{'finished_cb'};
373 $params{'finished_cb'}->();
379 my ($name, %info) = @_;
381 my $msgspec = $self->{'msgspecs'}->{$name};
382 die "No message spec for '$name'" unless defined($msgspec);
384 my @line = $msgspec->{'name'};
386 my $format = $msgspec->{'format'};
387 $format = $format->{'out'} if (ref $format eq "HASH");
389 for my $elt (@$format) {
390 my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
391 my $val = $info{$name};
393 croak "Value for '$name' is undefined";
397 croak "message key '$name' must be an array"
398 unless defined $val and ref($val) eq "ARRAY";
401 croak "message key '$name' is required"
402 unless defined $val or $kind eq "?";
403 push @line, $val if defined $val;
407 my $line = join(" ", map { Amanda::Util::quote_string("$_") } @line);
408 debug($self->{'debug'} . " >> $line") if ($self->{'debug'});
411 ++$self->{'tx_outstanding_writes'};
412 my $write_done_cb = make_cb(write_done_cb => sub {
413 my ($err, $nbytes) = @_;
416 # TODO: handle this better
420 # call the protocol's finished_cb if necessary
421 if (--$self->{'tx_outstanding_writes'} == 0 and $self->{'tx_finished_cb'}) {
422 $self->{'tx_finished_cb'}->();
425 $self->{'tx_source'} = Amanda::MainLoop::async_write(
426 fd => $self->{'tx_fh'}->fileno(),
428 async_write_cb => $write_done_cb);
432 # Handle incoming messages
438 for my $msgspec (values %{$self->{'msgspecs'}}) {
439 my $match = $msgspec->{'match'};
440 next unless defined($match);
441 return $msgspec if ($cmdstr =~ $match);
449 my ($msgspec, @line) = @_;
451 # parse the message according to the "in" format
452 my $format = $msgspec->{'format'};
453 $format = $format->{'in'} if (ref $format eq "HASH");
456 for my $elt (@$format) {
457 my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
460 $args->{$name} = [ @line ];
465 next if ($kind eq "?" and !@line);
468 return "too few arguments to '$msgspec->{name}': first missing argument is $name";
471 $args->{$name} = shift @line;
475 return "too many arguments to '$msgspec->{name}': first unmatched argument is '$line[0]'";
478 return (undef, $args);
481 sub _call_message_cb {
483 my ($msgspec, $line, $args) = @_;
485 # after the user calls stop(), don't call any more callbacks
486 return if $self->{'stopped'};
488 # send a bogus line message to the default_cb if there's no msgspec
489 if (!defined $msgspec) {
490 if ($self->{'default_cb'}) {
491 $self->{'default_cb'}->(undef, %$args);
493 debug("IPC: " . ($args->{'error'} or "bogus line '$line'"));
498 # otherwise, call the relevant callback
499 if (exists $self->{'cmd_cbs'}{$msgspec->{'name'}}) {
500 return $self->{'cmd_cbs'}{$msgspec->{'name'}}->($msgspec->{'name'}, %$args);
503 if (defined $self->{'message_obj'} and $self->{'message_obj'}->can($msgspec->{'methname'})) {
504 my $methname = $msgspec->{'methname'};
505 return $self->{'message_obj'}->$methname($msgspec->{'name'}, %$args);
508 if ($self->{'default_cb'}) {
509 return $self->{'default_cb'}->($msgspec->{'name'}, %$args);
512 warn "IPC: Ignored unhandled line '$line'";
522 debug($self->{'debug'} . " << $line") if ($self->{'debug'});
524 # turn the line into a list of strings..
525 my @line = Amanda::Util::split_quoted_strings($line);
528 # get the specification for this message
529 my $msgspec = $self->_find_msgspec(shift @line);
530 if (!defined $msgspec) {
531 $self->_call_message_cb(undef, $line, {error => 'unknown command'});
535 my ($parserr, $args) = $self->_parse_line($msgspec, @line);
537 $self->_call_message_cb(undef, $line, {error => $parserr});
541 $self->_call_message_cb($msgspec, $line, $args);
547 # handle a final line, even without a newline (is this wise?)
548 if ($self->{'rx_buffer'} ne '') {
549 $self->_incoming_line($self->{'rx_buffer'} . "\n");
552 # find the EOF msgspec and call it
553 for my $msgspec (values %{$self->{'msgspecs'}}) {
554 if ($msgspec->{'on_eof'}) {
555 $self->_call_message_cb($msgspec, "(EOF)", {});
563 my ($err, $data) = @_;
566 # TODO: call an error_handler given to new()?
571 $self->_incoming_eof();
575 # set up to read the next chunk
576 $self->{'rx_source'} = Amanda::MainLoop::async_read(
577 fd => $self->{'rx_fh'}->fileno(),
578 async_read_cb => sub { $self->_async_read_cb(@_); });
580 # and process this data
581 $self->{'rx_buffer'} .= $data;
583 while ($self->{'rx_buffer'} =~ /\n/) {
584 my ($line, $rest) = split '\n', $self->{'rx_buffer'}, 2;
585 $self->{'rx_buffer'} = $rest;
586 $self->_incoming_line($line);