5717bfceaad8a19cd222080f1513b3b37e206766
[debian/amanda] / installcheck / Amanda_IPC_LineProtocol.pl
1 # Copyright (c) 2009 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 use Test::More tests => 6;
20 use strict;
21 use warnings;
22
23 use lib "@amperldir@";
24 use Installcheck;
25 use Amanda::IPC::LineProtocol;
26 use IO::Handle;
27 use Amanda::MainLoop;
28 use Amanda::Debug;
29 use Data::Dumper;
30 use Carp;
31
32 ##
33 # Define a test protocol
34
35 package TestProtocol;
36 use base "Amanda::IPC::LineProtocol";
37 use Amanda::IPC::LineProtocol;
38
39 use constant SIMPLE => message("SIMPLE",
40     format => [ qw( ) ],
41 );
42
43 use constant FOO => message("FOO",
44     format => [ qw( name? nicknames* ) ],
45 );
46
47 use constant FO => message("FO",  # prefix of "FOO"
48     format => [ qw( ) ],
49 );
50
51 use constant ASSYM => message("ASSYM",
52     format => {
53         in => [ qw( a b ) ],
54         out => [ qw( x ) ],
55     },
56 );
57
58 use constant BAR => message("BAR",
59     match => qr/^BA[Rh]$/i, # more elaborate regex
60     format => [ qw( mandatory optional? ) ],
61 );
62
63 use constant QUIT => message("QUIT",
64     match => qr/^QUIT$/i,
65     on_eof => 1,
66     format => [ qw( reason? ) ],
67 );
68
69 package main;
70
71 # set up debugging so debug output doesn't interfere with test results
72 Amanda::Debug::dbopen("installcheck");
73 Installcheck::log_test_output();
74
75 # and disable Debug's die() and warn() overrides
76 Amanda::Debug::disable_die_override();
77
78 # run $code in a separate process, with read and write handles hooked up, and returns
79 # read and write handles.
80 sub in_fork {
81     my ($code) = @_;
82
83     my ($parent_read, $child_write) = POSIX::pipe();
84     my ($child_read, $parent_write) = POSIX::pipe();
85
86     my $pid = fork();
87     if (!defined($pid) or $pid < 0) {
88         die("Can't fork: $!");
89     }
90
91     if (!$pid) {
92         ## child
93
94         # get our file-handle house in order
95         POSIX::close($parent_read);
96         POSIX::close($parent_write);
97
98         $code->(IO::Handle->new_from_fd($child_read, "r"),
99                 IO::Handle->new_from_fd($child_write, "w"));
100         POSIX::exit(0);
101     }
102
103     ## parent
104
105     POSIX::close($child_read);
106     POSIX::close($child_write);
107
108     return (IO::Handle->new_from_fd($parent_read, "r"),
109             IO::Handle->new_from_fd($parent_write, "w"),
110             $pid);
111 }
112
113 # generic "die" message_cb
114 my $message_cb = make_cb(message_cb => sub {
115     my ($msgtype, %params) = @_;
116     if (defined $msgtype) {
117         diag(Dumper(\%params));
118         die("unhandled message: $msgtype");
119     } else {
120         die("IPC error: $params{'error'}");
121     }
122 });
123
124 ##
125 # Run some tests
126
127 my $proto;
128 my @events;
129 my ($rx_fh, $tx_fh, $pid);
130
131 # on QUIT, stop the protocol and quit the mainloop
132 my $quit_cb = make_cb(quit_cb => sub {
133     push @events, [ @_ ];
134     $proto->stop(finished_cb => sub {
135         Amanda::MainLoop::quit();
136     });
137 });
138
139
140 #
141 # test a simple "QUIT"
142
143 @events = ();
144 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
145     my ($rdh, $wrh) = @_;
146     $wrh->autoflush(1);
147
148     $rdh->getline(); # get 'start\n'
149     $wrh->write("QUIT \"just because\"");
150 });
151
152 $proto = TestProtocol->new(
153     rx_fh => $rx_fh, tx_fh => $tx_fh,
154     message_cb => $message_cb);
155 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
156 Amanda::MainLoop::call_later(sub {
157     $tx_fh->autoflush(1);
158     $tx_fh->write("start\n");
159 });
160 Amanda::MainLoop::run();
161 waitpid($pid, 0);
162
163 is_deeply([ @events ],
164     [
165         [ "QUIT", reason => "just because" ],
166         [ "QUIT" ],
167     ],
168     "correct events for a simple 'QUIT \"just because\"")
169     or diag(Dumper(\@events));
170
171
172 ##
173 # test a bogus message
174
175 @events = ();
176 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
177     my ($rdh, $wrh) = @_;
178     $wrh->autoflush(1);
179
180     $rdh->getline(); # get 'start\n'
181     $wrh->write("SNARSBLAT, yo");
182 });
183
184 $proto = TestProtocol->new(
185     rx_fh => $rx_fh, tx_fh => $tx_fh,
186     message_cb => sub { push @events, [ @_ ]; });
187 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
188 Amanda::MainLoop::call_later(sub {
189     $tx_fh->autoflush(1);
190     $tx_fh->write("start\n");
191 });
192 Amanda::MainLoop::run();
193 waitpid($pid, 0);
194
195 is_deeply([ @events ],
196     [
197         [ undef, 'error' => 'unknown command' ],
198         [ "QUIT" ], # from EOF
199     ],
200     "bogus message handled correctly")
201     or diag(Dumper(\@events));
202
203
204 ##
205 # a more complex conversation
206
207 @events = ();
208 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
209     my ($rdh, $wrh) = @_;
210     $wrh->autoflush(1);
211
212     $wrh->write("FOO\n");
213     $rdh->getline() =~ /SIMPLE/ or die("bad response");
214
215     $wrh->write("FOO one\n");
216     $rdh->getline() =~ /SIMPLE/ or die("bad response");
217
218     $wrh->write("FOO one \"t w o\"\n");
219     $rdh->getline() =~ /SIMPLE/ or die("bad response");
220
221     $wrh->write("FOO one \"t w o\" three\n");
222     $rdh->getline() =~ /SIMPLE/ or die("bad response");
223 });
224
225 $proto = TestProtocol->new(
226     rx_fh => $rx_fh, tx_fh => $tx_fh,
227     message_cb => $message_cb);
228 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
229 $proto->set_message_cb(TestProtocol::FOO, sub {
230     push @events, [ shift @_, { @_ } ];
231     $proto->send(TestProtocol::SIMPLE);
232 });
233 Amanda::MainLoop::run();
234 waitpid($pid, 0);
235
236 is_deeply([ @events ],
237     [
238         [ "FOO", { nicknames => [] } ],
239         [ "FOO", { nicknames => [], name => "one" } ],
240         [ "FOO", { nicknames => [ "t w o" ], name => "one" } ],
241         [ "FOO", { nicknames => [ "t w o", "three" ], name => "one" } ],
242         [ "QUIT" ],
243     ],
244     "correct events for a few conversation steps, parsing")
245     or diag(Dumper(\@events));
246
247 ##
248 # Asymmetrical formats
249
250 @events = ();
251 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
252     my ($rdh, $wrh) = @_;
253     $wrh->autoflush(1);
254
255     $wrh->write("ASSYM 1 2\n");
256     $rdh->getline() =~ /ASSYM a/ or die("bad response");
257 });
258
259 $proto = TestProtocol->new(
260     rx_fh => $rx_fh, tx_fh => $tx_fh,
261     message_cb => $message_cb);
262 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
263 $proto->set_message_cb(TestProtocol::ASSYM, sub {
264         push @events, [ shift @_, { @_ } ];
265         $proto->send(TestProtocol::ASSYM, x => "a");
266     });
267 Amanda::MainLoop::run();
268 waitpid($pid, 0);
269
270 is_deeply([ @events ],
271     [
272         [ "ASSYM", { a => "1", b => "2" } ],
273         [ "QUIT" ],
274     ],
275     "correct events for asymmetric message format")
276     or diag(Dumper(\@events));
277
278
279 ##
280 # test queueing up of messages on writing.
281
282 # The idea here is to write more than a pipe buffer can hold, while the child
283 # process does not read that data, and then to signal the child process,
284 # causing it to read all of that data, write a reply, and exit.  Recent linuxes
285 # have a pipe buffer of 64k, so we exceed that threshold.  We use an 'alarm' to
286 # fail in the case that this blocks.
287
288 my $NMSGS = 10000;
289
290 @events = ();
291 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
292     my ($rdh, $wrh) = @_;
293     $wrh->autoflush(1);
294
295     # on USR1, read lots of inputs
296     $SIG{'USR1'} = sub {
297         for (my $i = 0; $i < $NMSGS; $i++) {
298             $rdh->getline();
299         }
300
301         # send a message that the parent can hope to get
302         $wrh->write("BAR \"got your inputs\"\n");
303
304         # and bail out
305         POSIX::exit(0);
306     };
307
308     # and sleep forever, or until killed.
309     while (1) { sleep(100); }
310 });
311
312 $proto = TestProtocol->new(
313     rx_fh => $rx_fh, tx_fh => $tx_fh,
314     message_cb => $message_cb);
315 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
316 $proto->set_message_cb(TestProtocol::BAR, sub {
317         push @events, [ shift @_, { @_ } ];
318     });
319
320 # die after 10 minutes
321 alarm 600;
322
323 # send $NMSGS messages to the child, which isn't listening yet!
324 for (my $i = 0; $i < $NMSGS; $i++) {
325     $proto->send(TestProtocol::SIMPLE);
326 }
327 # and then send it SIGUSR1, so it reads those
328 kill USR1 => $pid;
329
330 Amanda::MainLoop::run();
331 waitpid($pid, 0);
332 alarm 0; # cancel the alarm
333
334 is_deeply([ @events ],
335     [
336         [ "BAR", { mandatory => "got your inputs" } ],
337         [ "QUIT" ],
338     ],
339     "write buffering handled correctly")
340     or diag(Dumper(\@events));
341
342 ##
343 # test the message_obj functionality
344
345 package main::MessageObj;
346
347 sub msg_FOO {
348     my $self = shift;
349     push @{$self}, [ shift @_, { @_ } ];
350     $proto->send(TestProtocol::SIMPLE);
351 }
352
353 sub msg_BAR {
354     my $self = shift;
355     push @{$self}, [ shift @_, { @_ } ];
356     $proto->send(TestProtocol::SIMPLE);
357 }
358
359 package main;
360
361 @events = ();
362 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
363     my ($rdh, $wrh) = @_;
364     $wrh->autoflush(1);
365
366     $wrh->write("FOO\n");
367     $rdh->getline() =~ /SIMPLE/ or die("bad response");
368
369     $wrh->write("BAR one\n");
370     $rdh->getline() =~ /SIMPLE/ or die("bad response");
371
372     $wrh->write("BAH one \"t w o\"\n"); # note alternate spelling "BAH"
373     $rdh->getline() =~ /SIMPLE/ or die("bad response");
374
375     $wrh->write("FOO one \"t w o\" three\n");
376     $rdh->getline() =~ /SIMPLE/ or die("bad response");
377 });
378
379 $proto = TestProtocol->new(
380     rx_fh => $rx_fh, tx_fh => $tx_fh,
381     message_obj => bless(\@events, "main::MessageObj"));
382 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
383 Amanda::MainLoop::run();
384 waitpid($pid, 0);
385
386 is_deeply([ @events ],
387     [ [ 'FOO', { 'nicknames' => [] } ],
388       [ 'BAR', { 'mandatory' => 'one' } ],
389       [ 'BAR', { 'mandatory' => 'one', 'optional' => 't w o' } ],
390       [ 'FOO', { 'name' => 'one', 'nicknames' => [ 't w o', 'three' ] } ],
391       [ 'QUIT' ],
392     ],
393     "message_obj works")
394     or diag(Dumper(\@events));
395