X-Git-Url: https://git.gag.com/?a=blobdiff_plain;f=installcheck%2FInstallcheck%2FClientService.pm;fp=installcheck%2FInstallcheck%2FClientService.pm;h=942921a5d0ef0ea0b835b7f493cb06b16919d1bf;hb=fd48f3e498442f0cbff5f3606c7c403d0566150e;hp=0000000000000000000000000000000000000000;hpb=96f35b20267e8b1a1c846d476f27fcd330e0b018;p=debian%2Famanda diff --git a/installcheck/Installcheck/ClientService.pm b/installcheck/Installcheck/ClientService.pm new file mode 100644 index 0000000..942921a --- /dev/null +++ b/installcheck/Installcheck/ClientService.pm @@ -0,0 +1,737 @@ +# Copyright (c) 2010 Zmanda, Inc. All Rights Reserved. +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300 +# Sunnyvale, CA 94086, USA, or: http://www.zmanda.com + +package Installcheck::ClientService; + +=head1 NAME + +Installcheck::ClientService - a harness for testing client services like +sendbackup or selfcheck. + +=head1 SYNOPSIS + + use Installcheck::ClientService; + + # fire up a fake amandad + my $service; + my $process_done = sub { + my ($wait_status) = @_; + Amanda::MainLoop::quit(); + }; + $service = Installcheck::ClientService->new( + service => 'amindexd', + emulate => 'amandad', + auth => 'bsdtcp', + process_done => $process_done); + # or + $service = Installcheck::ClientService->new( + service => 'amindexd', + emulate => 'inetd', + args => [ @args ], + process_done => $process_done); + $service->expect('main', + [ re => qr/^CONNECT (\d+)\n/, $handle_connect ], + [ re => qr/^ERROR (.*)\r\n/, $handle_error ]); + $service->expect('stream1', + [ eof => $handle_eof ]); + $service->expect('stream2', + [ header => $handle_header ]); + $service->expect('stream3', + [ data => $handle_data ]); + Amanda::MainLoop::run(); + +=head1 DESCRIPTION + +The C class re-implements the service-facing side +of amandad and inetd. It strips away all of the service-specific hacks and the +security API portions. It handles multiple, simultaneous, named, bidirectional +data streams with an expect-like interface. + +When emulating amandad, the service is run with the usual high-numbered file +descriptors pre-piped, and with 'amandad' in C and the C +parameter (which defaults to 'bsdtcp') in C. The service's stdout and +stdin are connected to the 'main' stream, and stderr is available as 'stderr'. +The three bidirectional streams on the high-numbered pipes are available as +'stream1', 'stream2', and 'stream3'. You should send a request packet on the +'main' stream and close it for writing, and read the reply from 'main'. Note +that you should omit the 'SERVICE' line of the request, as it is ordinarily +consumed by amandad itself. + +When emulating inetd, the service is run with a TCP socket attached to its +stdin and stdout, and 'installcheck' in C. Additional arguments can +be provided in the C parameter. The TCP socket is available as stream +'main'. + +=head2 Constructor + +See the SYNOPSIS for examples. The constructor's C parameter gives +the name of the service to run. The C parameter determines how the +service is invoked. The C and C parameters are described above. +The C parameter gives a sub which is called with the service's +wait status when the service exits and all of its file descriptors have been +drained. + +=head2 Killing Subprocess + +To kill the subprocess, call + + $service->kill(); + +this will send a SIGINT. Process termination proceeds as normal - +C will be called. + +=head2 Handling Streams + +Streams have simple strings as names; the standard names are described in the +DESCRIPTION section. + +To send data on a stream, use C: + + $service->send('main', 'Hello, service!\n'); + +Note that this method does not block until the data is sent. + +To close a stream, use C. It takes a stream name and direction, and +only closes that direction. For TCP connections, this means half-open +connections, while for file descriptors only one of the descriptors is closed. + + $service->close('data', 'w'); # close for reading + $service->close('data', 'r'); # close for writing + $service->close('data', 'b'); # close for both + +When emulating inetd, the C method can open a new connection to the +service, given a port number and a name for the new stream: + + $service->connect('index', $idx_port); + +=head2 Handling Incoming Data + +The read side of each stream has a set of I: expected events and +subs to call when those events occur. Each expectation comes in the form of an +arrayref, and starts with a string indicating its type. The simplest is a +regular expression: + + [ re => qr/^200 OK/, $got_ok ] + +In this case the C<$got_ok> sub will be called with the matched text. An +expected EOF is written + + [ eof => $got_eof ] + +To capture a stream of data, and call C<$got_data> on EOF with the number of +bytes consumed, use + + [ bytes_to_eof => $got_eof ] + +To capture a specific amount of data - in this case 32k - and pass it to +C<$got_header>, use + + [ bytes => 32768, $got_header ] + +The set of expectations for a stream is set with the C method. This +method completely replaces any previous expectations. + + $service->expect('data', + [ re => qr/^200 OK.*\n/, $got_ok ], + [ re => qr/^4\d\d .*\n/, $got_err ]); + +=cut + +use base qw( Exporter ); +use warnings; +use strict; +use Amanda::Constants; +use Amanda::MainLoop; +use Amanda::Paths; +use Amanda::Util; +use Amanda::Debug qw( debug ); +use POSIX qw( :fcntl_h ); +use POSIX; +use Data::Dumper; +use IO::Handle; +use Socket; + +use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET; +use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT; +our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT); +our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] ); + +sub new { + my $class = shift; + my %params = @_; + + my $self = bless { + emulate => $params{'emulate'}, + service => $params{'service'}, + process_done => $params{'process_done'}, + auth => $params{'auth'} || 'bsdtcp', + args => $params{'args'} || [], + + # all hashes keyed by stream name + stream_fds => {}, + outstanding_writes => {}, + close_after_write => {}, + + read_buf => {}, + got_eof => {}, + + expectations => {}, + }, $class; + + if ($self->{'emulate'} eq 'amandad') { + $self->_start_process_amandad(); + } elsif ($self->{'emulate'} eq 'inetd') { + $self->_start_process_inetd(); + } else { + die "invalid 'emulate' parameter"; + } + + return $self; +} + +sub send { + my $self = shift; + my ($name, $data) = @_; + + my $fd = $self->{'stream_fds'}{$name}[1]; + die "stream '$name' is not writable" + unless defined $fd and $fd != -1; + + return if $data eq ''; + + $self->{'outstanding_writes'}{$name}++; + Amanda::MainLoop::async_write( + fd => $fd, + data => $data, + async_write_cb => sub { + my ($err, $bytes_written) = @_; + die "on stream $name: $err" if $err; + + $self->_log_data(">>", $name, $data); + + $self->{'outstanding_writes'}{$name}--; + if ($self->{'close_after_write'}{$name} + and $self->{'outstanding_writes'}{$name} == 0) { + $self->_do_close_write($name); + } + }); +} + +sub connect { + my $self = shift; + my ($name, $port) = @_; + + socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp')) + or die "error creating connect socket: $!"; + connect($child, sockaddr_in($port, inet_aton("127.0.0.1"))) + or die "error connecting: $!"; + + # get our own fd for the socket that Perl won't close for us, and + # close the perl socket + my $fd = dup(fileno($child)); + close($child); + + $self->_add_stream($name, $fd, $fd); +} + +sub close { + my $self = shift; + my ($name, $for) = @_; + + die "stream '$name' does not exist" + unless exists $self->{'stream_fds'}{$name}; + + # translate 'b'oth into 'r'ead and 'w'rite + if ($for eq 'b') { + $self->close($name, 'r'); + $self->close($name, 'w'); + return; + } + + if ($for eq 'w') { + if ($self->{'outstanding_writes'}{$name}) { + # close when the writes are done + $self->{'close_after_write'}{$name} = 1; + } else { + $self->_do_close_write($name); + } + } else { + $self->_do_close_read($name); + } +} + +sub expect { + my $self = shift; + my ($name, @expectations) = @_; + + for my $exp (@expectations) { + # set up a byte counter for bytes_to_eof + if ($exp->[0] eq 'bytes_to_eof') { + $exp->[2] = 0; + } + } + + $self->{'expectations'}{$name} = [ @expectations ]; + + $self->_check_expectations($name); +} + +sub kill { + my $self = shift; + + kill 'INT', $self->{'pid'}; +} + +# private methods + +sub _start_process_amandad { + my $self = shift; + my $i; + + my $service = "$amlibexecdir/$self->{service}"; + die "service '$service' does not exist" unless -x $service; + + # we'll need some pipes: + my ($stdin_c, $stdin_p) = POSIX::pipe(); + my ($stdout_p, $stdout_c) = POSIX::pipe(); + my ($stderr_p, $stderr_c) = POSIX::pipe(); + my @data_fdpairs; + for ($i = 0; $i < DATA_FD_COUNT; $i++) { + my ($in_c, $in_p) = POSIX::pipe(); + my ($out_p, $out_c) = POSIX::pipe(); + push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ]; + } + + # fork and execute! + $self->{'pid'} = POSIX::fork(); + die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0); + if ($self->{'pid'} == 0) { + # child + + my $fd; + my $fdpair; + + # First, close all of the fd's we don't need. + POSIX::close($stdin_p); + POSIX::close($stdout_p); + POSIX::close($stderr_p); + for $fdpair (@data_fdpairs) { + my ($in_c, $in_p, $out_p, $out_c) = @$fdpair; + POSIX::close($in_p); + POSIX::close($out_p); + } + + # dup our in/out fd's appropriately + POSIX::dup2($stdin_c, 0); + POSIX::dup2($stdout_c, 1); + POSIX::dup2($stderr_c, 2); + POSIX::close($stdin_c); + POSIX::close($stdout_c); + POSIX::close($stderr_c); + + # then make sure everything is greater than the highest + # fd we'll need + my @fds_to_close; + for $fdpair (@data_fdpairs) { + my ($in_c, $in_p, $out_p, $out_c) = @$fdpair; + while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) { + push @fds_to_close, $in_c; + $in_c = POSIX::dup($in_c); + } + while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) { + push @fds_to_close, $out_c; + $out_c = POSIX::dup($out_c); + } + $fdpair->[0] = $in_c; + $fdpair->[3] = $out_c; + } + + # close all of the leftovers + for $fd (@fds_to_close) { + POSIX::close($fd); + } + + # and now use dup2 to move everything to its final location (whew!) + for ($i = 0; $i < DATA_FD_COUNT; $i++) { + my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]}; + POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2); + POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1); + POSIX::close($out_c); + POSIX::close($in_c); + } + + # finally, execute! + # braces avoid warning + { exec { $service } $service, 'amandad', $self->{'auth'}; } + my $err = "could not execute $service; $!\n"; + POSIX::write(2, $err, length($err)); + exit 2; + } + + # parent + + # watch for the child to die + Amanda::MainLoop::call_on_child_termination($self->{'pid'}, + sub { $self->_process_done(@_); }); + + # close all of the fd's we don't need, and make notes of the fd's + # we want to keep around + + POSIX::close($stdin_c); + POSIX::close($stdout_c); + $self->_add_stream('main', $stdout_p, $stdin_p); + + POSIX::close($stderr_c); + $self->_add_stream('stderr', $stderr_p, -1); + + for ($i = 0; $i < DATA_FD_COUNT; $i++) { + my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]}; + POSIX::close($in_c); + POSIX::close($out_c); + + $self->_add_stream('stream'.($i+1), $out_p, $in_p); + } +} + +sub _start_process_inetd { + my $self = shift; + my $i; + + # figure out the service + my $service = "$amlibexecdir/$self->{service}"; + die "service '$service' does not exist" unless -x $service; + + # set up and bind a listening socket on localhost + socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp')) + or die "creating socket: $!"; + bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1"))) + or die "binding socket: $!"; + listen(SERVER, 1); + my ($port, $addr) = sockaddr_in(getsockname(SERVER)); + + # fork and execute! + $self->{'pid'} = POSIX::fork(); + die "could not fork: $!" if ($self->{'pid'} < 0); + if ($self->{'pid'} == 0) { + # child + + # send stderr to debug + Amanda::Debug::debug_dup_stderr_to_debug(); + + # wait for a connection on the socket, waiting a long time + # but not forever.. + alarm 60*60*24; # one day + my $paddr = accept(CLIENT, SERVER); + CORE::close(SERVER); + alarm 0; + + # dup that into stdio + POSIX::dup2(fileno(CLIENT), 0); + POSIX::dup2(fileno(CLIENT), 1); + CORE::close(CLIENT); + + # finally, execute! + # braces avoid warning + { exec { $service } $service, 'installcheck', @{$self->{'args'}}; } + my $err = "could not execute $service; $!\n"; + POSIX::write(2, $err, length($err)); + exit 2; + } + + # parent + + # watch for the child to die + Amanda::MainLoop::call_on_child_termination($self->{'pid'}, + sub { $self->_process_done(@_); }); + + # close the server socket + CORE::close(SERVER); + + # connect to the child + $self->connect('main', $port); +} + +sub _add_stream { + my $self = shift; + my ($name, $rfd, $wfd) = @_; + + if (exists $self->{'stream_fds'}{$name}) { + die "stream $name already exists"; + } + + $self->{'stream_fds'}{$name} = [ $rfd, $wfd ]; + $self->{'read_sources'}{$name} = undef; + $self->{'outstanding_writes'}{$name} = 0; + $self->{'close_after_write'}{$name} = 0; + + # start an async read on every read_fd we set up, after making it not-blocking + if ($rfd != -1) { + my $async_read_cb; + + Amanda::Util::set_blocking($rfd, 0); + $self->{'read_buf'}{$name} = ''; + $self->{'got_eof'}{$name} = 0; + + $async_read_cb = sub { + my ($err, $data) = @_; + die "on stream $name: $err" if $err; + + # log it + $self->_log_data("<<", $name, $data); + + # prep for next time + if ($data) { + $self->{'read_sources'}{$name} = + Amanda::MainLoop::async_read( + fd => $rfd, + async_read_cb => $async_read_cb); + } else { + delete $self->{'read_sources'}{$name}; + $self->_do_close_read($name); + } + + # add the data to the buffer, or signal EOF + if ($data) { + $self->{'read_buf'}{$name} .= $data; + } else { + $self->{'got_eof'}{$name} = 1; + } + + # and call the user function + $self->_check_expectations($name); + }; + + $self->{'read_sources'}{$name} = + Amanda::MainLoop::async_read( + fd => $rfd, + async_read_cb => $async_read_cb); + } + + # set all the write_fd's to non-blocking too. + if ($wfd != -1) { + Amanda::Util::set_blocking($wfd, 0); + } +} + +sub _do_close_read { + my $self = shift; + my ($name) = @_; + + my $fds = $self->{'stream_fds'}{$name}; + + if ($fds->[0] == -1) { + die "$name is already closed for reading"; + } + + debug("XX closing $name for reading"); + + # remove any ongoing reads + if ($self->{'read_sources'}{$name}) { + $self->{'read_sources'}{$name}->remove(); + delete $self->{'read_sources'}{$name}; + } + + # if both fd's are the same, then this is probably a socket, so shut down + # the read side + if ($fds->[0] == $fds->[1]) { + # perl doesn't provide a fd-compatible shutdown, but luckily shudown + # affects dup'd file descriptors, too! So create a new handle and shut + # it down. When the handle is garbage collected, it will be closed, + # but that will not affect the original. This will look strange in an + # strace, but it works without SWIGging shutdown() + shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0); + } else { + POSIX::close($fds->[0]); + } + $fds->[0] = -1; + + if ($fds->[1] == -1) { + delete $self->{'stream_fds'}{$name}; + } +} + +sub _do_close_write { + my $self = shift; + my ($name, $for) = @_; + + my $fds = $self->{'stream_fds'}{$name}; + + if ($fds->[1] == -1) { + die "$name is already closed for writing"; + } + + debug("XX closing $name for writing"); + + # if both fd's are the same, then this is probably a socket, so shut down + # the write side + if ($fds->[1] == $fds->[0]) { + # (see above) + shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1); + } else { + POSIX::close($fds->[1]); + } + $fds->[1] = -1; + + if ($fds->[0] == -1) { + delete $self->{'stream_fds'}{$name}; + } + delete $self->{'outstanding_writes'}{$name}; + delete $self->{'close_after_write'}{$name}; +} + +sub _process_done { + my $self = shift; + my ($exitstatus) = @_; + + debug("service exit: $exitstatus"); + + # delay this to the next trip around the MainLoop, in case data is available + # on any fd's + Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus); +} + +sub _do_process_done { + my $self = shift; + my ($exitstatus) = @_; + + $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1; + + # defer with call_after if there are still read fd's open or data in a read + # buffer. Since the process just died, presumably these will close in this + # trip around the MainLoop, so this will be a very short busywait. The upper + # bound on the wait is 1 second. + if ($self->{'process_done_loops'} < 100) { + my $still_busy = 0; + for my $name (keys %{$self->{'stream_fds'}}) { + my $fds = $self->{'stream_fds'}{$name}; + # if we're still expecting something on this stream.. + if ($self->{'expectations'}{$name}) { + $still_busy = 1; + } + # or the stream's not closed yet.. + if ($fds->[0] != -1) { + $still_busy = 1; + } + } + if ($still_busy) { + return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus); + } + } + + # close all of the write_fd's. If there are pending writes, they + # were going to get a SIGPIPE anyway. + for my $name (keys %{$self->{'stream_fds'}}) { + my $fds = $self->{'stream_fds'}{$name}; + if ($fds->[1] != -1) { + $self->_do_close_write($name); + } + } + + $self->{'process_done'}->($exitstatus); +} + +sub _log_data { + my $self = shift; + my ($dir, $name, $data) = @_; + + if ($data) { + if (length($data) < 300) { + my $printable = $data; + $printable =~ s/[^\r\n[:print:]]+//g; + $printable =~ s/\n/\\n/g; + $printable =~ s/\r/\\r/g; + debug("$dir$name: [$printable]"); + } else { + debug(sprintf("$dir$name: %d bytes", length($data))); + } + } else { + debug("$dir$name: EOF"); + } +} + +sub _check_expectations { + my $self = shift; + my ($name) = @_; + + my $expectations = $self->{'expectations'}{$name}; + return unless defined $expectations and @$expectations; + + my $cb = undef; + my @args = undef; + # if we got EOF and have no more pending data, look for a matching + # expectation + if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) { + for my $exp (@$expectations) { + if ($exp->[0] eq 'eof') { + $cb = $exp->[1]; + @args = (); + last; + } elsif ($exp->[0] eq 'bytes_to_eof') { + $cb = $exp->[1]; + @args = ($exp->[2],); # byte count + last; + } + } + + if (!$cb) { + debug("Expected on $name: " . Dumper($expectations)); + die "Unexpected EOF on $name"; + } + } elsif ($self->{'read_buf'}{$name}) { + my $buf = $self->{'read_buf'}{$name}; + + for my $exp (@$expectations) { + if ($exp->[0] eq 'eof') { + die "Expected EOF but got data on $name"; + } elsif ($exp->[0] eq 'bytes_to_eof') { + # store the ongoing byte count in the expectation itself + $exp->[2] = ($exp->[2] || 0) + length($buf); + $self->{'read_buf'}{$name} = ''; + # and if this stream *also* has EOF, call back + if ($self->{'got_eof'}{$name}) { + $cb = $exp->[1]; + @args = ($exp->[2],); # byte count + } + last; + } elsif ($exp->[0] eq 'bytes') { + if (length($buf) >= $exp->[1]) { + $cb = $exp->[2]; + @args = (substr($buf, 0, $exp->[1]),); + $self->{'read_buf'}{$name} = substr($buf, $exp->[1]); + } + last; # done searching, even if we don't call a sub + } elsif ($exp->[0] eq 're') { + if ($buf =~ $exp->[1]) { + $cb = $exp->[2]; + @args = ($&,); # matched section of $buf + $self->{'read_buf'}{$name} = $'; # remainder of $buf + last; + } + } + } + } + + # if there's a callback to make, then remove the expectations *before* + # calling it + if ($cb) { + delete $self->{'expectations'}{$name}; + $cb->(@args); + } +} + +1;