Imported Upstream version 3.3.3
[debian/amanda] / installcheck / Amanda_IPC_LineProtocol.pl
1 # Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 use Test::More tests => 6;
21 use strict;
22 use warnings;
23
24 use lib "@amperldir@";
25 use Installcheck;
26 use Amanda::IPC::LineProtocol;
27 use IO::Handle;
28 use Amanda::MainLoop;
29 use Amanda::Debug;
30 use Data::Dumper;
31 use Carp;
32
33 ##
34 # Define a test protocol
35
36 package TestProtocol;
37 use base "Amanda::IPC::LineProtocol";
38 use Amanda::IPC::LineProtocol;
39
40 use constant SIMPLE => message("SIMPLE",
41     format => [ qw( ) ],
42 );
43
44 use constant FOO => message("FOO",
45     format => [ qw( name? nicknames* ) ],
46 );
47
48 use constant FO => message("FO",  # prefix of "FOO"
49     format => [ qw( ) ],
50 );
51
52 use constant ASSYM => message("ASSYM",
53     format => {
54         in => [ qw( a b ) ],
55         out => [ qw( x ) ],
56     },
57 );
58
59 use constant BAR => message("BAR",
60     match => qr/^BA[Rh]$/i, # more elaborate regex
61     format => [ qw( mandatory optional? ) ],
62 );
63
64 use constant QUIT => message("QUIT",
65     match => qr/^QUIT$/i,
66     on_eof => 1,
67     format => [ qw( reason? ) ],
68 );
69
70 package main;
71
72 # set up debugging so debug output doesn't interfere with test results
73 Amanda::Debug::dbopen("installcheck");
74 Installcheck::log_test_output();
75
76 # and disable Debug's die() and warn() overrides
77 Amanda::Debug::disable_die_override();
78
79 # run $code in a separate process, with read and write handles hooked up, and returns
80 # read and write handles.
81 sub in_fork {
82     my ($code) = @_;
83
84     my ($parent_read, $child_write) = POSIX::pipe();
85     my ($child_read, $parent_write) = POSIX::pipe();
86
87     my $pid = fork();
88     if (!defined($pid) or $pid < 0) {
89         die("Can't fork: $!");
90     }
91
92     if (!$pid) {
93         ## child
94
95         # get our file-handle house in order
96         POSIX::close($parent_read);
97         POSIX::close($parent_write);
98
99         $code->(IO::Handle->new_from_fd($child_read, "r"),
100                 IO::Handle->new_from_fd($child_write, "w"));
101         POSIX::exit(0);
102     }
103
104     ## parent
105
106     POSIX::close($child_read);
107     POSIX::close($child_write);
108
109     return (IO::Handle->new_from_fd($parent_read, "r"),
110             IO::Handle->new_from_fd($parent_write, "w"),
111             $pid);
112 }
113
114 # generic "die" message_cb
115 my $message_cb = make_cb(message_cb => sub {
116     my ($msgtype, %params) = @_;
117     if (defined $msgtype) {
118         diag(Dumper(\%params));
119         die("unhandled message: $msgtype");
120     } else {
121         die("IPC error: $params{'error'}");
122     }
123 });
124
125 ##
126 # Run some tests
127
128 my $proto;
129 my @events;
130 my ($rx_fh, $tx_fh, $pid);
131
132 # on QUIT, stop the protocol and quit the mainloop
133 my $quit_cb = make_cb(quit_cb => sub {
134     push @events, [ @_ ];
135     $proto->stop(finished_cb => sub {
136         Amanda::MainLoop::quit();
137     });
138 });
139
140
141 #
142 # test a simple "QUIT"
143
144 @events = ();
145 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
146     my ($rdh, $wrh) = @_;
147     $wrh->autoflush(1);
148
149     $rdh->getline(); # get 'start\n'
150     $wrh->write("QUIT \"just because\"");
151 });
152
153 $proto = TestProtocol->new(
154     rx_fh => $rx_fh, tx_fh => $tx_fh,
155     message_cb => $message_cb);
156 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
157 Amanda::MainLoop::call_later(sub {
158     $tx_fh->autoflush(1);
159     $tx_fh->write("start\n");
160 });
161 Amanda::MainLoop::run();
162 waitpid($pid, 0);
163
164 is_deeply([ @events ],
165     [
166         [ "QUIT", reason => "just because" ],
167         [ "QUIT" ],
168     ],
169     "correct events for a simple 'QUIT \"just because\"")
170     or diag(Dumper(\@events));
171
172
173 ##
174 # test a bogus message
175
176 @events = ();
177 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
178     my ($rdh, $wrh) = @_;
179     $wrh->autoflush(1);
180
181     $rdh->getline(); # get 'start\n'
182     $wrh->write("SNARSBLAT, yo");
183 });
184
185 $proto = TestProtocol->new(
186     rx_fh => $rx_fh, tx_fh => $tx_fh,
187     message_cb => sub { push @events, [ @_ ]; });
188 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
189 Amanda::MainLoop::call_later(sub {
190     $tx_fh->autoflush(1);
191     $tx_fh->write("start\n");
192 });
193 Amanda::MainLoop::run();
194 waitpid($pid, 0);
195
196 is_deeply([ @events ],
197     [
198         [ undef, 'error' => 'unknown command' ],
199         [ "QUIT" ], # from EOF
200     ],
201     "bogus message handled correctly")
202     or diag(Dumper(\@events));
203
204
205 ##
206 # a more complex conversation
207
208 @events = ();
209 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
210     my ($rdh, $wrh) = @_;
211     $wrh->autoflush(1);
212
213     $wrh->write("FOO\n");
214     $rdh->getline() =~ /SIMPLE/ or die("bad response");
215
216     $wrh->write("FOO one\n");
217     $rdh->getline() =~ /SIMPLE/ or die("bad response");
218
219     $wrh->write("FOO one \"t w o\"\n");
220     $rdh->getline() =~ /SIMPLE/ or die("bad response");
221
222     $wrh->write("FOO one \"t w o\" three\n");
223     $rdh->getline() =~ /SIMPLE/ or die("bad response");
224 });
225
226 $proto = TestProtocol->new(
227     rx_fh => $rx_fh, tx_fh => $tx_fh,
228     message_cb => $message_cb);
229 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
230 $proto->set_message_cb(TestProtocol::FOO, sub {
231     push @events, [ shift @_, { @_ } ];
232     $proto->send(TestProtocol::SIMPLE);
233 });
234 Amanda::MainLoop::run();
235 waitpid($pid, 0);
236
237 is_deeply([ @events ],
238     [
239         [ "FOO", { nicknames => [] } ],
240         [ "FOO", { nicknames => [], name => "one" } ],
241         [ "FOO", { nicknames => [ "t w o" ], name => "one" } ],
242         [ "FOO", { nicknames => [ "t w o", "three" ], name => "one" } ],
243         [ "QUIT" ],
244     ],
245     "correct events for a few conversation steps, parsing")
246     or diag(Dumper(\@events));
247
248 ##
249 # Asymmetrical formats
250
251 @events = ();
252 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
253     my ($rdh, $wrh) = @_;
254     $wrh->autoflush(1);
255
256     $wrh->write("ASSYM 1 2\n");
257     $rdh->getline() =~ /ASSYM a/ or die("bad response");
258 });
259
260 $proto = TestProtocol->new(
261     rx_fh => $rx_fh, tx_fh => $tx_fh,
262     message_cb => $message_cb);
263 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
264 $proto->set_message_cb(TestProtocol::ASSYM, sub {
265         push @events, [ shift @_, { @_ } ];
266         $proto->send(TestProtocol::ASSYM, x => "a");
267     });
268 Amanda::MainLoop::run();
269 waitpid($pid, 0);
270
271 is_deeply([ @events ],
272     [
273         [ "ASSYM", { a => "1", b => "2" } ],
274         [ "QUIT" ],
275     ],
276     "correct events for asymmetric message format")
277     or diag(Dumper(\@events));
278
279
280 ##
281 # test queueing up of messages on writing.
282
283 # The idea here is to write more than a pipe buffer can hold, while the child
284 # process does not read that data, and then to signal the child process,
285 # causing it to read all of that data, write a reply, and exit.  Recent linuxes
286 # have a pipe buffer of 64k, so we exceed that threshold.  We use an 'alarm' to
287 # fail in the case that this blocks.
288
289 my $NMSGS = 10000;
290
291 @events = ();
292 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
293     my ($rdh, $wrh) = @_;
294     $wrh->autoflush(1);
295
296     # on USR1, read lots of inputs
297     $SIG{'USR1'} = sub {
298         for (my $i = 0; $i < $NMSGS; $i++) {
299             $rdh->getline();
300         }
301
302         # send a message that the parent can hope to get
303         $wrh->write("BAR \"got your inputs\"\n");
304
305         # and bail out
306         POSIX::exit(0);
307     };
308
309     $wrh->write("SIMPLE\n");
310
311     # and sleep forever, or until killed.
312     while (1) { sleep(100); }
313 });
314
315 $proto = TestProtocol->new(
316     rx_fh => $rx_fh, tx_fh => $tx_fh,
317     message_cb => $message_cb);
318 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
319 $proto->set_message_cb(TestProtocol::SIMPLE, sub {
320         push @events, [ shift @_ ];
321         # send $NMSGS messages to the child, which isn't listening yet!
322         for (my $i = 0; $i < $NMSGS; $i++) {
323             $proto->send(TestProtocol::SIMPLE);
324         }
325         # and then send it SIGUSR1, so it reads those
326         kill USR1 => $pid;
327     });
328 $proto->set_message_cb(TestProtocol::BAR, sub {
329         push @events, [ shift @_, { @_ } ];
330     });
331
332 # die after 10 minutes
333 alarm 600;
334
335 Amanda::MainLoop::run();
336 waitpid($pid, 0);
337 alarm 0; # cancel the alarm
338
339 is_deeply([ @events ],
340     [
341         [ "SIMPLE" ],
342         [ "BAR", { mandatory => "got your inputs" } ],
343         [ "QUIT" ],
344     ],
345     "write buffering handled correctly")
346     or diag(Dumper(\@events));
347
348 ##
349 # test the message_obj functionality
350
351 package main::MessageObj;
352
353 sub msg_FOO {
354     my $self = shift;
355     push @{$self}, [ shift @_, { @_ } ];
356     $proto->send(TestProtocol::SIMPLE);
357 }
358
359 sub msg_BAR {
360     my $self = shift;
361     push @{$self}, [ shift @_, { @_ } ];
362     $proto->send(TestProtocol::SIMPLE);
363 }
364
365 package main;
366
367 @events = ();
368 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
369     my ($rdh, $wrh) = @_;
370     $wrh->autoflush(1);
371
372     $wrh->write("FOO\n");
373     $rdh->getline() =~ /SIMPLE/ or die("bad response");
374
375     $wrh->write("BAR one\n");
376     $rdh->getline() =~ /SIMPLE/ or die("bad response");
377
378     $wrh->write("BAH one \"t w o\"\n"); # note alternate spelling "BAH"
379     $rdh->getline() =~ /SIMPLE/ or die("bad response");
380
381     $wrh->write("FOO one \"t w o\" three\n");
382     $rdh->getline() =~ /SIMPLE/ or die("bad response");
383 });
384
385 $proto = TestProtocol->new(
386     rx_fh => $rx_fh, tx_fh => $tx_fh,
387     message_obj => bless(\@events, "main::MessageObj"));
388 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
389 Amanda::MainLoop::run();
390 waitpid($pid, 0);
391
392 is_deeply([ @events ],
393     [ [ 'FOO', { 'nicknames' => [] } ],
394       [ 'BAR', { 'mandatory' => 'one' } ],
395       [ 'BAR', { 'mandatory' => 'one', 'optional' => 't w o' } ],
396       [ 'FOO', { 'name' => 'one', 'nicknames' => [ 't w o', 'three' ] } ],
397       [ 'QUIT' ],
398     ],
399     "message_obj works")
400     or diag(Dumper(\@events));
401