Imported Upstream version 3.2.0
[debian/amanda] / perl / Amanda / IPC / LineProtocol.pm
1 # Copyright (c) 2009 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
18
19 package Amanda::IPC::LineProtocol;
20 =head1 NAME
21
22 Amanda::IPC::LineProtocol -- parent class for line-based protocols
23
24 =head1 SYNOPSIS
25
26 Define your protocol:
27
28     packge MyProtocol;
29     use Amanda::IPC::LineProtocol;
30     use base "Amanda::IPC::LineProtocol";
31
32     use constant SETSTATUS => message("SETSTATUS",
33         match => qr/^FOO$/i,
34         format => [ qw( param1 param2 optional? list* ) ],
35     );
36     use constant PING => message("PING",
37         match => qr/^PING$/i,
38         format => [ qw( id ) ],
39     );
40     use constant PONG => message("PONG",
41         match => qr/^PONG$/i,
42         format => [ qw( id ) ],
43     );
44     # ...
45
46     # And use the protocol
47     package main;
48     my $input_fh = IO::Handle->new(...);
49     my $output_fh = IO::Handle->new(...);
50     my $proto;
51
52     my $ping_cb = make_cb(ping_cb => sub {
53         my ($msg, %args) = @_;
54         $proto->send(MyProtocol::PONG, id => $args{'id'});
55     });
56
57     my $message_cb = make_cb(message_cb => sub {
58         my ($msg, %args) = @_;
59         if (!$msg) {
60             die $args{'error'};
61         }
62     });
63
64     $proto = MyProtocol->new(
65             rx_fh => $input_fh,
66             tx_fh => $output_fh,
67             message_cb => $message_cb);
68
69     # and add callbacks
70     $proto->set_message_cb(MyProtocol::PING, $ping_cb);
71     $proto->set_message_cb(MyProtocol::PONG, $pong_cb);
72
73     # or send messages to an object, with method names based on
74     # the message name
75     sub msg_PONG {
76         my $self = shift;
77         my ($msg, %args) = @_;
78     }
79     # ..
80     $proto = MyProtocol->new( # ..
81             message_obj => $self);
82
83     # send a message
84     $proto->send(MyProtocol::SETSTATUS,
85         param1 => "x",
86         param2 => "y",
87         );
88
89     # shut down the protocol, flushing any messages waiting to
90     # be sent first
91     my $finished_cb = make_cb(finished_cb => sub {
92         my ($err) = @_;
93         # ...
94     });
95     $proto->stop(finished_cb => $finished_cb);
96
97 =head1 DESCRIPTION
98
99 This library is used to implement communications between Amanda processes.
100 Amanda has historically implemented a number of distinct text-based protocols
101 for communications between various components, and this library servces to
102 abstract and centralize the implementation of those protocols.
103
104 The package supports point-to-point, message-based, symmetric protocols.  Two
105 communicating processes exchange discrete messages, and in principle either
106 process can send a message at any time, although this is limited by the (often
107 unwritten) rules of the protocol.
108
109 In protocols based on this package, each message is a single text line,
110 terminated with a newline and consisting of a sequence of quoted strings.  The
111 first string determines the type of message.  For example:
112
113   SEND-MORE-MONEY $150.00 "Books and pencils"
114   ORDER-PIZZA Large Mushrooms Olives Onions "Green Peppers"
115
116 The package is asynchronous (see L<Amanda::MainLoop>), triggering callbacks for
117 incoming messages rather than implementing a C<get_message> method or the like.
118 If necessary, outgoing messages are queued for later transmission, thus
119 avoiding deadlocks from full pipe buffers.  This allows processing to continue
120 unhindered in both processes while messages are in transit in either direction.
121
122 =head2 DEFINING A PROTOCOL
123
124 There are two parts to any use of this package.  First, define the protocol by
125 creating a subclass and populating it using the C<message> package method.
126 This begins with something like
127
128   package CollegeProtocol;
129   use base "Amanda::IPC::LineProtocol";
130   use Amanda::IPC::LineProtocol;
131
132 The usual trick for specifying messages is to simultaneously define a series of
133 constants, using the following idiom:
134
135   use constant ORDER_PIZZA => message("ORDER-PIZZA",
136     match => qr/^ORDER-PIZZA$/,
137     format => [ qw( size toppings* ) ],
138   );
139
140 The first argument to C<message> is the word with which this particular message
141 type will be sent.  The C<match> parameter gives a regular expression which
142 will be used to recognize incoming messages of this type.   If this parameter
143 is not specified, the default is to match the first argument with a
144 case-insensitive regexp.
145
146 The C<format> parameter describes the format of the arguments for this message
147 type.  A format parameter with the C<*> suffix gathers all remaining arguments
148 into a list.  A C<?> suffix indicates an optional parameter.  Note that it is
149 quite possible to specify ambiguous formats which will not work like you
150 expect.  The default format is an empty list (taking no arguments).
151
152 The optional C<on_eof> parameter will cause a a message of this type to be
153 generated on EOF.  For example, with:
154
155   use constant DROP_OUT => message("DROP-OUT",
156     on_eof => 1,
157   );
158
159 when an EOF is detected, a C<DROP_OUT> message will be generated.
160
161 The protocol class should contain, in POD, a full description of the syntax of
162 the protcol -- which messages may be sent when, and what they mean.  No
163 facility is provided to encode this description in perl.
164
165 In general, protocols are expected to be symmetrical -- any message can either
166 be sent or received.  However, some existing protocols use different formats in
167 different directions.  In this case, specify C<format> as a hashref with keys
168 C<in> and C<out> pointing to the two different formats:
169
170   use constant ERROR => message("ERROR",
171     match => qr/^ERROR$/,
172     format => { in => [ qw( message severity ) ],
173                 out => [ qw( component severity message ) ] },
174   );
175
176 =head2 USING A PROTOCOL
177
178 Once a protocol is defined, it forms a class which can be used to run the
179 protocol.  Multiple instances of this class can be created to handle
180 simultaneous uses of the protocol over different channels.
181
182 The constructor, C<new>, takes two C<IO::Handle> objects -- one to read from
183 (C<rx_fh>) and one to write to (C<tx_fh>).  In some cases (e.g., a socket),
184 these may be the same handle.  It takes an optional callback, C<message_cb>,
185 which will be called for any received messages not handled by a more specific
186 callback.  Any other parameters are considered message-type-specific callbacks.
187
188 For example, given a socket handle C<$sockh>, the following will start the
189 C<CollegeProtocol> running on that socket:
190
191   my $proto = CollegeProtocol->new(
192     rx_fh => $sockh,
193     tx_fh => $sockh,
194   );
195   $proto->set_message_cb(CollegeProtocol::PIZZA_DELIVERY, $pizza_delivery_cb);
196
197 For protocols with a lot of message types, it may be useful to have the
198 protocol call methods on an object.  This is done with the C<message_obj>
199 argument to the protocol constructor:
200
201   $proto = CollegeProtocol->new( # ..
202     message_obj => $obj);
203
204 The methods are named C<msg_$msgname>, where $msgname has all non-identifier
205 characters translated to an underscore (C<_>).  For situations where the meaning
206 of a message can change dynamically, it may be useful to set a callback after
207 the object has been crated:
208
209   $proto->set_message_cb(CollegeProtocol::MIDTERM,
210     sub { ... });
211
212 The constructor also takes a 'debug' argument; if given, then all incoming and
213 outgoing messages will be written to the debug log with this argument as
214 prefix.
215
216 All message callbacks have the same signature:
217
218   my $pizza_delivery_cb = make_cb(pizza_delivery_cb => sub {
219     # (note that object methods will get the usual $self)
220     my ($msgtype, %params) = @_;
221   });
222
223 where C<%params> contains all of the arguments to the message, keyed by the
224 argument names given in the message's C<format>.  Note that parameters
225 specified with the C<*> suffix will appear as arrayrefs.
226
227 Callbacks specified with C<set_message_cb> take precedence over other
228 specifications; next are message-specific callbacks given to the constructor,
229 followed by C<message_obj>, and finally C<message_cb>.
230
231 In case of an error, the C<message_cb> (if specified) is called with
232 C<$msgtype> undefined and with a single parameter named C<error> giving the
233 error message.  This generally indicates either an unknown or badly-formatted
234 message.
235
236 To send a message, use the C<send> method, which takes the same arguments as a
237 message callback:
238
239   $proto->send(CollegeProtocol::SEND_MORE_MONEY,
240     how_much => "$150.00",
241     what_for => "Books and pencils");
242
243 =cut
244
245 use Exporter ();
246 our @ISA = qw( Exporter );
247 our @EXPORT = qw( message new );
248
249 use IO::Handle;
250 use POSIX qw( :errno_h );
251 use strict;
252 use warnings;
253 use Carp;
254
255 use Amanda::Debug qw( debug );
256 use Amanda::MainLoop qw( :GIOCondition make_cb );
257 use Amanda::Util;
258
259 ##
260 # Package methods to support protocol definition
261
262 my %msgspecs_by_protocol;
263 sub message {
264     my ($name, @params) = @_;
265
266     my $msgspec = $msgspecs_by_protocol{caller()}->{$name} = { @params };
267
268     # do some parameter sanity checks
269     my $param;
270     my @allowed_params = qw( match format on_eof );
271     for $param (keys %$msgspec) {
272         die "invalid message() parameter '$param'"
273             unless grep { $_ eq $param } @allowed_params;
274     }
275
276     # normalize the results a little bit
277     $msgspec->{'name'} = $name;
278
279     if (!exists $msgspec->{'match'}) {
280         $msgspec->{'match'} = qr/^$msgspec->{'name'}$/i;
281     }
282     if (!exists $msgspec->{'format'}) {
283         $msgspec->{'format'} = [];
284     }
285
286     # calculate a method name
287     my $methname = "msg_$name";
288     $methname =~ tr/a-zA-Z0-9/_/c;
289     $msgspec->{'methname'} = $methname;
290
291     return $name;
292 }
293
294 ##
295 # class methods
296
297 sub new {
298     my $class = shift;
299     my %params = @_;
300
301     my $self = bless {
302         stopped => 0,
303         debug => $params{'debug'},
304
305         rx_fh => $params{'rx_fh'},
306         rx_fh_tty => 0,
307         rx_buffer => '',
308         rx_source => undef,
309
310         tx_fh => $params{'tx_fh'},
311         tx_fh_tty => 0,
312         tx_source => undef,
313         tx_finished_cb => undef,
314         tx_outstanding_writes => 0,
315
316         cmd_cbs => {},
317         message_obj => $params{'message_obj'},
318         default_cb => $params{'message_cb'},
319
320         # a ref to the existing structure
321         msgspecs => $msgspecs_by_protocol{$class},
322     }, $class;
323
324     # set nonblocking mode on both file descriptor, but only for non-tty
325     # handles -- non-blocking tty's don't work well at all.
326     if (POSIX::isatty($self->{'rx_fh'}->fileno())) {
327         $self->{'rx_fh_tty'} = 1;
328     } else {
329         if (!defined($self->{'rx_fh'}->blocking(0))) {
330             die("Could not make protocol filehandle non-blocking");
331         }
332     }
333
334     if (POSIX::isatty($self->{'tx_fh'}->fileno())) {
335         $self->{'tx_fh_tty'} = 1;
336     } else {
337         if (!defined($self->{'tx_fh'}->blocking(0))) {
338             die("Could not make protocol filehandle non-blocking");
339         }
340     }
341
342     # start reading..
343     $self->{'rx_source'} = Amanda::MainLoop::async_read(
344         fd => $self->{'rx_fh'}->fileno(),
345         async_read_cb => sub { $self->_async_read_cb(@_); });
346
347     return $self;
348 }
349
350 sub set_message_cb {
351     my $self = shift;
352     my ($name, $message_cb) = @_;
353
354     $self->{'cmd_cbs'}->{$name} = $message_cb;
355 }
356
357 sub stop {
358     my $self = shift;
359     my %params = @_;
360
361     $self->{'stopped'} = 1;
362
363     # abort listening for incoming data
364     if (defined $self->{'rx_source'}) {
365         $self->{'rx_source'}->remove();
366     }
367
368     # and flush any outgoing messages
369     if ($self->{'tx_outstanding_writes'} > 0) {
370         $self->{'tx_finished_cb'} = $params{'finished_cb'};
371     } else {
372         $params{'finished_cb'}->();
373     }
374 }
375
376 sub send {
377     my $self = shift;
378     my ($name, %info) = @_;
379
380     my $msgspec = $self->{'msgspecs'}->{$name};
381     die "No message spec for '$name'" unless defined($msgspec);
382
383     my @line = $msgspec->{'name'};
384
385     my $format = $msgspec->{'format'};
386     $format = $format->{'out'} if (ref $format eq "HASH");
387
388     for my $elt (@$format) {
389         my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
390         my $val = $info{$name};
391         if (!defined $val) {
392             croak "Value for '$name' is undefined";
393         }
394
395         if ($kind eq "*") {
396             croak "message key '$name' must be an array"
397                 unless defined $val and ref($val) eq "ARRAY";
398             push @line, @$val;
399         } else {
400             croak "message key '$name' is required"
401                 unless defined $val or $kind eq "?";
402             push @line, $val if defined $val;
403         }
404     }
405
406     my $line = join(" ", map { Amanda::Util::quote_string("$_") } @line);
407     debug($self->{'debug'} . " >> $line") if ($self->{'debug'});
408     $line .= "\n";
409
410     ++$self->{'tx_outstanding_writes'};
411     my $write_done_cb = make_cb(write_done_cb => sub {
412         my ($err, $nbytes) = @_;
413
414         if ($err) {
415             # TODO: handle this better
416             die $err;
417         }
418
419         # call the protocol's finished_cb if necessary
420         if (--$self->{'tx_outstanding_writes'} == 0 and $self->{'tx_finished_cb'}) {
421             $self->{'tx_finished_cb'}->();
422         }
423     });
424     $self->{'tx_source'} = Amanda::MainLoop::async_write(
425         fd => $self->{'tx_fh'}->fileno(),
426         data => $line,
427         async_write_cb => $write_done_cb);
428 }
429
430 ##
431 # Handle incoming messages
432
433 sub _find_msgspec {
434     my $self = shift;
435     my ($cmdstr) = @_;
436
437     for my $msgspec (values %{$self->{'msgspecs'}}) {
438         my $match = $msgspec->{'match'};
439         next unless defined($match);
440         return $msgspec if ($cmdstr =~ $match);
441     }
442
443     return undef;
444 }
445
446 sub _parse_line {
447     my $self = shift;
448     my ($msgspec, @line) = @_;
449
450     # parse the message according to the "in" format
451     my $format = $msgspec->{'format'};
452     $format = $format->{'in'} if (ref $format eq "HASH");
453
454     my $args = {};
455     for my $elt (@$format) {
456         my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
457
458         if ($kind eq "*") {
459             $args->{$name} = [ @line ];
460             @line = ();
461             last;
462         }
463
464         next if ($kind eq "?" and !@line);
465
466         if (!@line) {
467             return "too few arguments to '$msgspec->{name}': first missing argument is $name";
468         }
469
470         $args->{$name} = shift @line;
471     }
472
473     if (@line) {
474         return "too many arguments to '$msgspec->{name}': first unmatched argument is '$line[0]'";
475     }
476
477     return (undef, $args);
478 }
479
480 sub _call_message_cb {
481     my $self = shift;
482     my ($msgspec, $line, $args) = @_;
483
484     # after the user calls stop(), don't call any more callbacks
485     return if $self->{'stopped'};
486
487     # send a bogus line message to the default_cb if there's no msgspec
488     if (!defined $msgspec) {
489         if ($self->{'default_cb'}) {
490             $self->{'default_cb'}->(undef, %$args);
491         } else {
492             debug("IPC: " . ($args->{'error'} or "bogus line '$line'"));
493         }
494         return;
495     }
496
497     # otherwise, call the relevant callback
498     if (exists $self->{'cmd_cbs'}{$msgspec->{'name'}}) {
499         return $self->{'cmd_cbs'}{$msgspec->{'name'}}->($msgspec->{'name'}, %$args);
500     }
501     
502     if (defined $self->{'message_obj'} and $self->{'message_obj'}->can($msgspec->{'methname'})) {
503         my $methname = $msgspec->{'methname'};
504         return $self->{'message_obj'}->$methname($msgspec->{'name'}, %$args);
505     } 
506     
507     if ($self->{'default_cb'}) {
508         return $self->{'default_cb'}->($msgspec->{'name'}, %$args);
509     }
510
511     warn "IPC: Ignored unhandled line '$line'";
512 }
513
514 sub _incoming_line {
515     my $self = shift;
516     my ($line) = @_;
517
518     $line =~ s/\n//g;
519     return unless $line;
520
521     debug($self->{'debug'} . " << $line") if ($self->{'debug'});
522
523     # turn the line into a list of strings..
524     my @line = Amanda::Util::split_quoted_strings($line);
525     return unless @line;
526
527     # get the specification for this message
528     my $msgspec = $self->_find_msgspec(shift @line);
529     if (!defined $msgspec) {
530         $self->_call_message_cb(undef, $line, {error => 'unknown command'});
531         return;
532     }
533
534     my ($parserr, $args) = $self->_parse_line($msgspec, @line);
535     if ($parserr) {
536         $self->_call_message_cb(undef, $line, {error => $parserr});
537         return;
538     }
539
540     $self->_call_message_cb($msgspec, $line, $args);
541 }
542
543 sub _incoming_eof {
544     my $self = shift;
545
546     # handle a final line, even without a newline (is this wise?)
547     if ($self->{'rx_buffer'} ne '') {
548         $self->_incoming_line($self->{'rx_buffer'} . "\n");
549     }
550
551     # find the EOF msgspec and call it
552     for my $msgspec (values %{$self->{'msgspecs'}}) {
553         if ($msgspec->{'on_eof'}) {
554             $self->_call_message_cb($msgspec, "(EOF)", {});
555             last;
556         }
557     }
558 }
559
560 sub _async_read_cb {
561     my $self = shift;
562     my ($err, $data) = @_;
563
564     if (defined $err) {
565         # TODO: call an error_handler given to new()?
566         die $err;
567     }
568
569     if (!$data) {
570         $self->_incoming_eof();
571         return;
572     }
573
574     # set up to read the next chunk
575     $self->{'rx_source'} = Amanda::MainLoop::async_read(
576         fd => $self->{'rx_fh'}->fileno(),
577         async_read_cb => sub { $self->_async_read_cb(@_); });
578
579     # and process this data
580     $self->{'rx_buffer'} .= $data;
581
582     while ($self->{'rx_buffer'} =~ /\n/) {
583         my ($line, $rest) = split '\n', $self->{'rx_buffer'}, 2;
584         $self->{'rx_buffer'} = $rest;
585         $self->_incoming_line($line);
586     }
587 }
588
589 1;