1 # Copyright (c) 2010 Zmanda, Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19 package Installcheck::ClientService;
23 Installcheck::ClientService - a harness for testing client services like
24 sendbackup or selfcheck.
28 use Installcheck::ClientService;
30 # fire up a fake amandad
32 my $process_done = sub {
33 my ($wait_status) = @_;
34 Amanda::MainLoop::quit();
36 $service = Installcheck::ClientService->new(
37 service => 'amindexd',
40 process_done => $process_done);
42 $service = Installcheck::ClientService->new(
43 service => 'amindexd',
46 process_done => $process_done);
47 $service->expect('main',
48 [ re => qr/^CONNECT (\d+)\n/, $handle_connect ],
49 [ re => qr/^ERROR (.*)\r\n/, $handle_error ]);
50 $service->expect('stream1',
51 [ eof => $handle_eof ]);
52 $service->expect('stream2',
53 [ header => $handle_header ]);
54 $service->expect('stream3',
55 [ data => $handle_data ]);
56 Amanda::MainLoop::run();
60 The C<Installcheck::ClientService> class re-implements the service-facing side
61 of amandad and inetd. It strips away all of the service-specific hacks and the
62 security API portions. It handles multiple, simultaneous, named, bidirectional
63 data streams with an expect-like interface.
65 When emulating amandad, the service is run with the usual high-numbered file
66 descriptors pre-piped, and with 'amandad' in C<argv[1]> and the C<auth>
67 parameter (which defaults to 'bsdtcp') in C<argv[2]>. The service's stdout and
68 stdin are connected to the 'main' stream, and stderr is available as 'stderr'.
69 The three bidirectional streams on the high-numbered pipes are available as
70 'stream1', 'stream2', and 'stream3'. You should send a request packet on the
71 'main' stream and close it for writing, and read the reply from 'main'. Note
72 that you should omit the 'SERVICE' line of the request, as it is ordinarily
73 consumed by amandad itself.
75 When emulating inetd, the service is run with a TCP socket attached to its
76 stdin and stdout, and 'installcheck' in C<argv[1]>. Additional arguments can
77 be provided in the C<args> parameter. The TCP socket is available as stream
82 See the SYNOPSIS for examples. The constructor's C<service> parameter gives
83 the name of the service to run. The C<emulate> parameter determines how the
84 service is invoked. The C<args> and C<auth> parameters are described above.
85 The C<process_done> parameter gives a sub which is called with the service's
86 wait status when the service exits and all of its file descriptors have been
89 =head2 Killing Subprocess
91 To kill the subprocess, call
95 this will send a SIGINT. Process termination proceeds as normal -
96 C<process_done> will be called.
98 =head2 Handling Streams
100 Streams have simple strings as names; the standard names are described in the
103 To send data on a stream, use C<send>:
105 $service->send('main', 'Hello, service!\n');
107 Note that this method does not block until the data is sent.
109 To close a stream, use C<close>. It takes a stream name and direction, and
110 only closes that direction. For TCP connections, this means half-open
111 connections, while for file descriptors only one of the descriptors is closed.
113 $service->close('data', 'w'); # close for reading
114 $service->close('data', 'r'); # close for writing
115 $service->close('data', 'b'); # close for both
117 When emulating inetd, the C<connect> method can open a new connection to the
118 service, given a port number and a name for the new stream:
120 $service->connect('index', $idx_port);
122 =head2 Handling Incoming Data
124 The read side of each stream has a set of I<expectations>: expected events and
125 subs to call when those events occur. Each expectation comes in the form of an
126 arrayref, and starts with a string indicating its type. The simplest is a
129 [ re => qr/^200 OK/, $got_ok ]
131 In this case the C<$got_ok> sub will be called with the matched text. An
132 expected EOF is written
136 To capture a stream of data, and call C<$got_data> on EOF with the number of
139 [ bytes_to_eof => $got_eof ]
141 To capture a specific amount of data - in this case 32k - and pass it to
144 [ bytes => 32768, $got_header ]
146 The set of expectations for a stream is set with the C<expect> method. This
147 method completely replaces any previous expectations.
149 $service->expect('data',
150 [ re => qr/^200 OK.*\n/, $got_ok ],
151 [ re => qr/^4\d\d .*\n/, $got_err ]);
155 use base qw( Exporter );
158 use Amanda::Constants;
159 use Amanda::MainLoop;
162 use Amanda::Debug qw( debug );
163 use POSIX qw( :fcntl_h );
169 use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET;
170 use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT;
171 our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT);
172 our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] );
179 emulate => $params{'emulate'},
180 service => $params{'service'},
181 process_done => $params{'process_done'},
182 auth => $params{'auth'} || 'bsdtcp',
183 args => $params{'args'} || [],
185 # all hashes keyed by stream name
187 outstanding_writes => {},
188 close_after_write => {},
196 if ($self->{'emulate'} eq 'amandad') {
197 $self->_start_process_amandad();
198 } elsif ($self->{'emulate'} eq 'inetd') {
199 $self->_start_process_inetd();
201 die "invalid 'emulate' parameter";
209 my ($name, $data) = @_;
211 my $fd = $self->{'stream_fds'}{$name}[1];
212 die "stream '$name' is not writable"
213 unless defined $fd and $fd != -1;
215 return if $data eq '';
217 $self->{'outstanding_writes'}{$name}++;
218 Amanda::MainLoop::async_write(
221 async_write_cb => sub {
222 my ($err, $bytes_written) = @_;
223 die "on stream $name: $err" if $err;
225 $self->_log_data(">>", $name, $data);
227 $self->{'outstanding_writes'}{$name}--;
228 if ($self->{'close_after_write'}{$name}
229 and $self->{'outstanding_writes'}{$name} == 0) {
230 $self->_do_close_write($name);
237 my ($name, $port) = @_;
239 socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
240 or die "error creating connect socket: $!";
241 connect($child, sockaddr_in($port, inet_aton("127.0.0.1")))
242 or die "error connecting: $!";
244 # get our own fd for the socket that Perl won't close for us, and
245 # close the perl socket
246 my $fd = dup(fileno($child));
249 $self->_add_stream($name, $fd, $fd);
254 my ($name, $for) = @_;
256 die "stream '$name' does not exist"
257 unless exists $self->{'stream_fds'}{$name};
259 # translate 'b'oth into 'r'ead and 'w'rite
261 $self->close($name, 'r');
262 $self->close($name, 'w');
267 if ($self->{'outstanding_writes'}{$name}) {
268 # close when the writes are done
269 $self->{'close_after_write'}{$name} = 1;
271 $self->_do_close_write($name);
274 $self->_do_close_read($name);
280 my ($name, @expectations) = @_;
282 for my $exp (@expectations) {
283 # set up a byte counter for bytes_to_eof
284 if ($exp->[0] eq 'bytes_to_eof') {
289 $self->{'expectations'}{$name} = [ @expectations ];
291 $self->_check_expectations($name);
297 kill 'INT', $self->{'pid'};
302 sub _start_process_amandad {
306 my $service = "$amlibexecdir/$self->{service}";
307 die "service '$service' does not exist" unless -x $service;
309 # we'll need some pipes:
310 my ($stdin_c, $stdin_p) = POSIX::pipe();
311 my ($stdout_p, $stdout_c) = POSIX::pipe();
312 my ($stderr_p, $stderr_c) = POSIX::pipe();
314 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
315 my ($in_c, $in_p) = POSIX::pipe();
316 my ($out_p, $out_c) = POSIX::pipe();
317 push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ];
321 $self->{'pid'} = POSIX::fork();
322 die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0);
323 if ($self->{'pid'} == 0) {
329 # First, close all of the fd's we don't need.
330 POSIX::close($stdin_p);
331 POSIX::close($stdout_p);
332 POSIX::close($stderr_p);
333 for $fdpair (@data_fdpairs) {
334 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
336 POSIX::close($out_p);
339 # dup our in/out fd's appropriately
340 POSIX::dup2($stdin_c, 0);
341 POSIX::dup2($stdout_c, 1);
342 POSIX::dup2($stderr_c, 2);
343 POSIX::close($stdin_c);
344 POSIX::close($stdout_c);
345 POSIX::close($stderr_c);
347 # then make sure everything is greater than the highest
350 for $fdpair (@data_fdpairs) {
351 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
352 while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
353 push @fds_to_close, $in_c;
354 $in_c = POSIX::dup($in_c);
356 while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
357 push @fds_to_close, $out_c;
358 $out_c = POSIX::dup($out_c);
360 $fdpair->[0] = $in_c;
361 $fdpair->[3] = $out_c;
364 # close all of the leftovers
365 for $fd (@fds_to_close) {
369 # and now use dup2 to move everything to its final location (whew!)
370 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
371 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
372 POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2);
373 POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1);
374 POSIX::close($out_c);
379 # braces avoid warning
380 { exec { $service } $service, 'amandad', $self->{'auth'}; }
381 my $err = "could not execute $service; $!\n";
382 POSIX::write(2, $err, length($err));
388 # watch for the child to die
389 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
390 sub { $self->_process_done(@_); });
392 # close all of the fd's we don't need, and make notes of the fd's
393 # we want to keep around
395 POSIX::close($stdin_c);
396 POSIX::close($stdout_c);
397 $self->_add_stream('main', $stdout_p, $stdin_p);
399 POSIX::close($stderr_c);
400 $self->_add_stream('stderr', $stderr_p, -1);
402 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
403 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
405 POSIX::close($out_c);
407 $self->_add_stream('stream'.($i+1), $out_p, $in_p);
411 sub _start_process_inetd {
415 # figure out the service
416 my $service = "$amlibexecdir/$self->{service}";
417 die "service '$service' does not exist" unless -x $service;
419 # set up and bind a listening socket on localhost
420 socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
421 or die "creating socket: $!";
422 bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1")))
423 or die "binding socket: $!";
425 my ($port, $addr) = sockaddr_in(getsockname(SERVER));
428 $self->{'pid'} = POSIX::fork();
429 die "could not fork: $!" if ($self->{'pid'} < 0);
430 if ($self->{'pid'} == 0) {
433 # send stderr to debug
434 Amanda::Debug::debug_dup_stderr_to_debug();
436 # wait for a connection on the socket, waiting a long time
438 alarm 60*60*24; # one day
439 my $paddr = accept(CLIENT, SERVER);
443 # dup that into stdio
444 POSIX::dup2(fileno(CLIENT), 0);
445 POSIX::dup2(fileno(CLIENT), 1);
449 # braces avoid warning
450 { exec { $service } $service, 'installcheck', @{$self->{'args'}}; }
451 my $err = "could not execute $service; $!\n";
452 POSIX::write(2, $err, length($err));
458 # watch for the child to die
459 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
460 sub { $self->_process_done(@_); });
462 # close the server socket
465 # connect to the child
466 $self->connect('main', $port);
471 my ($name, $rfd, $wfd) = @_;
473 if (exists $self->{'stream_fds'}{$name}) {
474 die "stream $name already exists";
477 $self->{'stream_fds'}{$name} = [ $rfd, $wfd ];
478 $self->{'read_sources'}{$name} = undef;
479 $self->{'outstanding_writes'}{$name} = 0;
480 $self->{'close_after_write'}{$name} = 0;
482 # start an async read on every read_fd we set up, after making it not-blocking
486 Amanda::Util::set_blocking($rfd, 0);
487 $self->{'read_buf'}{$name} = '';
488 $self->{'got_eof'}{$name} = 0;
490 $async_read_cb = sub {
491 my ($err, $data) = @_;
492 die "on stream $name: $err" if $err;
495 $self->_log_data("<<", $name, $data);
499 $self->{'read_sources'}{$name} =
500 Amanda::MainLoop::async_read(
502 async_read_cb => $async_read_cb);
504 delete $self->{'read_sources'}{$name};
505 $self->_do_close_read($name);
508 # add the data to the buffer, or signal EOF
510 $self->{'read_buf'}{$name} .= $data;
512 $self->{'got_eof'}{$name} = 1;
515 # and call the user function
516 $self->_check_expectations($name);
519 $self->{'read_sources'}{$name} =
520 Amanda::MainLoop::async_read(
522 async_read_cb => $async_read_cb);
525 # set all the write_fd's to non-blocking too.
527 Amanda::Util::set_blocking($wfd, 0);
535 my $fds = $self->{'stream_fds'}{$name};
537 if ($fds->[0] == -1) {
538 die "$name is already closed for reading";
541 debug("XX closing $name for reading");
543 # remove any ongoing reads
544 if ($self->{'read_sources'}{$name}) {
545 $self->{'read_sources'}{$name}->remove();
546 delete $self->{'read_sources'}{$name};
549 # if both fd's are the same, then this is probably a socket, so shut down
551 if ($fds->[0] == $fds->[1]) {
552 # perl doesn't provide a fd-compatible shutdown, but luckily shudown
553 # affects dup'd file descriptors, too! So create a new handle and shut
554 # it down. When the handle is garbage collected, it will be closed,
555 # but that will not affect the original. This will look strange in an
556 # strace, but it works without SWIGging shutdown()
557 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0);
559 POSIX::close($fds->[0]);
563 if ($fds->[1] == -1) {
564 delete $self->{'stream_fds'}{$name};
568 sub _do_close_write {
570 my ($name, $for) = @_;
572 my $fds = $self->{'stream_fds'}{$name};
574 if ($fds->[1] == -1) {
575 die "$name is already closed for writing";
578 debug("XX closing $name for writing");
580 # if both fd's are the same, then this is probably a socket, so shut down
582 if ($fds->[1] == $fds->[0]) {
584 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1);
586 POSIX::close($fds->[1]);
590 if ($fds->[0] == -1) {
591 delete $self->{'stream_fds'}{$name};
593 delete $self->{'outstanding_writes'}{$name};
594 delete $self->{'close_after_write'}{$name};
599 my ($exitstatus) = @_;
601 debug("service exit: $exitstatus");
603 # delay this to the next trip around the MainLoop, in case data is available
605 Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus);
608 sub _do_process_done {
610 my ($exitstatus) = @_;
612 $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1;
614 # defer with call_after if there are still read fd's open or data in a read
615 # buffer. Since the process just died, presumably these will close in this
616 # trip around the MainLoop, so this will be a very short busywait. The upper
617 # bound on the wait is 1 second.
618 if ($self->{'process_done_loops'} < 100) {
620 for my $name (keys %{$self->{'stream_fds'}}) {
621 my $fds = $self->{'stream_fds'}{$name};
622 # if we're still expecting something on this stream..
623 if ($self->{'expectations'}{$name}) {
626 # or the stream's not closed yet..
627 if ($fds->[0] != -1) {
632 return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus);
636 # close all of the write_fd's. If there are pending writes, they
637 # were going to get a SIGPIPE anyway.
638 for my $name (keys %{$self->{'stream_fds'}}) {
639 my $fds = $self->{'stream_fds'}{$name};
640 if ($fds->[1] != -1) {
641 $self->_do_close_write($name);
645 $self->{'process_done'}->($exitstatus);
650 my ($dir, $name, $data) = @_;
653 if (length($data) < 300) {
654 my $printable = $data;
655 $printable =~ s/[^\r\n[:print:]]+//g;
656 $printable =~ s/\n/\\n/g;
657 $printable =~ s/\r/\\r/g;
658 debug("$dir$name: [$printable]");
660 debug(sprintf("$dir$name: %d bytes", length($data)));
663 debug("$dir$name: EOF");
667 sub _check_expectations {
671 my $expectations = $self->{'expectations'}{$name};
672 return unless defined $expectations and @$expectations;
676 # if we got EOF and have no more pending data, look for a matching
678 if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) {
679 for my $exp (@$expectations) {
680 if ($exp->[0] eq 'eof') {
684 } elsif ($exp->[0] eq 'bytes_to_eof') {
686 @args = ($exp->[2],); # byte count
692 debug("Expected on $name: " . Dumper($expectations));
693 die "Unexpected EOF on $name";
695 } elsif ($self->{'read_buf'}{$name}) {
696 my $buf = $self->{'read_buf'}{$name};
698 for my $exp (@$expectations) {
699 if ($exp->[0] eq 'eof') {
700 die "Expected EOF but got data on $name";
701 } elsif ($exp->[0] eq 'bytes_to_eof') {
702 # store the ongoing byte count in the expectation itself
703 $exp->[2] = ($exp->[2] || 0) + length($buf);
704 $self->{'read_buf'}{$name} = '';
705 # and if this stream *also* has EOF, call back
706 if ($self->{'got_eof'}{$name}) {
708 @args = ($exp->[2],); # byte count
711 } elsif ($exp->[0] eq 'bytes') {
712 if (length($buf) >= $exp->[1]) {
714 @args = (substr($buf, 0, $exp->[1]),);
715 $self->{'read_buf'}{$name} = substr($buf, $exp->[1]);
717 last; # done searching, even if we don't call a sub
718 } elsif ($exp->[0] eq 're') {
719 if ($buf =~ $exp->[1]) {
721 @args = ($&,); # matched section of $buf
722 $self->{'read_buf'}{$name} = $'; # remainder of $buf
729 # if there's a callback to make, then remove the expectations *before*
732 delete $self->{'expectations'}{$name};