1 # Copyright (c) 2010-2012 Zmanda, Inc. All Rights Reserved.
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
20 package Installcheck::ClientService;
24 Installcheck::ClientService - a harness for testing client services like
25 sendbackup or selfcheck.
29 use Installcheck::ClientService;
31 # fire up a fake amandad
33 my $process_done = sub {
34 my ($wait_status) = @_;
35 Amanda::MainLoop::quit();
37 $service = Installcheck::ClientService->new(
38 service => 'amindexd',
41 auth_peer => 'localhost',
42 process_done => $process_done);
44 $service = Installcheck::ClientService->new(
45 service => 'amindexd',
48 process_done => $process_done);
49 $service->expect('main',
50 [ re => qr/^CONNECT (\d+)\n/, $handle_connect ],
51 [ re => qr/^ERROR (.*)\r\n/, $handle_error ]);
52 $service->expect('stream1',
53 [ eof => $handle_eof ]);
54 $service->expect('stream2',
55 [ header => $handle_header ]);
56 $service->expect('stream3',
57 [ data => $handle_data ]);
58 Amanda::MainLoop::run();
62 The C<Installcheck::ClientService> class re-implements the service-facing side
63 of amandad and inetd. It strips away all of the service-specific hacks and the
64 security API portions. It handles multiple, simultaneous, named, bidirectional
65 data streams with an expect-like interface.
67 When emulating amandad, the service is run with the usual high-numbered file
68 descriptors pre-piped, and with 'amandad' in C<argv[1]> and the C<auth>
69 parameter (which defaults to 'bsdtcp') in C<argv[2]>. The service's stdout and
70 stdin are connected to the 'main' stream, and stderr is available as 'stderr'.
71 The three bidirectional streams on the high-numbered pipes are available as
72 'stream1', 'stream2', and 'stream3'. You should send a request packet on the
73 'main' stream and close it for writing, and read the reply from 'main'. Note
74 that you should omit the 'SERVICE' line of the request, as it is ordinarily
75 consumed by amandad itself.
77 When emulating inetd, the service is run with a TCP socket attached to its
78 stdin and stdout, and 'installcheck' in C<argv[1]>. Additional arguments can
79 be provided in the C<args> parameter. The TCP socket is available as stream
84 See the SYNOPSIS for examples. The constructor's C<service> parameter gives
85 the name of the service to run. The C<emulate> parameter determines how the
86 service is invoked. The C<args> and C<auth> parameters are described above.
87 The C<process_done> parameter gives a sub which is called with the service's
88 wait status when the service exits and all of its file descriptors have been
89 drained. The C<auth_peer> parameter gives the value for
90 C<$AMANDA_AUTHENTICATED_PEER> when emulating amandad.
92 =head2 Killing Subprocess
94 To kill the subprocess, call
98 this will send a SIGINT. Process termination proceeds as normal -
99 C<process_done> will be called.
101 =head2 Handling Streams
103 Streams have simple strings as names; the standard names are described in the
106 To send data on a stream, use C<send>:
108 $service->send('main', "Hello, service!\n");
110 Note that this method does not block until the data is sent.
112 To close a stream, use C<close>. It takes a stream name and direction, and
113 only closes that direction. For TCP connections, this means half-open
114 connections, while for file descriptors only one of the descriptors is closed.
116 $service->close('data', 'w'); # close for writing
117 $service->close('data', 'r'); # close for reading
118 $service->close('data', 'b'); # close for both
120 When emulating inetd, the C<connect> method can open a new connection to the
121 service, given a port number and a name for the new stream:
123 $service->connect('index', $idx_port);
125 =head2 Handling Incoming Data
127 The read side of each stream has a set of I<expectations>: expected events and
128 subs to call when those events occur. Each expectation comes in the form of an
129 arrayref, and starts with a string indicating its type. The simplest is a
132 [ re => qr/^200 OK/, $got_ok ]
134 In this case the C<$got_ok> sub will be called with the matched text. An
135 expected EOF is written
139 To capture a stream of data, and call C<$got_eof> on EOF with the number of
142 [ bytes_to_eof => $got_eof ]
144 To capture a specific amount of data - in this case 32k - and pass it to
147 [ bytes => 32768, $got_header ]
149 The set of expectations for a stream is set with the C<expect> method. This
150 method completely replaces any previous expectations.
152 $service->expect('data',
153 [ re => qr/^200 OK.*\n/, $got_ok ],
154 [ re => qr/^4\d\d .*\n/, $got_err ]);
158 use base qw( Exporter );
161 use Amanda::Constants;
162 use Amanda::MainLoop;
165 use Amanda::Debug qw( debug );
166 use POSIX qw( :fcntl_h );
172 use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET;
173 use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT;
174 our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT);
175 our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] );
182 emulate => $params{'emulate'},
183 service => $params{'service'},
184 process_done => $params{'process_done'},
185 auth => $params{'auth'} || 'bsdtcp',
186 args => $params{'args'} || [],
187 auth_peer => $params{'auth_peer'},
189 # all hashes keyed by stream name
191 outstanding_writes => {},
192 close_after_write => {},
200 if ($self->{'emulate'} eq 'amandad') {
201 $self->_start_process_amandad();
202 } elsif ($self->{'emulate'} eq 'inetd') {
203 $self->_start_process_inetd();
205 die "invalid 'emulate' parameter";
213 my ($name, $data) = @_;
215 my $fd = $self->{'stream_fds'}{$name}[1];
216 die "stream '$name' is not writable"
217 unless defined $fd and $fd != -1;
219 return if $data eq '';
221 $self->{'outstanding_writes'}{$name}++;
222 Amanda::MainLoop::async_write(
225 async_write_cb => sub {
226 my ($err, $bytes_written) = @_;
227 die "on stream $name: $err" if $err;
229 $self->_log_data(">>", $name, $data);
231 $self->{'outstanding_writes'}{$name}--;
232 if ($self->{'close_after_write'}{$name}
233 and $self->{'outstanding_writes'}{$name} == 0) {
234 $self->_do_close_write($name);
241 my ($name, $port) = @_;
243 socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
244 or die "error creating connect socket: $!";
245 connect($child, sockaddr_in($port, inet_aton("127.0.0.1")))
246 or die "error connecting: $!";
248 # get our own fd for the socket that Perl won't close for us, and
249 # close the perl socket
250 my $fd = dup(fileno($child));
253 $self->_add_stream($name, $fd, $fd);
258 my ($name, $for) = @_;
260 die "stream '$name' does not exist"
261 unless exists $self->{'stream_fds'}{$name};
263 # translate 'b'oth into 'r'ead and 'w'rite
265 $self->close($name, 'r');
266 $self->close($name, 'w');
271 if ($self->{'outstanding_writes'}{$name}) {
272 # close when the writes are done
273 $self->{'close_after_write'}{$name} = 1;
275 $self->_do_close_write($name);
278 $self->_do_close_read($name);
284 my ($name, @expectations) = @_;
286 for my $exp (@expectations) {
287 # set up a byte counter for bytes_to_eof
288 if ($exp->[0] eq 'bytes_to_eof') {
293 $self->{'expectations'}{$name} = [ @expectations ];
295 $self->_check_expectations($name);
301 kill 'INT', $self->{'pid'};
# Launch the service as a child process wired up the way amandad would do it:
# pipes on the child's stdin/stdout/stderr, plus DATA_FD_COUNT pairs of pipes
# placed on consecutive descriptors starting at DATA_FD_OFFSET.  The parent
# keeps the other end of every pipe and registers them as the streams 'main',
# 'stderr' and 'stream1'..'streamN', and asks the MainLoop to call
# _process_done when the child terminates.
#
# Naming convention used throughout: a "_c" suffix is the end of a pipe used
# by the child, "_p" the end kept by the parent (the child closes the "_p"
# ends, the parent closes the "_c" ends).
#
# NOTE(review): the embedded line numbers in this listing skip, so some
# original lines (e.g. the declarations of $self, $i, @data_fdpairs and
# @fds_to_close, and several closing braces) are not visible here; the
# comments below describe only the visible statements.
306 sub _start_process_amandad {
# the service is looked up in $amlibexecdir and must be executable
310 my $service = "$amlibexecdir/$self->{service}";
311 die "service '$service' does not exist" unless -x $service;
313 # we'll need some pipes:
# POSIX::pipe returns (read_fd, write_fd): the child reads its stdin pipe and
# writes its stdout/stderr pipes, the parent does the opposite.
314 my ($stdin_c, $stdin_p) = POSIX::pipe();
315 my ($stdout_p, $stdout_c) = POSIX::pipe();
316 my ($stderr_p, $stderr_c) = POSIX::pipe();
# one inbound and one outbound pipe (from the child's point of view) per
# data stream, remembered as [ in_c, in_p, out_p, out_c ]
318 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
319 my ($in_c, $in_p) = POSIX::pipe();
320 my ($out_p, $out_c) = POSIX::pipe();
321 push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ];
325 $self->{'pid'} = POSIX::fork();
# a failed fork is reported as undef, hence the explicit defined() test
326 die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0);
# ---- child branch ----
327 if ($self->{'pid'} == 0) {
333 # First, close all of the fd's we don't need.
334 POSIX::close($stdin_p);
335 POSIX::close($stdout_p);
336 POSIX::close($stderr_p);
337 for $fdpair (@data_fdpairs) {
338 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
340 POSIX::close($out_p);
343 # dup our in/out fd's appropriately
# move the child ends onto stdin/stdout/stderr, then drop the originals
344 POSIX::dup2($stdin_c, 0);
345 POSIX::dup2($stdout_c, 1);
346 POSIX::dup2($stderr_c, 2);
347 POSIX::close($stdin_c);
348 POSIX::close($stdout_c);
349 POSIX::close($stderr_c);
351 # then make sure everything is greater than the highest
# DATA_FD_OFFSET + DATA_FD_COUNT * 2 is one past the last target descriptor
# used by the dup2 calls further down.  Each child-side fd is dup'd
# repeatedly until it lies at or above that bound, so that the final dup2
# calls cannot overwrite a descriptor that is still needed; the lower-numbered
# duplicates are collected in @fds_to_close.
354 for $fdpair (@data_fdpairs) {
355 my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
356 while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
357 push @fds_to_close, $in_c;
358 $in_c = POSIX::dup($in_c);
360 while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
361 push @fds_to_close, $out_c;
362 $out_c = POSIX::dup($out_c);
# store the relocated descriptors back into the pair
364 $fdpair->[0] = $in_c;
365 $fdpair->[3] = $out_c;
368 # close all of the leftovers
369 for $fd (@fds_to_close) {
373 # and now use dup2 to move everything to its final location (whew!)
# for stream $i the child's write end lands on DATA_FD_OFFSET + 2*$i and its
# read end on DATA_FD_OFFSET + 2*$i + 1
374 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
375 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
376 POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2);
377 POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1);
378 POSIX::close($out_c);
# never inherit AMANDA_AUTHENTICATED_PEER from the caller's environment; set
# it only when an auth_peer value was supplied
382 delete $ENV{'AMANDA_AUTHENTICATED_PEER'};
383 $ENV{'AMANDA_AUTHENTICATED_PEER'} = $self->{'auth_peer'} if $self->{'auth_peer'};
386 # braces avoid warning
# indirect-object form of exec: run $service directly with the argument list
# ($service, 'amandad', <auth>)
387 { exec { $service } $service, 'amandad', $self->{'auth'}; }
# only reached if exec failed: report on fd 2 (the stderr pipe set up above)
388 my $err = "could not execute $service; $!\n";
389 POSIX::write(2, $err, length($err));
# ---- parent branch ----
395 # watch for the child to die
396 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
397 sub { $self->_process_done(@_); });
399 # close all of the fd's we don't need, and make notes of the fd's
400 # we want to keep around
# _add_stream takes (name, read_fd, write_fd): the parent reads the child's
# stdout and writes to the child's stdin
402 POSIX::close($stdin_c);
403 POSIX::close($stdout_c);
404 $self->_add_stream('main', $stdout_p, $stdin_p);
# stderr is read-only for the parent; -1 stands for "no write descriptor"
406 POSIX::close($stderr_c);
407 $self->_add_stream('stderr', $stderr_p, -1);
# data streams are named 'stream1', 'stream2', ... (1-based)
409 for ($i = 0; $i < DATA_FD_COUNT; $i++) {
410 my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
412 POSIX::close($out_c);
414 $self->_add_stream('stream'.($i+1), $out_p, $in_p);
# Launch the service the way inetd would: create a TCP socket bound to
# 127.0.0.1, fork, and in the child accept one connection, dup it onto
# descriptors 0 and 1, and exec the service with 'installcheck' followed by
# the configured args.  The parent registers a child-termination callback and
# then connects to the chosen port, registering the connection as stream
# 'main'.
#
# NOTE(review): the embedded line numbers in this listing skip, so some
# original lines (declaration of $self, whatever follows bind/accept, closing
# braces, ...) are not visible here; comments describe only the visible
# statements.
418 sub _start_process_inetd {
422 # figure out the service
423 my $service = "$amlibexecdir/$self->{service}";
424 die "service '$service' does not exist" unless -x $service;
426 # set up and bind a listening socket on localhost
# port 0 leaves the choice of port to the system; the port actually assigned
# is read back with getsockname below
427 socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
428 or die "creating socket: $!";
429 bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1")))
430 or die "binding socket: $!";
432 my ($port, $addr) = sockaddr_in(getsockname(SERVER));
435 $self->{'pid'} = POSIX::fork();
# NOTE(review): unlike _start_process_amandad, this check has no
# "!defined" test.  Perl's fork reports failure as undef, for which "< 0" is
# false and the "== 0" test below is true, so a failed fork would appear to
# fall into the child branch.  Consider using the same condition as the
# amandad variant -- confirm and fix.
436 die "could not fork: $!" if ($self->{'pid'} < 0);
# ---- child branch ----
437 if ($self->{'pid'} == 0) {
440 # send stderr to debug
441 Amanda::Debug::debug_dup_stderr_to_debug();
443 # wait for a connection on the socket, waiting a long time
# the alarm bounds how long the child can sit in accept()
445 alarm 60*60*24; # one day
# NOTE(review): the result of accept is not checked in the visible lines
446 my $paddr = accept(CLIENT, SERVER);
450 # dup that into stdio
# the accepted connection becomes both stdin (0) and stdout (1) of the service
451 POSIX::dup2(fileno(CLIENT), 0);
452 POSIX::dup2(fileno(CLIENT), 1);
456 # braces avoid warning
# indirect-object form of exec: run $service directly with the argument list
# ($service, 'installcheck', @args)
457 { exec { $service } $service, 'installcheck', @{$self->{'args'}}; }
# only reached if exec failed
458 my $err = "could not execute $service; $!\n";
459 POSIX::write(2, $err, length($err));
# ---- parent branch ----
465 # watch for the child to die
466 Amanda::MainLoop::call_on_child_termination($self->{'pid'},
467 sub { $self->_process_done(@_); });
469 # close the server socket
472 # connect to the child
# uses the public connect method with the port obtained from getsockname
473 $self->connect('main', $port);
478 my ($name, $rfd, $wfd) = @_;
480 if (exists $self->{'stream_fds'}{$name}) {
481 die "stream $name already exists";
484 $self->{'stream_fds'}{$name} = [ $rfd, $wfd ];
485 $self->{'read_sources'}{$name} = undef;
486 $self->{'outstanding_writes'}{$name} = 0;
487 $self->{'close_after_write'}{$name} = 0;
489 # start an async read on every read_fd we set up, after making it not-blocking
493 Amanda::Util::set_blocking($rfd, 0);
494 $self->{'read_buf'}{$name} = '';
495 $self->{'got_eof'}{$name} = 0;
497 $async_read_cb = sub {
498 my ($err, $data) = @_;
499 die "on stream $name: $err" if $err;
502 $self->_log_data("<<", $name, $data);
506 $self->{'read_sources'}{$name} =
507 Amanda::MainLoop::async_read(
509 async_read_cb => $async_read_cb);
511 delete $self->{'read_sources'}{$name};
512 $self->_do_close_read($name);
515 # add the data to the buffer, or signal EOF
517 $self->{'read_buf'}{$name} .= $data;
519 $self->{'got_eof'}{$name} = 1;
522 # and call the user function
523 $self->_check_expectations($name);
526 $self->{'read_sources'}{$name} =
527 Amanda::MainLoop::async_read(
529 async_read_cb => $async_read_cb);
532 # set all the write_fd's to non-blocking too.
534 Amanda::Util::set_blocking($wfd, 0);
542 my $fds = $self->{'stream_fds'}{$name};
544 if ($fds->[0] == -1) {
545 die "$name is already closed for reading";
548 debug("XX closing $name for reading");
550 # remove any ongoing reads
551 if ($self->{'read_sources'}{$name}) {
552 $self->{'read_sources'}{$name}->remove();
553 delete $self->{'read_sources'}{$name};
556 # if both fd's are the same, then this is probably a socket, so shut down
558 if ($fds->[0] == $fds->[1]) {
559 # perl doesn't provide a fd-compatible shutdown, but luckily shutdown
560 # affects dup'd file descriptors, too! So create a new handle and shut
561 # it down. When the handle is garbage collected, it will be closed,
562 # but that will not affect the original. This will look strange in an
563 # strace, but it works without SWIGging shutdown()
564 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0);
566 POSIX::close($fds->[0]);
570 if ($fds->[1] == -1) {
571 delete $self->{'stream_fds'}{$name};
# Close the write side of the named stream.  Dies if the stream's write
# descriptor is already -1.  When the read and write descriptors are the same
# number the stream is treated as a socket and only its write direction is
# shut down; the visible lines also close the write descriptor, drop the
# stream's entry in stream_fds when the read descriptor is -1, and remove the
# stream's write bookkeeping.
#
# NOTE(review): the embedded line numbers in this listing skip, so the
# else/closing-brace structure (and therefore exactly which of the statements
# below are conditional) is not visible here -- confirm against the full file.
575 sub _do_close_write {
# $for is unpacked but not used in any of the lines visible here
577 my ($name, $for) = @_;
# $fds is the [ read_fd, write_fd ] pair recorded for this stream
579 my $fds = $self->{'stream_fds'}{$name};
581 if ($fds->[1] == -1) {
582 die "$name is already closed for writing";
585 debug("XX closing $name for writing");
587 # if both fd's are the same, then this is probably a socket, so shut down
589 if ($fds->[1] == $fds->[0]) {
# shutdown needs a handle, so wrap a dup of the descriptor in a temporary
# IO::Handle; the second argument 1 shuts down the sending direction
591 shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1);
593 POSIX::close($fds->[1]);
# forget the stream's descriptor pair when its read side is -1 as well
597 if ($fds->[0] == -1) {
598 delete $self->{'stream_fds'}{$name};
# write-side bookkeeping for this stream is no longer needed
600 delete $self->{'outstanding_writes'}{$name};
601 delete $self->{'close_after_write'}{$name};
606 my ($exitstatus) = @_;
608 debug("service exit: $exitstatus");
610 # delay this to the next trip around the MainLoop, in case data is available
612 Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus);
# Final stage of handling the service's exit.  Scheduled through the MainLoop
# with ($self, $exitstatus) as arguments.  While fewer than 100 invocations
# have happened, it inspects every remaining stream (pending expectations,
# read side not yet closed) and may reschedule itself with call_after;
# otherwise it closes every write descriptor that is still open and finally
# calls the user's process_done callback with the exit status.
#
# NOTE(review): the embedded line numbers in this listing skip, so the lines
# between the two stream checks and the rescheduling call (and the closing
# braces) are not visible here; the exact condition under which the
# rescheduling happens should be confirmed against the full file.
615 sub _do_process_done {
617 my ($exitstatus) = @_;
# count invocations so the polling below is bounded
619 $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1;
621 # defer with call_after if there are still read fd's open or data in a read
622 # buffer. Since the process just died, presumably these will close in this
623 # trip around the MainLoop, so this will be a very short busywait. The upper
624 # bound on the wait is 1 second.
# the 1 second bound presumably comes from 100 retries x call_after(10, ...),
# assuming the delay is given in milliseconds -- TODO confirm
625 if ($self->{'process_done_loops'} < 100) {
627 for my $name (keys %{$self->{'stream_fds'}}) {
628 my $fds = $self->{'stream_fds'}{$name};
629 # if we're still expecting something on this stream..
630 if ($self->{'expectations'}{$name}) {
633 # or the stream's not closed yet..
634 if ($fds->[0] != -1) {
# try again later, passing the same object and exit status through
639 return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus);
643 # close all of the write_fd's. If there are pending writes, they
644 # were going to get a SIGPIPE anyway.
645 for my $name (keys %{$self->{'stream_fds'}}) {
646 my $fds = $self->{'stream_fds'}{$name};
647 if ($fds->[1] != -1) {
648 $self->_do_close_write($name);
# NOTE(review): no check is visible that a process_done callback was supplied
# to the constructor; calling an undefined value here would die
652 $self->{'process_done'}->($exitstatus);
657 my ($dir, $name, $data) = @_;
660 if (length($data) < 300) {
661 my $printable = $data;
662 $printable =~ s/[^\r\n[:print:]]+//g;
663 $printable =~ s/\n/\\n/g;
664 $printable =~ s/\r/\\r/g;
665 debug("$dir$name: [$printable]");
667 debug(sprintf("$dir$name: %d bytes", length($data)));
670 debug("$dir$name: EOF");
674 sub _check_expectations {
678 my $expectations = $self->{'expectations'}{$name};
679 return unless defined $expectations and @$expectations;
683 # if we got EOF and have no more pending data, look for a matching
685 if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) {
686 for my $exp (@$expectations) {
687 if ($exp->[0] eq 'eof') {
691 } elsif ($exp->[0] eq 'bytes_to_eof') {
693 @args = ($exp->[2],); # byte count
699 debug("Expected on $name: " . Dumper($expectations));
700 die "Unexpected EOF on $name";
702 } elsif ($self->{'read_buf'}{$name}) {
703 my $buf = $self->{'read_buf'}{$name};
705 for my $exp (@$expectations) {
706 if ($exp->[0] eq 'eof') {
707 die "Expected EOF but got data on $name";
708 } elsif ($exp->[0] eq 'bytes_to_eof') {
709 # store the ongoing byte count in the expectation itself
710 $exp->[2] = ($exp->[2] || 0) + length($buf);
711 $self->{'read_buf'}{$name} = '';
712 # and if this stream *also* has EOF, call back
713 if ($self->{'got_eof'}{$name}) {
715 @args = ($exp->[2],); # byte count
718 } elsif ($exp->[0] eq 'bytes') {
719 if (length($buf) >= $exp->[1]) {
721 @args = (substr($buf, 0, $exp->[1]),);
722 $self->{'read_buf'}{$name} = substr($buf, $exp->[1]);
724 last; # done searching, even if we don't call a sub
725 } elsif ($exp->[0] eq 're') {
726 if ($buf =~ $exp->[1]) {
728 @args = ($&,); # matched section of $buf
729 $self->{'read_buf'}{$name} = $'; # remainder of $buf
736 # if there's a callback to make, then remove the expectations *before*
739 delete $self->{'expectations'}{$name};