Imported Upstream version 3.3.3
[debian/amanda] / perl / Amanda / IPC / LineProtocol.pm
1 # Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94085, USA, or: http://www.zmanda.com
19
20 package Amanda::IPC::LineProtocol;
21 =head1 NAME
22
23 Amanda::IPC::LineProtocol -- parent class for line-based protocols
24
25 =head1 SYNOPSIS
26
27 Define your protocol:
28
29     packge MyProtocol;
30     use Amanda::IPC::LineProtocol;
31     use base "Amanda::IPC::LineProtocol";
32
33     use constant SETSTATUS => message("SETSTATUS",
34         match => qr/^FOO$/i,
35         format => [ qw( param1 param2 optional? list* ) ],
36     );
37     use constant PING => message("PING",
38         match => qr/^PING$/i,
39         format => [ qw( id ) ],
40     );
41     use constant PONG => message("PONG",
42         match => qr/^PONG$/i,
43         format => [ qw( id ) ],
44     );
45     # ...
46
47     # And use the protocol
48     package main;
49     my $input_fh = IO::Handle->new(...);
50     my $output_fh = IO::Handle->new(...);
51     my $proto;
52
53     my $ping_cb = make_cb(ping_cb => sub {
54         my ($msg, %args) = @_;
55         $proto->send(MyProtocol::PONG, id => $args{'id'});
56     });
57
58     my $message_cb = make_cb(message_cb => sub {
59         my ($msg, %args) = @_;
60         if (!$msg) {
61             die $args{'error'};
62         }
63     });
64
65     $proto = MyProtocol->new(
66             rx_fh => $input_fh,
67             tx_fh => $output_fh,
68             message_cb => $message_cb);
69
70     # and add callbacks
71     $proto->set_message_cb(MyProtocol::PING, $ping_cb);
72     $proto->set_message_cb(MyProtocol::PONG, $pong_cb);
73
74     # or send messages to an object, with method names based on
75     # the message name
76     sub msg_PONG {
77         my $self = shift;
78         my ($msg, %args) = @_;
79     }
80     # ..
81     $proto = MyProtocol->new( # ..
82             message_obj => $self);
83
84     # send a message
85     $proto->send(MyProtocol::SETSTATUS,
86         param1 => "x",
87         param2 => "y",
88         );
89
90     # shut down the protocol, flushing any messages waiting to
91     # be sent first
92     my $finished_cb = make_cb(finished_cb => sub {
93         my ($err) = @_;
94         # ...
95     });
96     $proto->stop(finished_cb => $finished_cb);
97
98 =head1 DESCRIPTION
99
100 This library is used to implement communications between Amanda processes.
101 Amanda has historically implemented a number of distinct text-based protocols
102 for communications between various components, and this library servces to
103 abstract and centralize the implementation of those protocols.
104
105 The package supports point-to-point, message-based, symmetric protocols.  Two
106 communicating processes exchange discrete messages, and in principle either
107 process can send a message at any time, although this is limited by the (often
108 unwritten) rules of the protocol.
109
110 In protocols based on this package, each message is a single text line,
111 terminated with a newline and consisting of a sequence of quoted strings.  The
112 first string determines the type of message.  For example:
113
114   SEND-MORE-MONEY $150.00 "Books and pencils"
115   ORDER-PIZZA Large Mushrooms Olives Onions "Green Peppers"
116
117 The package is asynchronous (see L<Amanda::MainLoop>), triggering callbacks for
118 incoming messages rather than implementing a C<get_message> method or the like.
119 If necessary, outgoing messages are queued for later transmission, thus
120 avoiding deadlocks from full pipe buffers.  This allows processing to continue
121 unhindered in both processes while messages are in transit in either direction.
122
123 =head2 DEFINING A PROTOCOL
124
125 There are two parts to any use of this package.  First, define the protocol by
126 creating a subclass and populating it using the C<message> package method.
127 This begins with something like
128
129   package CollegeProtocol;
130   use base "Amanda::IPC::LineProtocol";
131   use Amanda::IPC::LineProtocol;
132
133 The usual trick for specifying messages is to simultaneously define a series of
134 constants, using the following idiom:
135
136   use constant ORDER_PIZZA => message("ORDER-PIZZA",
137     match => qr/^ORDER-PIZZA$/,
138     format => [ qw( size toppings* ) ],
139   );
140
141 The first argument to C<message> is the word with which this particular message
142 type will be sent.  The C<match> parameter gives a regular expression which
143 will be used to recognize incoming messages of this type.   If this parameter
144 is not specified, the default is to match the first argument with a
145 case-insensitive regexp.
146
147 The C<format> parameter describes the format of the arguments for this message
148 type.  A format parameter with the C<*> suffix gathers all remaining arguments
149 into a list.  A C<?> suffix indicates an optional parameter.  Note that it is
150 quite possible to specify ambiguous formats which will not work like you
151 expect.  The default format is an empty list (taking no arguments).
152
153 The optional C<on_eof> parameter will cause a a message of this type to be
154 generated on EOF.  For example, with:
155
156   use constant DROP_OUT => message("DROP-OUT",
157     on_eof => 1,
158   );
159
160 when an EOF is detected, a C<DROP_OUT> message will be generated.
161
162 The protocol class should contain, in POD, a full description of the syntax of
163 the protcol -- which messages may be sent when, and what they mean.  No
164 facility is provided to encode this description in perl.
165
166 In general, protocols are expected to be symmetrical -- any message can either
167 be sent or received.  However, some existing protocols use different formats in
168 different directions.  In this case, specify C<format> as a hashref with keys
169 C<in> and C<out> pointing to the two different formats:
170
171   use constant ERROR => message("ERROR",
172     match => qr/^ERROR$/,
173     format => { in => [ qw( message severity ) ],
174                 out => [ qw( component severity message ) ] },
175   );
176
177 =head2 USING A PROTOCOL
178
179 Once a protocol is defined, it forms a class which can be used to run the
180 protocol.  Multiple instances of this class can be created to handle
181 simultaneous uses of the protocol over different channels.
182
183 The constructor, C<new>, takes two C<IO::Handle> objects -- one to read from
184 (C<rx_fh>) and one to write to (C<tx_fh>).  In some cases (e.g., a socket),
185 these may be the same handle.  It takes an optional callback, C<message_cb>,
186 which will be called for any received messages not handled by a more specific
187 callback.  Any other parameters are considered message-type-specific callbacks.
188
189 For example, given a socket handle C<$sockh>, the following will start the
190 C<CollegeProtocol> running on that socket:
191
192   my $proto = CollegeProtocol->new(
193     rx_fh => $sockh,
194     tx_fh => $sockh,
195   );
196   $proto->set_message_cb(CollegeProtocol::PIZZA_DELIVERY, $pizza_delivery_cb);
197
198 For protocols with a lot of message types, it may be useful to have the
199 protocol call methods on an object.  This is done with the C<message_obj>
200 argument to the protocol constructor:
201
202   $proto = CollegeProtocol->new( # ..
203     message_obj => $obj);
204
205 The methods are named C<msg_$msgname>, where $msgname has all non-identifier
206 characters translated to an underscore (C<_>).  For situations where the meaning
207 of a message can change dynamically, it may be useful to set a callback after
208 the object has been crated:
209
210   $proto->set_message_cb(CollegeProtocol::MIDTERM,
211     sub { ... });
212
213 The constructor also takes a 'debug' argument; if given, then all incoming and
214 outgoing messages will be written to the debug log with this argument as
215 prefix.
216
217 All message callbacks have the same signature:
218
219   my $pizza_delivery_cb = make_cb(pizza_delivery_cb => sub {
220     # (note that object methods will get the usual $self)
221     my ($msgtype, %params) = @_;
222   });
223
224 where C<%params> contains all of the arguments to the message, keyed by the
225 argument names given in the message's C<format>.  Note that parameters
226 specified with the C<*> suffix will appear as arrayrefs.
227
228 Callbacks specified with C<set_message_cb> take precedence over other
229 specifications; next are message-specific callbacks given to the constructor,
230 followed by C<message_obj>, and finally C<message_cb>.
231
232 In case of an error, the C<message_cb> (if specified) is called with
233 C<$msgtype> undefined and with a single parameter named C<error> giving the
234 error message.  This generally indicates either an unknown or badly-formatted
235 message.
236
237 To send a message, use the C<send> method, which takes the same arguments as a
238 message callback:
239
240   $proto->send(CollegeProtocol::SEND_MORE_MONEY,
241     how_much => "$150.00",
242     what_for => "Books and pencils");
243
244 =cut
245
246 use Exporter ();
247 our @ISA = qw( Exporter );
248 our @EXPORT = qw( message new );
249
250 use IO::Handle;
251 use POSIX qw( :errno_h );
252 use strict;
253 use warnings;
254 use Carp;
255
256 use Amanda::Debug qw( debug );
257 use Amanda::MainLoop qw( :GIOCondition make_cb );
258 use Amanda::Util;
259
260 ##
261 # Package methods to support protocol definition
262
263 my %msgspecs_by_protocol;
264 sub message {
265     my ($name, @params) = @_;
266
267     my $msgspec = $msgspecs_by_protocol{caller()}->{$name} = { @params };
268
269     # do some parameter sanity checks
270     my $param;
271     my @allowed_params = qw( match format on_eof );
272     for $param (keys %$msgspec) {
273         die "invalid message() parameter '$param'"
274             unless grep { $_ eq $param } @allowed_params;
275     }
276
277     # normalize the results a little bit
278     $msgspec->{'name'} = $name;
279
280     if (!exists $msgspec->{'match'}) {
281         $msgspec->{'match'} = qr/^$msgspec->{'name'}$/i;
282     }
283     if (!exists $msgspec->{'format'}) {
284         $msgspec->{'format'} = [];
285     }
286
287     # calculate a method name
288     my $methname = "msg_$name";
289     $methname =~ tr/a-zA-Z0-9/_/c;
290     $msgspec->{'methname'} = $methname;
291
292     return $name;
293 }
294
295 ##
296 # class methods
297
298 sub new {
299     my $class = shift;
300     my %params = @_;
301
302     my $self = bless {
303         stopped => 0,
304         debug => $params{'debug'},
305
306         rx_fh => $params{'rx_fh'},
307         rx_fh_tty => 0,
308         rx_buffer => '',
309         rx_source => undef,
310
311         tx_fh => $params{'tx_fh'},
312         tx_fh_tty => 0,
313         tx_source => undef,
314         tx_finished_cb => undef,
315         tx_outstanding_writes => 0,
316
317         cmd_cbs => {},
318         message_obj => $params{'message_obj'},
319         default_cb => $params{'message_cb'},
320
321         # a ref to the existing structure
322         msgspecs => $msgspecs_by_protocol{$class},
323     }, $class;
324
325     # set nonblocking mode on both file descriptor, but only for non-tty
326     # handles -- non-blocking tty's don't work well at all.
327     if (POSIX::isatty($self->{'rx_fh'}->fileno())) {
328         $self->{'rx_fh_tty'} = 1;
329     } else {
330         if (!defined($self->{'rx_fh'}->blocking(0))) {
331             die("Could not make protocol filehandle non-blocking");
332         }
333     }
334
335     if (POSIX::isatty($self->{'tx_fh'}->fileno())) {
336         $self->{'tx_fh_tty'} = 1;
337     } else {
338         if (!defined($self->{'tx_fh'}->blocking(0))) {
339             die("Could not make protocol filehandle non-blocking");
340         }
341     }
342
343     # start reading..
344     $self->{'rx_source'} = Amanda::MainLoop::async_read(
345         fd => $self->{'rx_fh'}->fileno(),
346         async_read_cb => sub { $self->_async_read_cb(@_); });
347
348     return $self;
349 }
350
351 sub set_message_cb {
352     my $self = shift;
353     my ($name, $message_cb) = @_;
354
355     $self->{'cmd_cbs'}->{$name} = $message_cb;
356 }
357
358 sub stop {
359     my $self = shift;
360     my %params = @_;
361
362     $self->{'stopped'} = 1;
363
364     # abort listening for incoming data
365     if (defined $self->{'rx_source'}) {
366         $self->{'rx_source'}->remove();
367     }
368
369     # and flush any outgoing messages
370     if ($self->{'tx_outstanding_writes'} > 0) {
371         $self->{'tx_finished_cb'} = $params{'finished_cb'};
372     } else {
373         $params{'finished_cb'}->();
374     }
375 }
376
377 sub send {
378     my $self = shift;
379     my ($name, %info) = @_;
380
381     my $msgspec = $self->{'msgspecs'}->{$name};
382     die "No message spec for '$name'" unless defined($msgspec);
383
384     my @line = $msgspec->{'name'};
385
386     my $format = $msgspec->{'format'};
387     $format = $format->{'out'} if (ref $format eq "HASH");
388
389     for my $elt (@$format) {
390         my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
391         my $val = $info{$name};
392         if (!defined $val) {
393             croak "Value for '$name' is undefined";
394         }
395
396         if ($kind eq "*") {
397             croak "message key '$name' must be an array"
398                 unless defined $val and ref($val) eq "ARRAY";
399             push @line, @$val;
400         } else {
401             croak "message key '$name' is required"
402                 unless defined $val or $kind eq "?";
403             push @line, $val if defined $val;
404         }
405     }
406
407     my $line = join(" ", map { Amanda::Util::quote_string("$_") } @line);
408     debug($self->{'debug'} . " >> $line") if ($self->{'debug'});
409     $line .= "\n";
410
411     ++$self->{'tx_outstanding_writes'};
412     my $write_done_cb = make_cb(write_done_cb => sub {
413         my ($err, $nbytes) = @_;
414
415         if ($err) {
416             # TODO: handle this better
417             die $err;
418         }
419
420         # call the protocol's finished_cb if necessary
421         if (--$self->{'tx_outstanding_writes'} == 0 and $self->{'tx_finished_cb'}) {
422             $self->{'tx_finished_cb'}->();
423         }
424     });
425     $self->{'tx_source'} = Amanda::MainLoop::async_write(
426         fd => $self->{'tx_fh'}->fileno(),
427         data => $line,
428         async_write_cb => $write_done_cb);
429 }
430
431 ##
432 # Handle incoming messages
433
434 sub _find_msgspec {
435     my $self = shift;
436     my ($cmdstr) = @_;
437
438     for my $msgspec (values %{$self->{'msgspecs'}}) {
439         my $match = $msgspec->{'match'};
440         next unless defined($match);
441         return $msgspec if ($cmdstr =~ $match);
442     }
443
444     return undef;
445 }
446
447 sub _parse_line {
448     my $self = shift;
449     my ($msgspec, @line) = @_;
450
451     # parse the message according to the "in" format
452     my $format = $msgspec->{'format'};
453     $format = $format->{'in'} if (ref $format eq "HASH");
454
455     my $args = {};
456     for my $elt (@$format) {
457         my ($name, $kind)= ($elt =~ /^(.*?)([*?]?)$/);
458
459         if ($kind eq "*") {
460             $args->{$name} = [ @line ];
461             @line = ();
462             last;
463         }
464
465         next if ($kind eq "?" and !@line);
466
467         if (!@line) {
468             return "too few arguments to '$msgspec->{name}': first missing argument is $name";
469         }
470
471         $args->{$name} = shift @line;
472     }
473
474     if (@line) {
475         return "too many arguments to '$msgspec->{name}': first unmatched argument is '$line[0]'";
476     }
477
478     return (undef, $args);
479 }
480
481 sub _call_message_cb {
482     my $self = shift;
483     my ($msgspec, $line, $args) = @_;
484
485     # after the user calls stop(), don't call any more callbacks
486     return if $self->{'stopped'};
487
488     # send a bogus line message to the default_cb if there's no msgspec
489     if (!defined $msgspec) {
490         if ($self->{'default_cb'}) {
491             $self->{'default_cb'}->(undef, %$args);
492         } else {
493             debug("IPC: " . ($args->{'error'} or "bogus line '$line'"));
494         }
495         return;
496     }
497
498     # otherwise, call the relevant callback
499     if (exists $self->{'cmd_cbs'}{$msgspec->{'name'}}) {
500         return $self->{'cmd_cbs'}{$msgspec->{'name'}}->($msgspec->{'name'}, %$args);
501     }
502     
503     if (defined $self->{'message_obj'} and $self->{'message_obj'}->can($msgspec->{'methname'})) {
504         my $methname = $msgspec->{'methname'};
505         return $self->{'message_obj'}->$methname($msgspec->{'name'}, %$args);
506     } 
507     
508     if ($self->{'default_cb'}) {
509         return $self->{'default_cb'}->($msgspec->{'name'}, %$args);
510     }
511
512     warn "IPC: Ignored unhandled line '$line'";
513 }
514
515 sub _incoming_line {
516     my $self = shift;
517     my ($line) = @_;
518
519     $line =~ s/\n//g;
520     return unless $line;
521
522     debug($self->{'debug'} . " << $line") if ($self->{'debug'});
523
524     # turn the line into a list of strings..
525     my @line = Amanda::Util::split_quoted_strings($line);
526     return unless @line;
527
528     # get the specification for this message
529     my $msgspec = $self->_find_msgspec(shift @line);
530     if (!defined $msgspec) {
531         $self->_call_message_cb(undef, $line, {error => 'unknown command'});
532         return;
533     }
534
535     my ($parserr, $args) = $self->_parse_line($msgspec, @line);
536     if ($parserr) {
537         $self->_call_message_cb(undef, $line, {error => $parserr});
538         return;
539     }
540
541     $self->_call_message_cb($msgspec, $line, $args);
542 }
543
544 sub _incoming_eof {
545     my $self = shift;
546
547     # handle a final line, even without a newline (is this wise?)
548     if ($self->{'rx_buffer'} ne '') {
549         $self->_incoming_line($self->{'rx_buffer'} . "\n");
550     }
551
552     # find the EOF msgspec and call it
553     for my $msgspec (values %{$self->{'msgspecs'}}) {
554         if ($msgspec->{'on_eof'}) {
555             $self->_call_message_cb($msgspec, "(EOF)", {});
556             last;
557         }
558     }
559 }
560
561 sub _async_read_cb {
562     my $self = shift;
563     my ($err, $data) = @_;
564
565     if (defined $err) {
566         # TODO: call an error_handler given to new()?
567         die $err;
568     }
569
570     if (!$data) {
571         $self->_incoming_eof();
572         return;
573     }
574
575     # set up to read the next chunk
576     $self->{'rx_source'} = Amanda::MainLoop::async_read(
577         fd => $self->{'rx_fh'}->fileno(),
578         async_read_cb => sub { $self->_async_read_cb(@_); });
579
580     # and process this data
581     $self->{'rx_buffer'} .= $data;
582
583     while ($self->{'rx_buffer'} =~ /\n/) {
584         my ($line, $rest) = split '\n', $self->{'rx_buffer'}, 2;
585         $self->{'rx_buffer'} = $rest;
586         $self->_incoming_line($line);
587     }
588 }
589
590 1;