1 # Copyright (c) 2009 Zmanda, Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19 package Amanda::IPC::LineProtocol;
22 Amanda::IPC::LineProtocol -- parent class for line-based protocols
29 use Amanda::IPC::LineProtocol;
30 use base "Amanda::IPC::LineProtocol";
32 use constant SETSTATUS => message("SETSTATUS",
34 format => [ qw( param1 param2 optional? list* ) ],
36 use constant PING => message("PING",
38 format => [ qw( id ) ],
40 use constant PONG => message("PONG",
42 format => [ qw( id ) ],
46 # And use the protocol
48 my $input_fh = IO::Handle->new(...);
49 my $output_fh = IO::Handle->new(...);
52 my $ping_cb = make_cb(ping_cb => sub {
53 my ($msg, %args) = @_;
54 $proto->send(MyProtocol::PONG, id => $args{'id'});
57 my $message_cb = make_cb(message_cb => sub {
58 my ($msg, %args) = @_;
64 $proto = MyProtocol->new(
67 message_cb => $message_cb);
70 $proto->set_message_cb(MyProtocol::PING, $ping_cb);
71 $proto->set_message_cb(MyProtocol::PONG, $pong_cb);
73 # or send messages to an object, with method names based on
77 my ($msg, %args) = @_;
80 $proto = MyProtocol->new( # ..
81 message_obj => $self);
84 $proto->send(MyProtocol::SETSTATUS,
89 # shut down the protocol, flushing any messages waiting to
91 my $finished_cb = make_cb(finished_cb => sub {
95 $proto->stop(finished_cb => $finished_cb);
99 This library is used to implement communications between Amanda processes.
100 Amanda has historically implemented a number of distinct text-based protocols
101 for communications between various components, and this library serves to
102 abstract and centralize the implementation of those protocols.
104 The package supports point-to-point, message-based, symmetric protocols. Two
105 communicating processes exchange discrete messages, and in principle either
106 process can send a message at any time, although this is limited by the (often
107 unwritten) rules of the protocol.
109 In protocols based on this package, each message is a single text line,
110 terminated with a newline and consisting of a sequence of quoted strings. The
111 first string determines the type of message. For example:
113 SEND-MORE-MONEY $150.00 "Books and pencils"
114 ORDER-PIZZA Large Mushrooms Olives Onions "Green Peppers"
116 The package is asynchronous (see L<Amanda::MainLoop>), triggering callbacks for
117 incoming messages rather than implementing a C<get_message> method or the like.
118 If necessary, outgoing messages are queued for later transmission, thus
119 avoiding deadlocks from full pipe buffers. This allows processing to continue
120 unhindered in both processes while messages are in transit in either direction.
122 =head2 DEFINING A PROTOCOL
124 There are two parts to any use of this package. First, define the protocol by
125 creating a subclass and populating it using the C<message> package method.
126 This begins with something like
128 package CollegeProtocol;
129 use base "Amanda::IPC::LineProtocol";
130 use Amanda::IPC::LineProtocol;
132 The usual trick for specifying messages is to simultaneously define a series of
133 constants, using the following idiom:
135 use constant ORDER_PIZZA => message("ORDER-PIZZA",
136 match => qr/^ORDER-PIZZA$/,
137 format => [ qw( size toppings* ) ],
140 The first argument to C<message> is the word with which this particular message
141 type will be sent. The C<match> parameter gives a regular expression which
142 will be used to recognize incoming messages of this type. If this parameter
143 is not specified, the default is to match the first argument with a
144 case-insensitive regexp.
146 The C<format> parameter describes the format of the arguments for this message
147 type. A format parameter with the C<*> suffix gathers all remaining arguments
148 into a list. A C<?> suffix indicates an optional parameter. Note that it is
149 quite possible to specify ambiguous formats which will not work like you
150 expect. The default format is an empty list (taking no arguments).
152 The optional C<on_eof> parameter will cause a message of this type to be
153 generated on EOF. For example, with:
155 use constant DROP_OUT => message("DROP-OUT",
159 when an EOF is detected, a C<DROP_OUT> message will be generated.
161 The protocol class should contain, in POD, a full description of the syntax of
162 the protocol -- which messages may be sent when, and what they mean. No
163 facility is provided to encode this description in perl.
165 In general, protocols are expected to be symmetrical -- any message can either
166 be sent or received. However, some existing protocols use different formats in
167 different directions. In this case, specify C<format> as a hashref with keys
168 C<in> and C<out> pointing to the two different formats:
170 use constant ERROR => message("ERROR",
171 match => qr/^ERROR$/,
172 format => { in => [ qw( message severity ) ],
173 out => [ qw( component severity message ) ] },
176 =head2 USING A PROTOCOL
178 Once a protocol is defined, it forms a class which can be used to run the
179 protocol. Multiple instances of this class can be created to handle
180 simultaneous uses of the protocol over different channels.
182 The constructor, C<new>, takes two C<IO::Handle> objects -- one to read from
183 (C<rx_fh>) and one to write to (C<tx_fh>). In some cases (e.g., a socket),
184 these may be the same handle. It takes an optional callback, C<message_cb>,
185 which will be called for any received messages not handled by a more specific
186 callback. Any other parameters are considered message-type-specific callbacks.
188 For example, given a socket handle C<$sockh>, the following will start the
189 C<CollegeProtocol> running on that socket:
191 my $proto = CollegeProtocol->new(
195 $proto->set_message_cb(CollegeProtocol::PIZZA_DELIVERY, $pizza_delivery_cb);
197 For protocols with a lot of message types, it may be useful to have the
198 protocol call methods on an object. This is done with the C<message_obj>
199 argument to the protocol constructor:
201 $proto = CollegeProtocol->new( # ..
202 message_obj => $obj);
204 The methods are named C<msg_$msgname>, where $msgname has all non-identifier
205 characters translated to an underscore (C<_>). For situations where the meaning
206 of a message can change dynamically, it may be useful to set a callback after
207 the object has been created:
209 $proto->set_message_cb(CollegeProtocol::MIDTERM,
212 The constructor also takes a 'debug' argument; if given, then all incoming and
213 outgoing messages will be written to the debug log with this argument as
216 All message callbacks have the same signature:
218 my $pizza_delivery_cb = make_cb(pizza_delivery_cb => sub {
219 # (note that object methods will get the usual $self)
220 my ($msgtype, %params) = @_;
223 where C<%params> contains all of the arguments to the message, keyed by the
224 argument names given in the message's C<format>. Note that parameters
225 specified with the C<*> suffix will appear as arrayrefs.
227 Callbacks specified with C<set_message_cb> take precedence over other
228 specifications; next are message-specific callbacks given to the constructor,
229 followed by C<message_obj>, and finally C<message_cb>.
231 In case of an error, the C<message_cb> (if specified) is called with
232 C<$msgtype> undefined and with a single parameter named C<error> giving the
233 error message. This generally indicates either an unknown or badly-formatted
236 To send a message, use the C<send> method, which takes the same arguments as a
239 $proto->send(CollegeProtocol::SEND_MORE_MONEY,
240 how_much => '$150.00',
241 what_for => "Books and pencils");
246 our @ISA = qw( Exporter );
247 our @EXPORT = qw( message new );
250 use POSIX qw( :errno_h );
255 use Amanda::Debug qw( debug );
256 use Amanda::MainLoop qw( :GIOCondition make_cb );
260 # Package methods to support protocol definition
262 my %msgspecs_by_protocol;
# Fragment of the protocol-definition helper (the "message()" named in the
# error text below).  The visible lines record a message spec for the calling
# package, reject unknown parameter names, and fill in defaults for 'match',
# 'format' and the derived method name.
264 my ($name, @params) = @_;
# the spec is stored under the package returned by caller(), keyed by message name
266 my $msgspec = $msgspecs_by_protocol{caller()}->{$name} = { @params };
268 # do some parameter sanity checks
270 my @allowed_params = qw( match format on_eof );
271 for $param (keys %$msgspec) {
272 die "invalid message() parameter '$param'"
273 unless grep { $_ eq $param } @allowed_params;
276 # normalize the results a little bit
277 $msgspec->{'name'} = $name;
# default matcher: the message name, anchored and case-insensitive
# NOTE(review): the name is interpolated without quotemeta, so any regex
# metacharacters in a message name would be treated as pattern syntax
279 if (!exists $msgspec->{'match'}) {
280 $msgspec->{'match'} = qr/^$msgspec->{'name'}$/i;
# default format: an empty list, i.e. no arguments
282 if (!exists $msgspec->{'format'}) {
283 $msgspec->{'format'} = [];
286 # calculate a method name
287 my $methname = "msg_$name";
# every character outside a-z, A-Z and 0-9 becomes an underscore
288 $methname =~ tr/a-zA-Z0-9/_/c;
289 $msgspec->{'methname'} = $methname;
# Fragment of the constructor (the POD calls it C<new>).  The visible lines
# fill the object's fields from %params, put non-tty handles into
# non-blocking mode, and start the first asynchronous read.
303 debug => $params{'debug'},
305 rx_fh => $params{'rx_fh'},
310 tx_fh => $params{'tx_fh'},
# transmit bookkeeping: the callback to run once pending writes have
# completed, and the number of writes still outstanding
313 tx_finished_cb => undef,
314 tx_outstanding_writes => 0,
# fallbacks consulted when no message-specific callback is registered
317 message_obj => $params{'message_obj'},
318 default_cb => $params{'message_cb'},
320 # a ref to the existing structure
321 msgspecs => $msgspecs_by_protocol{$class},
324 # set nonblocking mode on both file descriptors, but only for non-tty
325 # handles -- non-blocking tty's don't work well at all.
# a terminal is only flagged; it is not switched to non-blocking mode
326 if (POSIX::isatty($self->{'rx_fh'}->fileno())) {
327 $self->{'rx_fh_tty'} = 1;
# an undefined result from blocking(0) is treated as a fatal error
329 if (!defined($self->{'rx_fh'}->blocking(0))) {
330 die("Could not make protocol filehandle non-blocking");
# same treatment for the transmit handle
334 if (POSIX::isatty($self->{'tx_fh'}->fileno())) {
335 $self->{'tx_fh_tty'} = 1;
337 if (!defined($self->{'tx_fh'}->blocking(0))) {
338 die("Could not make protocol filehandle non-blocking");
# start the first read; its completion is forwarded to _async_read_cb
343 $self->{'rx_source'} = Amanda::MainLoop::async_read(
344 fd => $self->{'rx_fh'}->fileno(),
345 async_read_cb => sub { $self->_async_read_cb(@_); });
# Fragment of the per-message callback setter (used in the POD as
# $proto->set_message_cb(MSG, $cb)): stores $message_cb in the object's
# 'cmd_cbs' table under the message name.
352 my ($name, $message_cb) = @_;
354 $self->{'cmd_cbs'}->{$name} = $message_cb;
# Fragment of the shutdown path (used in the POD as
# $proto->stop(finished_cb => ...)): marks the protocol stopped, stops
# listening for input, and arranges for finished_cb to be called.
361 $self->{'stopped'} = 1;
363 # abort listening for incoming data
364 if (defined $self->{'rx_source'}) {
365 $self->{'rx_source'}->remove();
368 # and flush any outgoing messages
# writes are still pending: remember finished_cb in tx_finished_cb so it can
# be called later
369 if ($self->{'tx_outstanding_writes'} > 0) {
370 $self->{'tx_finished_cb'} = $params{'finished_cb'};
# NOTE(review): presumably the no-pending-writes branch (the line between is
# not visible here); finished_cb is invoked immediately, and no check that the
# caller actually supplied one is visible
372 $params{'finished_cb'}->();
# Fragment of the outgoing-message path (used in the POD as
# $proto->send(MSG, key => value, ...)): builds one quoted line from the
# message spec and the caller's key/value pairs, then starts an asynchronous
# write on the tx_fh descriptor.
378 my ($name, %info) = @_;
380 my $msgspec = $self->{'msgspecs'}->{$name};
381 die "No message spec for '$name'" unless defined($msgspec);
# the line starts with the message's name
383 my @line = $msgspec->{'name'};
# a hashref format carries separate 'in' and 'out' layouts; sending uses 'out'
385 my $format = $msgspec->{'format'};
386 $format = $format->{'out'} if (ref $format eq "HASH");
388 for my $elt (@$format) {
# split each format element into its key and an optional '*' or '?' suffix;
# note that this $name shadows the message name declared above
389 my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
390 my $val = $info{$name};
392 croak "Value for '$name' is undefined";
396 croak "message key '$name' must be an array"
397 unless defined $val and ref($val) eq "ARRAY";
# a missing value is only acceptable for keys marked optional with '?'
400 croak "message key '$name' is required"
401 unless defined $val or $kind eq "?";
402 push @line, $val if defined $val;
# every element is stringified and quoted, then joined with single spaces
406 my $line = join(" ", map { Amanda::Util::quote_string("$_") } @line);
407 debug($self->{'debug'} . " >> $line") if ($self->{'debug'});
# count this write as outstanding until its completion callback runs
410 ++$self->{'tx_outstanding_writes'};
411 my $write_done_cb = make_cb(write_done_cb => sub {
412 my ($err, $nbytes) = @_;
415 # TODO: handle this better
419 # call the protocol's finished_cb if necessary
# i.e. when this was the last outstanding write and a finished_cb is stored
420 if (--$self->{'tx_outstanding_writes'} == 0 and $self->{'tx_finished_cb'}) {
421 $self->{'tx_finished_cb'}->();
424 $self->{'tx_source'} = Amanda::MainLoop::async_write(
425 fd => $self->{'tx_fh'}->fileno(),
427 async_write_cb => $write_done_cb);
431 # Handle incoming messages
# Fragment of the message-spec lookup (called elsewhere in this file as
# $self->_find_msgspec($word)): returns the first spec whose 'match' regexp
# matches $cmdstr.
# NOTE(review): values() on a hash has no defined order, so if two specs'
# patterns both match the same word, which one is returned is unspecified.
437 for my $msgspec (values %{$self->{'msgspecs'}}) {
438 my $match = $msgspec->{'match'};
# specs without a matcher are skipped
439 next unless defined($match);
440 return $msgspec if ($cmdstr =~ $match);
# Fragment of the incoming-argument parser (called elsewhere in this file as
# $self->_parse_line($msgspec, @line)): maps the words of a received line onto
# the names in the message's format.  The visible returns are an error string
# on failure and (undef, $args) on success.
448 my ($msgspec, @line) = @_;
450 # parse the message according to the "in" format
451 my $format = $msgspec->{'format'};
452 $format = $format->{'in'} if (ref $format eq "HASH");
455 for my $elt (@$format) {
# split each format element into its key and an optional '*' or '?' suffix
456 my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
# the key receives a copy of all words currently in @line (presumably the
# '*' case; the guarding condition is not visible here)
459 $args->{$name} = [ @line ];
# an optional key is skipped once the words have run out
464 next if ($kind eq "?" and !@line);
467 return "too few arguments to '$msgspec->{name}': first missing argument is $name";
# otherwise the key takes the next word
470 $args->{$name} = shift @line;
474 return "too many arguments to '$msgspec->{name}': first unmatched argument is '$line[0]'";
477 return (undef, $args);
# Dispatch one parsed message (or a parse error) to the appropriate handler.
# Arguments: the message spec (undef for an unrecognised or badly-formatted
# line), the raw line text, and a hashref of named arguments.
# Visible lookup order for a known message: a callback stored in 'cmd_cbs'
# under the message name, then a matching method on 'message_obj', then
# 'default_cb'; if none applies a warning is emitted.
480 sub _call_message_cb {
482 my ($msgspec, $line, $args) = @_;
484 # after the user calls stop(), don't call any more callbacks
485 return if $self->{'stopped'};
487 # send a bogus line message to the default_cb if there's no msgspec
488 if (!defined $msgspec) {
# the default callback receives an undefined message type plus the args
# (callers in this file pass an 'error' key in that case)
489 if ($self->{'default_cb'}) {
490 $self->{'default_cb'}->(undef, %$args);
# logs the error text if one was given, else the offending line
492 debug("IPC: " . ($args->{'error'} or "bogus line '$line'"));
497 # otherwise, call the relevant callback
# the test is exists(), not defined(): an entry set to undef is still selected
498 if (exists $self->{'cmd_cbs'}{$msgspec->{'name'}}) {
499 return $self->{'cmd_cbs'}{$msgspec->{'name'}}->($msgspec->{'name'}, %$args);
# object dispatch is used only if the object implements the msg_* method
502 if (defined $self->{'message_obj'} and $self->{'message_obj'}->can($msgspec->{'methname'})) {
503 my $methname = $msgspec->{'methname'};
504 return $self->{'message_obj'}->$methname($msgspec->{'name'}, %$args);
507 if ($self->{'default_cb'}) {
508 return $self->{'default_cb'}->($msgspec->{'name'}, %$args);
511 warn "IPC: Ignored unhandled line '$line'";
# Fragment of the per-line handler (called elsewhere in this file as
# $self->_incoming_line($line)): splits one received line into words, resolves
# the message spec from the first word, parses the remaining words, and
# dispatches through _call_message_cb.
521 debug($self->{'debug'} . " << $line") if ($self->{'debug'});
523 # turn the line into a list of strings..
524 my @line = Amanda::Util::split_quoted_strings($line);
527 # get the specification for this message
# the first word is removed from @line and used for the lookup
528 my $msgspec = $self->_find_msgspec(shift @line);
# no spec matched: report it as an error with an undefined message spec
529 if (!defined $msgspec) {
530 $self->_call_message_cb(undef, $line, {error => 'unknown command'});
534 my ($parserr, $args) = $self->_parse_line($msgspec, @line);
# a parse failure is reported the same way, carrying the parser's error text
536 $self->_call_message_cb(undef, $line, {error => $parserr});
540 $self->_call_message_cb($msgspec, $line, $args);
# Fragment of the end-of-input handler (called elsewhere in this file as
# $self->_incoming_eof()): processes any unterminated final line, then
# dispatches every message spec flagged with 'on_eof'.
546 # handle a final line, even without a newline (is this wise?)
# NOTE(review): the leftover text is passed with a "\n" appended, whereas the
# normal read path passes lines with the newline already removed
547 if ($self->{'rx_buffer'} ne '') {
548 $self->_incoming_line($self->{'rx_buffer'} . "\n");
551 # find the EOF msgspec and call it
# the synthetic message carries the placeholder line "(EOF)" and no arguments
552 for my $msgspec (values %{$self->{'msgspecs'}}) {
553 if ($msgspec->{'on_eof'}) {
554 $self->_call_message_cb($msgspec, "(EOF)", {});
# Fragment of the read-completion callback (installed as async_read_cb
# elsewhere in this file): receives ($err, $data), hands end-of-input to
# _incoming_eof, and otherwise buffers the data and processes every complete
# line.  The conditions guarding the error and EOF paths are not visible here.
562 my ($err, $data) = @_;
565 # TODO: call an error_handler given to new()?
570 $self->_incoming_eof();
574 # set up to read the next chunk
# the next read is requested before the current data is processed
575 $self->{'rx_source'} = Amanda::MainLoop::async_read(
576 fd => $self->{'rx_fh'}->fileno(),
577 async_read_cb => sub { $self->_async_read_cb(@_); });
579 # and process this data
580 $self->{'rx_buffer'} .= $data;
# peel off one newline-terminated line at a time; whatever follows the last
# newline stays in rx_buffer
582 while ($self->{'rx_buffer'} =~ /\n/) {
# limit 2: split only at the first newline, keeping the remainder intact
583 my ($line, $rest) = split '\n', $self->{'rx_buffer'}, 2;
584 $self->{'rx_buffer'} = $rest;
585 $self->_incoming_line($line);