Imported Upstream version 3.3.2
[debian/amanda] / installcheck / Amanda_IPC_LineProtocol.pl
1 # Copyright (c) 2009-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 use Test::More tests => 6;
20 use strict;
21 use warnings;
22
23 use lib "@amperldir@";
24 use Installcheck;
25 use Amanda::IPC::LineProtocol;
26 use IO::Handle;
27 use Amanda::MainLoop;
28 use Amanda::Debug;
29 use Data::Dumper;
30 use Carp;
31
32 ##
33 # Define a test protocol
34
35 package TestProtocol;
36 use base "Amanda::IPC::LineProtocol";
37 use Amanda::IPC::LineProtocol;
38
39 use constant SIMPLE => message("SIMPLE",
40     format => [ qw( ) ],
41 );
42
43 use constant FOO => message("FOO",
44     format => [ qw( name? nicknames* ) ],
45 );
46
47 use constant FO => message("FO",  # prefix of "FOO"
48     format => [ qw( ) ],
49 );
50
51 use constant ASSYM => message("ASSYM",
52     format => {
53         in => [ qw( a b ) ],
54         out => [ qw( x ) ],
55     },
56 );
57
58 use constant BAR => message("BAR",
59     match => qr/^BA[Rh]$/i, # more elaborate regex
60     format => [ qw( mandatory optional? ) ],
61 );
62
63 use constant QUIT => message("QUIT",
64     match => qr/^QUIT$/i,
65     on_eof => 1,
66     format => [ qw( reason? ) ],
67 );
68
69 package main;
70
71 # set up debugging so debug output doesn't interfere with test results
72 Amanda::Debug::dbopen("installcheck");
73 Installcheck::log_test_output();
74
75 # and disable Debug's die() and warn() overrides
76 Amanda::Debug::disable_die_override();
77
78 # run $code in a separate process, with read and write handles hooked up, and returns
79 # read and write handles.
80 sub in_fork {
81     my ($code) = @_;
82
83     my ($parent_read, $child_write) = POSIX::pipe();
84     my ($child_read, $parent_write) = POSIX::pipe();
85
86     my $pid = fork();
87     if (!defined($pid) or $pid < 0) {
88         die("Can't fork: $!");
89     }
90
91     if (!$pid) {
92         ## child
93
94         # get our file-handle house in order
95         POSIX::close($parent_read);
96         POSIX::close($parent_write);
97
98         $code->(IO::Handle->new_from_fd($child_read, "r"),
99                 IO::Handle->new_from_fd($child_write, "w"));
100         POSIX::exit(0);
101     }
102
103     ## parent
104
105     POSIX::close($child_read);
106     POSIX::close($child_write);
107
108     return (IO::Handle->new_from_fd($parent_read, "r"),
109             IO::Handle->new_from_fd($parent_write, "w"),
110             $pid);
111 }
112
113 # generic "die" message_cb
114 my $message_cb = make_cb(message_cb => sub {
115     my ($msgtype, %params) = @_;
116     if (defined $msgtype) {
117         diag(Dumper(\%params));
118         die("unhandled message: $msgtype");
119     } else {
120         die("IPC error: $params{'error'}");
121     }
122 });
123
124 ##
125 # Run some tests
126
127 my $proto;
128 my @events;
129 my ($rx_fh, $tx_fh, $pid);
130
131 # on QUIT, stop the protocol and quit the mainloop
132 my $quit_cb = make_cb(quit_cb => sub {
133     push @events, [ @_ ];
134     $proto->stop(finished_cb => sub {
135         Amanda::MainLoop::quit();
136     });
137 });
138
139
140 #
141 # test a simple "QUIT"
142
143 @events = ();
144 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
145     my ($rdh, $wrh) = @_;
146     $wrh->autoflush(1);
147
148     $rdh->getline(); # get 'start\n'
149     $wrh->write("QUIT \"just because\"");
150 });
151
152 $proto = TestProtocol->new(
153     rx_fh => $rx_fh, tx_fh => $tx_fh,
154     message_cb => $message_cb);
155 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
156 Amanda::MainLoop::call_later(sub {
157     $tx_fh->autoflush(1);
158     $tx_fh->write("start\n");
159 });
160 Amanda::MainLoop::run();
161 waitpid($pid, 0);
162
163 is_deeply([ @events ],
164     [
165         [ "QUIT", reason => "just because" ],
166         [ "QUIT" ],
167     ],
168     "correct events for a simple 'QUIT \"just because\"")
169     or diag(Dumper(\@events));
170
171
172 ##
173 # test a bogus message
174
175 @events = ();
176 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
177     my ($rdh, $wrh) = @_;
178     $wrh->autoflush(1);
179
180     $rdh->getline(); # get 'start\n'
181     $wrh->write("SNARSBLAT, yo");
182 });
183
184 $proto = TestProtocol->new(
185     rx_fh => $rx_fh, tx_fh => $tx_fh,
186     message_cb => sub { push @events, [ @_ ]; });
187 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
188 Amanda::MainLoop::call_later(sub {
189     $tx_fh->autoflush(1);
190     $tx_fh->write("start\n");
191 });
192 Amanda::MainLoop::run();
193 waitpid($pid, 0);
194
195 is_deeply([ @events ],
196     [
197         [ undef, 'error' => 'unknown command' ],
198         [ "QUIT" ], # from EOF
199     ],
200     "bogus message handled correctly")
201     or diag(Dumper(\@events));
202
203
204 ##
205 # a more complex conversation
206
207 @events = ();
208 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
209     my ($rdh, $wrh) = @_;
210     $wrh->autoflush(1);
211
212     $wrh->write("FOO\n");
213     $rdh->getline() =~ /SIMPLE/ or die("bad response");
214
215     $wrh->write("FOO one\n");
216     $rdh->getline() =~ /SIMPLE/ or die("bad response");
217
218     $wrh->write("FOO one \"t w o\"\n");
219     $rdh->getline() =~ /SIMPLE/ or die("bad response");
220
221     $wrh->write("FOO one \"t w o\" three\n");
222     $rdh->getline() =~ /SIMPLE/ or die("bad response");
223 });
224
225 $proto = TestProtocol->new(
226     rx_fh => $rx_fh, tx_fh => $tx_fh,
227     message_cb => $message_cb);
228 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
229 $proto->set_message_cb(TestProtocol::FOO, sub {
230     push @events, [ shift @_, { @_ } ];
231     $proto->send(TestProtocol::SIMPLE);
232 });
233 Amanda::MainLoop::run();
234 waitpid($pid, 0);
235
236 is_deeply([ @events ],
237     [
238         [ "FOO", { nicknames => [] } ],
239         [ "FOO", { nicknames => [], name => "one" } ],
240         [ "FOO", { nicknames => [ "t w o" ], name => "one" } ],
241         [ "FOO", { nicknames => [ "t w o", "three" ], name => "one" } ],
242         [ "QUIT" ],
243     ],
244     "correct events for a few conversation steps, parsing")
245     or diag(Dumper(\@events));
246
247 ##
248 # Asymmetrical formats
249
250 @events = ();
251 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
252     my ($rdh, $wrh) = @_;
253     $wrh->autoflush(1);
254
255     $wrh->write("ASSYM 1 2\n");
256     $rdh->getline() =~ /ASSYM a/ or die("bad response");
257 });
258
259 $proto = TestProtocol->new(
260     rx_fh => $rx_fh, tx_fh => $tx_fh,
261     message_cb => $message_cb);
262 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
263 $proto->set_message_cb(TestProtocol::ASSYM, sub {
264         push @events, [ shift @_, { @_ } ];
265         $proto->send(TestProtocol::ASSYM, x => "a");
266     });
267 Amanda::MainLoop::run();
268 waitpid($pid, 0);
269
270 is_deeply([ @events ],
271     [
272         [ "ASSYM", { a => "1", b => "2" } ],
273         [ "QUIT" ],
274     ],
275     "correct events for asymmetric message format")
276     or diag(Dumper(\@events));
277
278
279 ##
280 # test queueing up of messages on writing.
281
282 # The idea here is to write more than a pipe buffer can hold, while the child
283 # process does not read that data, and then to signal the child process,
284 # causing it to read all of that data, write a reply, and exit.  Recent linuxes
285 # have a pipe buffer of 64k, so we exceed that threshold.  We use an 'alarm' to
286 # fail in the case that this blocks.
287
288 my $NMSGS = 10000;
289
290 @events = ();
291 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
292     my ($rdh, $wrh) = @_;
293     $wrh->autoflush(1);
294
295     # on USR1, read lots of inputs
296     $SIG{'USR1'} = sub {
297         for (my $i = 0; $i < $NMSGS; $i++) {
298             $rdh->getline();
299         }
300
301         # send a message that the parent can hope to get
302         $wrh->write("BAR \"got your inputs\"\n");
303
304         # and bail out
305         POSIX::exit(0);
306     };
307
308     $wrh->write("SIMPLE\n");
309
310     # and sleep forever, or until killed.
311     while (1) { sleep(100); }
312 });
313
314 $proto = TestProtocol->new(
315     rx_fh => $rx_fh, tx_fh => $tx_fh,
316     message_cb => $message_cb);
317 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
318 $proto->set_message_cb(TestProtocol::SIMPLE, sub {
319         push @events, [ shift @_ ];
320         # send $NMSGS messages to the child, which isn't listening yet!
321         for (my $i = 0; $i < $NMSGS; $i++) {
322             $proto->send(TestProtocol::SIMPLE);
323         }
324         # and then send it SIGUSR1, so it reads those
325         kill USR1 => $pid;
326     });
327 $proto->set_message_cb(TestProtocol::BAR, sub {
328         push @events, [ shift @_, { @_ } ];
329     });
330
331 # die after 10 minutes
332 alarm 600;
333
334 Amanda::MainLoop::run();
335 waitpid($pid, 0);
336 alarm 0; # cancel the alarm
337
338 is_deeply([ @events ],
339     [
340         [ "SIMPLE" ],
341         [ "BAR", { mandatory => "got your inputs" } ],
342         [ "QUIT" ],
343     ],
344     "write buffering handled correctly")
345     or diag(Dumper(\@events));
346
347 ##
348 # test the message_obj functionality
349
350 package main::MessageObj;
351
352 sub msg_FOO {
353     my $self = shift;
354     push @{$self}, [ shift @_, { @_ } ];
355     $proto->send(TestProtocol::SIMPLE);
356 }
357
358 sub msg_BAR {
359     my $self = shift;
360     push @{$self}, [ shift @_, { @_ } ];
361     $proto->send(TestProtocol::SIMPLE);
362 }
363
364 package main;
365
366 @events = ();
367 ($rx_fh, $tx_fh, $pid) = in_fork(sub {
368     my ($rdh, $wrh) = @_;
369     $wrh->autoflush(1);
370
371     $wrh->write("FOO\n");
372     $rdh->getline() =~ /SIMPLE/ or die("bad response");
373
374     $wrh->write("BAR one\n");
375     $rdh->getline() =~ /SIMPLE/ or die("bad response");
376
377     $wrh->write("BAH one \"t w o\"\n"); # note alternate spelling "BAH"
378     $rdh->getline() =~ /SIMPLE/ or die("bad response");
379
380     $wrh->write("FOO one \"t w o\" three\n");
381     $rdh->getline() =~ /SIMPLE/ or die("bad response");
382 });
383
384 $proto = TestProtocol->new(
385     rx_fh => $rx_fh, tx_fh => $tx_fh,
386     message_obj => bless(\@events, "main::MessageObj"));
387 $proto->set_message_cb(TestProtocol::QUIT, $quit_cb);
388 Amanda::MainLoop::run();
389 waitpid($pid, 0);
390
391 is_deeply([ @events ],
392     [ [ 'FOO', { 'nicknames' => [] } ],
393       [ 'BAR', { 'mandatory' => 'one' } ],
394       [ 'BAR', { 'mandatory' => 'one', 'optional' => 't w o' } ],
395       [ 'FOO', { 'name' => 'one', 'nicknames' => [ 't w o', 'three' ] } ],
396       [ 'QUIT' ],
397     ],
398     "message_obj works")
399     or diag(Dumper(\@events));
400