1 # Copyright (c) 2010 Zmanda, Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19 package Installcheck::ClientService;
23 Installcheck::ClientService - a harness for testing client services like
24 sendbackup or selfcheck.
28 use Installcheck::ClientService;
30 # fire up a fake amandad
32 my $process_done = sub {
33 my ($wait_status) = @_;
34 Amanda::MainLoop::quit();
36 $service = Installcheck::ClientService->new(
37 service => 'amindexd',
40 auth_peer => 'localhost',
41 process_done => $process_done);
43 $service = Installcheck::ClientService->new(
44 service => 'amindexd',
47 process_done => $process_done);
48 $service->expect('main',
49 [ re => qr/^CONNECT (\d+)\n/, $handle_connect ],
50 [ re => qr/^ERROR (.*)\r\n/, $handle_error ]);
51 $service->expect('stream1',
52 [ eof => $handle_eof ]);
53 $service->expect('stream2',
54 [ header => $handle_header ]);
55 $service->expect('stream3',
56 [ data => $handle_data ]);
57 Amanda::MainLoop::run();
61 The C<Installcheck::ClientService> class re-implements the service-facing side
62 of amandad and inetd. It strips away all of the service-specific hacks and the
63 security API portions. It handles multiple, simultaneous, named, bidirectional
64 data streams with an expect-like interface.
66 When emulating amandad, the service is run with the usual high-numbered file
67 descriptors pre-piped, and with 'amandad' in C<argv[1]> and the C<auth>
68 parameter (which defaults to 'bsdtcp') in C<argv[2]>. The service's stdout and
69 stdin are connected to the 'main' stream, and stderr is available as 'stderr'.
70 The three bidirectional streams on the high-numbered pipes are available as
71 'stream1', 'stream2', and 'stream3'. You should send a request packet on the
72 'main' stream and close it for writing, and read the reply from 'main'. Note
73 that you should omit the 'SERVICE' line of the request, as it is ordinarily
74 consumed by amandad itself.
76 When emulating inetd, the service is run with a TCP socket attached to its
77 stdin and stdout, and 'installcheck' in C<argv[1]>. Additional arguments can
78 be provided in the C<args> parameter. The TCP socket is available as stream
83 See the SYNOPSIS for examples. The constructor's C<service> parameter gives
84 the name of the service to run. The C<emulate> parameter determines how the
85 service is invoked. The C<args> and C<auth> parameters are described above.
86 The C<process_done> parameter gives a sub which is called with the service's
87 wait status when the service exits and all of its file descriptors have been
88 drained. The C<auth_peer> parameter gives the value for
89 C<$AMANDA_AUTHENTICATED_PEER> when emulating amandad.
91 =head2 Killing Subprocess
93 To kill the subprocess, call
97 this will send a SIGINT. Process termination proceeds as normal -
98 C<process_done> will be called.
100 =head2 Handling Streams
102 Streams have simple strings as names; the standard names are described in the
105 To send data on a stream, use C<send>:
107 $service->send('main', 'Hello, service!\n');
109 Note that this method does not block until the data is sent.
111 To close a stream, use C<close>. It takes a stream name and direction, and
112 only closes that direction. For TCP connections, this means half-open
113 connections, while for file descriptors only one of the descriptors is closed.
115 $service->close('data', 'w'); # close for reading
116 $service->close('data', 'r'); # close for writing
117 $service->close('data', 'b'); # close for both
119 When emulating inetd, the C<connect> method can open a new connection to the
120 service, given a port number and a name for the new stream:
122 $service->connect('index', $idx_port);
124 =head2 Handling Incoming Data
126 The read side of each stream has a set of I<expectations>: expected events and
127 subs to call when those events occur. Each expectation comes in the form of an
128 arrayref, and starts with a string indicating its type. The simplest is a
131 [ re => qr/^200 OK/, $got_ok ]
133 In this case the C<$got_ok> sub will be called with the matched text. An
134 expected EOF is written
138 To capture a stream of data, and call C<$got_data> on EOF with the number of
141 [ bytes_to_eof => $got_eof ]
143 To capture a specific amount of data - in this case 32k - and pass it to
146 [ bytes => 32768, $got_header ]
148 The set of expectations for a stream is set with the C<expect> method. This
149 method completely replaces any previous expectations.
151 $service->expect('data',
152 [ re => qr/^200 OK.*\n/, $got_ok ],
153 [ re => qr/^4\d\d .*\n/, $got_err ]);
157 use base qw( Exporter );
160 use Amanda::Constants;
161 use Amanda::MainLoop;
164 use Amanda::Debug qw( debug );
165 use POSIX qw( :fcntl_h );
171 use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET;
172 use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT;
173 our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT);
174 our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] );
181 emulate => $params{'emulate'},
182 service => $params{'service'},
183 process_done => $params{'process_done'},
184 auth => $params{'auth'} || 'bsdtcp',
185 args => $params{'args'} || [],
186 auth_peer => $params{'auth_peer'},
188 # all hashes keyed by stream name
190 outstanding_writes => {},
191 close_after_write => {},
199 if ($self->{'emulate'} eq 'amandad') {
200 $self->_start_process_amandad();
201 } elsif ($self->{'emulate'} eq 'inetd') {
202 $self->_start_process_inetd();
204 die "invalid 'emulate' parameter";
212 my ($name, $data) = @_;
214 my $fd = $self->{'stream_fds'}{$name}[1];
215 die "stream '$name' is not writable"
216 unless defined $fd and $fd != -1;
218 return if $data eq '';
220 $self->{'outstanding_writes'}{$name}++;
221 Amanda::MainLoop::async_write(
224 async_write_cb => sub {
225 my ($err, $bytes_written) = @_;
226 die "on stream $name: $err" if $err;
228 $self->_log_data(">>", $name, $data);
230 $self->{'outstanding_writes'}{$name}--;
231 if ($self->{'close_after_write'}{$name}
232 and $self->{'outstanding_writes'}{$name} == 0) {
233 $self->_do_close_write($name);
240 my ($name, $port) = @_;
242 socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
243 or die "error creating connect socket: $!";
244 connect($child, sockaddr_in($port, inet_aton("127.0.0.1")))
245 or die "error connecting: $!";
247 # get our own fd for the socket that Perl won't close for us, and
248 # close the perl socket
249 my $fd = dup(fileno($child));
252 $self->_add_stream($name, $fd, $fd);
257 my ($name, $for) = @_;
259 die "stream '$name' does not exist"
260 unless exists $self->{'stream_fds'}{$name};
262 # translate 'b'oth into 'r'ead and 'w'rite
264 $self->close($name, 'r');
265 $self->close($name, 'w');
270 if ($self->{'outstanding_writes'}{$name}) {
271 # close when the writes are done
272 $self->{'close_after_write'}{$name} = 1;
274 $self->_do_close_write($name);
277 $self->_do_close_read($name);
283 my ($name, @expectations) = @_;
285 for my $exp (@expectations) {
286 # set up a byte counter for bytes_to_eof
287 if ($exp->[0] eq 'bytes_to_eof') {
292 $self->{'expectations'}{$name} = [ @expectations ];
294 $self->_check_expectations($name);
300 kill 'INT', $self->{'pid'};
305 sub _start_process_amandad {
309 my $service = "$amlibexecdir/$self->{service}";
310 die "service '$service' does not exist" unless -x $service;
312 # we'll need some pipes:
313 my ($stdin_c, $stdin_p) = POSIX::pipe();
314 my ($stdout_p, $stdout_c) = POSIX::pipe();
315 my ($stderr_p, $stderr_c) = POSIX::pipe();
317 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
318 my ($in_c, $in_p) = POSIX::pipe();
319 my ($out_p, $out_c) = POSIX::pipe();
320 push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ];
324 $self->{'pid'} = POSIX::fork();
325 die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0);
326 if ($self->{'pid'} == 0) {
332 # First, close all of the fd's we don't need.
333 POSIX::close($stdin_p);
334 POSIX::close($stdout_p);
335 POSIX::close($stderr_p);
336 for $fdpair (@data_fdpairs) {
337 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
339 POSIX::close($out_p);
342 # dup our in/out fd's appropriately
343 POSIX::dup2($stdin_c, 0);
344 POSIX::dup2($stdout_c, 1);
345 POSIX::dup2($stderr_c, 2);
346 POSIX::close($stdin_c);
347 POSIX::close($stdout_c);
348 POSIX::close($stderr_c);
350 # then make sure everything is greater than the highest
353 for $fdpair (@data_fdpairs) {
354 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
355 while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
356 push @fds_to_close, $in_c;
357 $in_c = POSIX::dup($in_c);
359 while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
360 push @fds_to_close, $out_c;
361 $out_c = POSIX::dup($out_c);
363 $fdpair->[0] = $in_c;
364 $fdpair->[3] = $out_c;
367 # close all of the leftovers
368 for $fd (@fds_to_close) {
372 # and now use dup2 to move everything to its final location (whew!)
373 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
374 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
375 POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2);
376 POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1);
377 POSIX::close($out_c);
381 delete $ENV{'AMANDA_AUTHENTICATED_PEER'};
382 $ENV{'AMANDA_AUTHENTICATED_PEER'} = $self->{'auth_peer'} if $self->{'auth_peer'};
385 # braces avoid warning
386 { exec { $service } $service, 'amandad', $self->{'auth'}; }
387 my $err = "could not execute $service; $!\n";
388 POSIX::write(2, $err, length($err));
394 # watch for the child to die
395 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
396 sub { $self->_process_done(@_); });
398 # close all of the fd's we don't need, and make notes of the fd's
399 # we want to keep around
401 POSIX::close($stdin_c);
402 POSIX::close($stdout_c);
403 $self->_add_stream('main', $stdout_p, $stdin_p);
405 POSIX::close($stderr_c);
406 $self->_add_stream('stderr', $stderr_p, -1);
408 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
409 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
411 POSIX::close($out_c);
413 $self->_add_stream('stream'.($i+1), $out_p, $in_p);
417 sub _start_process_inetd {
421 # figure out the service
422 my $service = "$amlibexecdir/$self->{service}";
423 die "service '$service' does not exist" unless -x $service;
425 # set up and bind a listening socket on localhost
426 socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
427 or die "creating socket: $!";
428 bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1")))
429 or die "binding socket: $!";
431 my ($port, $addr) = sockaddr_in(getsockname(SERVER));
434 $self->{'pid'} = POSIX::fork();
435 die "could not fork: $!" if ($self->{'pid'} < 0);
436 if ($self->{'pid'} == 0) {
439 # send stderr to debug
440 Amanda::Debug::debug_dup_stderr_to_debug();
442 # wait for a connection on the socket, waiting a long time
444 alarm 60*60*24; # one day
445 my $paddr = accept(CLIENT, SERVER);
449 # dup that into stdio
450 POSIX::dup2(fileno(CLIENT), 0);
451 POSIX::dup2(fileno(CLIENT), 1);
455 # braces avoid warning
456 { exec { $service } $service, 'installcheck', @{$self->{'args'}}; }
457 my $err = "could not execute $service; $!\n";
458 POSIX::write(2, $err, length($err));
464 # watch for the child to die
465 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
466 sub { $self->_process_done(@_); });
468 # close the server socket
471 # connect to the child
472 $self->connect('main', $port);
477 my ($name, $rfd, $wfd) = @_;
479 if (exists $self->{'stream_fds'}{$name}) {
480 die "stream $name already exists";
483 $self->{'stream_fds'}{$name} = [ $rfd, $wfd ];
484 $self->{'read_sources'}{$name} = undef;
485 $self->{'outstanding_writes'}{$name} = 0;
486 $self->{'close_after_write'}{$name} = 0;
488 # start an async read on every read_fd we set up, after making it not-blocking
492 Amanda::Util::set_blocking($rfd, 0);
493 $self->{'read_buf'}{$name} = '';
494 $self->{'got_eof'}{$name} = 0;
496 $async_read_cb = sub {
497 my ($err, $data) = @_;
498 die "on stream $name: $err" if $err;
501 $self->_log_data("<<", $name, $data);
505 $self->{'read_sources'}{$name} =
506 Amanda::MainLoop::async_read(
508 async_read_cb => $async_read_cb);
510 delete $self->{'read_sources'}{$name};
511 $self->_do_close_read($name);
514 # add the data to the buffer, or signal EOF
516 $self->{'read_buf'}{$name} .= $data;
518 $self->{'got_eof'}{$name} = 1;
521 # and call the user function
522 $self->_check_expectations($name);
525 $self->{'read_sources'}{$name} =
526 Amanda::MainLoop::async_read(
528 async_read_cb => $async_read_cb);
531 # set all the write_fd's to non-blocking too.
533 Amanda::Util::set_blocking($wfd, 0);
541 my $fds = $self->{'stream_fds'}{$name};
543 if ($fds->[0] == -1) {
544 die "$name is already closed for reading";
547 debug("XX closing $name for reading");
549 # remove any ongoing reads
550 if ($self->{'read_sources'}{$name}) {
551 $self->{'read_sources'}{$name}->remove();
552 delete $self->{'read_sources'}{$name};
555 # if both fd's are the same, then this is probably a socket, so shut down
557 if ($fds->[0] == $fds->[1]) {
558 # perl doesn't provide a fd-compatible shutdown, but luckily shudown
559 # affects dup'd file descriptors, too! So create a new handle and shut
560 # it down. When the handle is garbage collected, it will be closed,
561 # but that will not affect the original. This will look strange in an
562 # strace, but it works without SWIGging shutdown()
563 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0);
565 POSIX::close($fds->[0]);
569 if ($fds->[1] == -1) {
570 delete $self->{'stream_fds'}{$name};
574 sub _do_close_write {
576 my ($name, $for) = @_;
578 my $fds = $self->{'stream_fds'}{$name};
580 if ($fds->[1] == -1) {
581 die "$name is already closed for writing";
584 debug("XX closing $name for writing");
586 # if both fd's are the same, then this is probably a socket, so shut down
588 if ($fds->[1] == $fds->[0]) {
590 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1);
592 POSIX::close($fds->[1]);
596 if ($fds->[0] == -1) {
597 delete $self->{'stream_fds'}{$name};
599 delete $self->{'outstanding_writes'}{$name};
600 delete $self->{'close_after_write'}{$name};
605 my ($exitstatus) = @_;
607 debug("service exit: $exitstatus");
609 # delay this to the next trip around the MainLoop, in case data is available
611 Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus);
614 sub _do_process_done {
616 my ($exitstatus) = @_;
618 $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1;
620 # defer with call_after if there are still read fd's open or data in a read
621 # buffer. Since the process just died, presumably these will close in this
622 # trip around the MainLoop, so this will be a very short busywait. The upper
623 # bound on the wait is 1 second.
624 if ($self->{'process_done_loops'} < 100) {
626 for my $name (keys %{$self->{'stream_fds'}}) {
627 my $fds = $self->{'stream_fds'}{$name};
628 # if we're still expecting something on this stream..
629 if ($self->{'expectations'}{$name}) {
632 # or the stream's not closed yet..
633 if ($fds->[0] != -1) {
638 return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus);
642 # close all of the write_fd's. If there are pending writes, they
643 # were going to get a SIGPIPE anyway.
644 for my $name (keys %{$self->{'stream_fds'}}) {
645 my $fds = $self->{'stream_fds'}{$name};
646 if ($fds->[1] != -1) {
647 $self->_do_close_write($name);
651 $self->{'process_done'}->($exitstatus);
656 my ($dir, $name, $data) = @_;
659 if (length($data) < 300) {
660 my $printable = $data;
661 $printable =~ s/[^\r\n[:print:]]+//g;
662 $printable =~ s/\n/\\n/g;
663 $printable =~ s/\r/\\r/g;
664 debug("$dir$name: [$printable]");
666 debug(sprintf("$dir$name: %d bytes", length($data)));
669 debug("$dir$name: EOF");
673 sub _check_expectations {
677 my $expectations = $self->{'expectations'}{$name};
678 return unless defined $expectations and @$expectations;
682 # if we got EOF and have no more pending data, look for a matching
684 if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) {
685 for my $exp (@$expectations) {
686 if ($exp->[0] eq 'eof') {
690 } elsif ($exp->[0] eq 'bytes_to_eof') {
692 @args = ($exp->[2],); # byte count
698 debug("Expected on $name: " . Dumper($expectations));
699 die "Unexpected EOF on $name";
701 } elsif ($self->{'read_buf'}{$name}) {
702 my $buf = $self->{'read_buf'}{$name};
704 for my $exp (@$expectations) {
705 if ($exp->[0] eq 'eof') {
706 die "Expected EOF but got data on $name";
707 } elsif ($exp->[0] eq 'bytes_to_eof') {
708 # store the ongoing byte count in the expectation itself
709 $exp->[2] = ($exp->[2] || 0) + length($buf);
710 $self->{'read_buf'}{$name} = '';
711 # and if this stream *also* has EOF, call back
712 if ($self->{'got_eof'}{$name}) {
714 @args = ($exp->[2],); # byte count
717 } elsif ($exp->[0] eq 'bytes') {
718 if (length($buf) >= $exp->[1]) {
720 @args = (substr($buf, 0, $exp->[1]),);
721 $self->{'read_buf'}{$name} = substr($buf, $exp->[1]);
723 last; # done searching, even if we don't call a sub
724 } elsif ($exp->[0] eq 're') {
725 if ($buf =~ $exp->[1]) {
727 @args = ($&,); # matched section of $buf
728 $self->{'read_buf'}{$name} = $'; # remainder of $buf
735 # if there's a callback to make, then remove the expectations *before*
738 delete $self->{'expectations'}{$name};