Imported Upstream version 3.2.0
[debian/amanda] / installcheck / Installcheck / ClientService.pm
1 # Copyright (c) 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 package Installcheck::ClientService;
20
21 =head1 NAME
22
23 Installcheck::ClientService - a harness for testing client services like
24 sendbackup or selfcheck.
25
26 =head1 SYNOPSIS
27
28     use Installcheck::ClientService;
29
30     # fire up a fake amandad
31     my $service;
32     my $process_done = sub {
33         my ($wait_status) = @_;
34         Amanda::MainLoop::quit();
35     };
36     $service = Installcheck::ClientService->new(
37                             service => 'amindexd',
38                             emulate => 'amandad',
39                             auth => 'bsdtcp',
40                             auth_peer => 'localhost',
41                             process_done => $process_done);
42     # or
43     $service = Installcheck::ClientService->new(
44                             service => 'amindexd',
45                             emulate => 'inetd',
46                             args => [ @args ],
47                             process_done => $process_done);
48     $service->expect('main',
49         [ re => qr/^CONNECT (\d+)\n/, $handle_connect ],
50         [ re => qr/^ERROR (.*)\r\n/, $handle_error ]);
51     $service->expect('stream1',
52         [ eof => $handle_eof ]);
53     $service->expect('stream2',
54         [ header => $handle_header ]);
55     $service->expect('stream3',
56         [ data => $handle_data ]);
57     Amanda::MainLoop::run();
58
59 =head1 DESCRIPTION
60
61 The C<Installcheck::ClientService> class re-implements the service-facing side
62 of amandad and inetd.  It strips away all of the service-specific hacks and the
63 security API portions.  It handles multiple, simultaneous, named, bidirectional
64 data streams with an expect-like interface.
65
66 When emulating amandad, the service is run with the usual high-numbered file
67 descriptors pre-piped, and with 'amandad' in C<argv[1]> and the C<auth>
68 parameter (which defaults to 'bsdtcp') in C<argv[2]>.  The service's stdout and
69 stdin are connected to the 'main' stream, and stderr is available as 'stderr'.
70 The three bidirectional streams on the high-numbered pipes are available as
71 'stream1', 'stream2', and 'stream3'.  You should send a request packet on the
72 'main' stream and close it for writing, and read the reply from 'main'.  Note
73 that you should omit the 'SERVICE' line of the request, as it is ordinarily
74 consumed by amandad itself.
75
76 When emulating inetd, the service is run with a TCP socket attached to its
77 stdin and stdout, and 'installcheck' in C<argv[1]>.  Additional arguments can
78 be provided in the C<args> parameter.  The TCP socket is available as stream
79 'main'.
80
81 =head2 Constructor
82
83 See the SYNOPSIS for examples.  The constructor's C<service> parameter gives
84 the name of the service to run.  The C<emulate> parameter determines how the
85 service is invoked.  The C<args> and C<auth> parameters are described above.
86 The C<process_done> parameter gives a sub which is called with the service's
87 wait status when the service exits and all of its file descriptors have been
88 drained.  The C<auth_peer> parameter gives the value for
89 C<$AMANDA_AUTHENTICATED_PEER> when emulating amandad.
90
91 =head2 Killing Subprocess
92
93 To kill the subprocess, call
94
95   $service->kill();
96
97 this will send a SIGINT.  Process termination proceeds as normal -
98 C<process_done> will be called.
99
100 =head2 Handling Streams
101
102 Streams have simple strings as names; the standard names are described in the
103 DESCRIPTION section.
104
105 To send data on a stream, use C<send>:
106
107     $service->send('main', 'Hello, service!\n');
108
109 Note that this method does not block until the data is sent.
110
111 To close a stream, use C<close>.  It takes a stream name and direction, and
112 only closes that direction.  For TCP connections, this means half-open
113 connections, while for file descriptors only one of the descriptors is closed.
114
115     $service->close('data', 'w'); # close for reading
116     $service->close('data', 'r'); # close for writing
117     $service->close('data', 'b'); # close for both
118
119 When emulating inetd, the C<connect> method can open a new connection to the
120 service, given a port number and a name for the new stream:
121
122     $service->connect('index', $idx_port);
123
124 =head2 Handling Incoming Data
125
126 The read side of each stream has a set of I<expectations>: expected events and
127 subs to call when those events occur.  Each expectation comes in the form of an
128 arrayref, and starts with a string indicating its type.  The simplest is a
129 regular expression:
130
131     [ re => qr/^200 OK/, $got_ok ]
132
133 In this case the C<$got_ok> sub will be called with the matched text.  An
134 expected EOF is written
135
136     [ eof => $got_eof ]
137
138 To capture a stream of data, and call C<$got_data> on EOF with the number of
139 bytes consumed, use
140
141     [ bytes_to_eof => $got_eof ]
142
143 To capture a specific amount of data - in this case 32k - and pass it to
144 C<$got_header>, use
145
146     [ bytes => 32768, $got_header ]
147
148 The set of expectations for a stream is set with the C<expect> method.  This
149 method completely replaces any previous expectations.
150
151     $service->expect('data',
152         [ re => qr/^200 OK.*\n/, $got_ok ],
153         [ re => qr/^4\d\d .*\n/, $got_err ]);
154
155 =cut
156
157 use base qw( Exporter );
158 use warnings;
159 use strict;
160 use Amanda::Constants;
161 use Amanda::MainLoop;
162 use Amanda::Paths;
163 use Amanda::Util;
164 use Amanda::Debug qw( debug );
165 use POSIX qw( :fcntl_h );
166 use POSIX;
167 use Data::Dumper;
168 use IO::Handle;
169 use Socket;
170
171 use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET;
172 use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT;
173 our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT);
174 our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] );
175
176 sub new {
177     my $class = shift;
178     my %params = @_;
179
180     my $self = bless {
181         emulate => $params{'emulate'},
182         service => $params{'service'},
183         process_done => $params{'process_done'},
184         auth => $params{'auth'} || 'bsdtcp',
185         args => $params{'args'} || [],
186         auth_peer => $params{'auth_peer'},
187
188         # all hashes keyed by stream name
189         stream_fds => {},
190         outstanding_writes => {},
191         close_after_write => {},
192
193         read_buf => {},
194         got_eof => {},
195
196         expectations => {},
197     }, $class;
198
199     if ($self->{'emulate'} eq 'amandad') {
200         $self->_start_process_amandad();
201     } elsif ($self->{'emulate'} eq 'inetd') {
202         $self->_start_process_inetd();
203     } else {
204         die "invalid 'emulate' parameter";
205     }
206
207     return $self;
208 }
209
210 sub send {
211     my $self = shift;
212     my ($name, $data) = @_;
213
214     my $fd = $self->{'stream_fds'}{$name}[1];
215     die "stream '$name' is not writable"
216         unless defined $fd and $fd != -1;
217
218     return if $data eq '';
219
220     $self->{'outstanding_writes'}{$name}++;
221     Amanda::MainLoop::async_write(
222         fd => $fd,
223         data => $data,
224         async_write_cb => sub {
225             my ($err, $bytes_written) = @_;
226             die "on stream $name: $err" if $err;
227
228             $self->_log_data(">>", $name, $data);
229
230             $self->{'outstanding_writes'}{$name}--;
231             if ($self->{'close_after_write'}{$name}
232                     and $self->{'outstanding_writes'}{$name} == 0) {
233                 $self->_do_close_write($name);
234             }
235         });
236 }
237
238 sub connect {
239     my $self = shift;
240     my ($name, $port) = @_;
241
242     socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
243         or die "error creating connect socket: $!";
244     connect($child, sockaddr_in($port, inet_aton("127.0.0.1")))
245         or die "error connecting: $!";
246
247     # get our own fd for the socket that Perl won't close for us, and
248     # close the perl socket
249     my $fd = dup(fileno($child));
250     close($child);
251
252     $self->_add_stream($name, $fd, $fd);
253 }
254
255 sub close {
256     my $self = shift;
257     my ($name, $for) = @_;
258
259     die "stream '$name' does not exist"
260         unless exists $self->{'stream_fds'}{$name};
261
262     # translate 'b'oth into 'r'ead and 'w'rite
263     if ($for eq 'b') {
264         $self->close($name, 'r');
265         $self->close($name, 'w');
266         return;
267     }
268
269     if ($for eq 'w') {
270         if ($self->{'outstanding_writes'}{$name}) {
271             # close when the writes are done
272             $self->{'close_after_write'}{$name} = 1;
273         } else {
274             $self->_do_close_write($name);
275         }
276     } else {
277         $self->_do_close_read($name);
278     }
279 }
280
281 sub expect {
282     my $self = shift;
283     my ($name, @expectations) = @_;
284
285     for my $exp (@expectations) {
286         # set up a byte counter for bytes_to_eof
287         if ($exp->[0] eq 'bytes_to_eof') {
288             $exp->[2] = 0;
289         }
290     }
291
292     $self->{'expectations'}{$name} = [ @expectations ];
293
294     $self->_check_expectations($name);
295 }
296
297 sub kill {
298     my $self = shift;
299
300     kill 'INT', $self->{'pid'};
301 }
302
303 # private methods
304
305 sub _start_process_amandad {
306     my $self = shift;
307     my $i;
308
309     my $service = "$amlibexecdir/$self->{service}";
310     die "service '$service' does not exist" unless -x $service;
311
312     # we'll need some pipes:
313     my ($stdin_c, $stdin_p) = POSIX::pipe();
314     my ($stdout_p, $stdout_c) = POSIX::pipe();
315     my ($stderr_p, $stderr_c) = POSIX::pipe();
316     my @data_fdpairs;
317     for ($i = 0; $i < DATA_FD_COUNT; $i++) {
318         my ($in_c, $in_p) = POSIX::pipe();
319         my ($out_p, $out_c) = POSIX::pipe();
320         push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ];
321     }
322
323     # fork and execute!
324     $self->{'pid'} = POSIX::fork();
325     die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0);
326     if ($self->{'pid'} == 0) {
327         # child
328
329         my $fd;
330         my $fdpair;
331
332         # First, close all of the fd's we don't need.
333         POSIX::close($stdin_p);
334         POSIX::close($stdout_p);
335         POSIX::close($stderr_p);
336         for $fdpair (@data_fdpairs) {
337             my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
338             POSIX::close($in_p);
339             POSIX::close($out_p);
340         }
341
342         # dup our in/out fd's appropriately
343         POSIX::dup2($stdin_c, 0);
344         POSIX::dup2($stdout_c, 1);
345         POSIX::dup2($stderr_c, 2);
346         POSIX::close($stdin_c);
347         POSIX::close($stdout_c);
348         POSIX::close($stderr_c);
349
350         # then make sure everything is greater than the highest
351         # fd we'll need
352         my @fds_to_close;
353         for $fdpair (@data_fdpairs) {
354             my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
355             while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
356                 push @fds_to_close, $in_c;
357                 $in_c = POSIX::dup($in_c);
358             }
359             while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
360                 push @fds_to_close, $out_c;
361                 $out_c = POSIX::dup($out_c);
362             }
363             $fdpair->[0] = $in_c;
364             $fdpair->[3] = $out_c;
365         }
366
367         # close all of the leftovers
368         for $fd (@fds_to_close) {
369             POSIX::close($fd);
370         }
371
372         # and now use dup2 to move everything to its final location (whew!)
373         for ($i = 0; $i < DATA_FD_COUNT; $i++) {
374             my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
375             POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2);
376             POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1);
377             POSIX::close($out_c);
378             POSIX::close($in_c);
379         }
380
381         delete $ENV{'AMANDA_AUTHENTICATED_PEER'};
382         $ENV{'AMANDA_AUTHENTICATED_PEER'} = $self->{'auth_peer'} if $self->{'auth_peer'};
383
384         # finally, execute!
385         # braces avoid warning
386         { exec { $service } $service, 'amandad', $self->{'auth'}; }
387         my $err = "could not execute $service; $!\n";
388         POSIX::write(2, $err, length($err));
389         exit 2;
390     }
391
392     # parent
393
394     # watch for the child to die
395     Amanda::MainLoop::call_on_child_termination($self->{'pid'},
396             sub { $self->_process_done(@_); });
397
398     # close all of the fd's we don't need, and make notes of the fd's
399     # we want to keep around
400
401     POSIX::close($stdin_c);
402     POSIX::close($stdout_c);
403     $self->_add_stream('main', $stdout_p, $stdin_p);
404
405     POSIX::close($stderr_c);
406     $self->_add_stream('stderr', $stderr_p, -1);
407
408     for ($i = 0; $i < DATA_FD_COUNT; $i++) {
409         my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
410         POSIX::close($in_c);
411         POSIX::close($out_c);
412
413         $self->_add_stream('stream'.($i+1), $out_p, $in_p);
414     }
415 }
416
417 sub _start_process_inetd {
418     my $self = shift;
419     my $i;
420
421     # figure out the service
422     my $service = "$amlibexecdir/$self->{service}";
423     die "service '$service' does not exist" unless -x $service;
424
425     # set up and bind a listening socket on localhost
426     socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
427         or die "creating socket: $!";
428     bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1")))
429         or die "binding socket: $!";
430     listen(SERVER, 1);
431     my ($port, $addr) = sockaddr_in(getsockname(SERVER));
432
433     # fork and execute!
434     $self->{'pid'} = POSIX::fork();
435     die "could not fork: $!" if ($self->{'pid'} < 0);
436     if ($self->{'pid'} == 0) {
437         # child
438
439         # send stderr to debug
440         Amanda::Debug::debug_dup_stderr_to_debug();
441
442         # wait for a connection on the socket, waiting a long time
443         # but not forever..
444         alarm 60*60*24; # one day
445         my $paddr = accept(CLIENT, SERVER);
446         CORE::close(SERVER);
447         alarm 0;
448
449         # dup that into stdio
450         POSIX::dup2(fileno(CLIENT), 0);
451         POSIX::dup2(fileno(CLIENT), 1);
452         CORE::close(CLIENT);
453
454         # finally, execute!
455         # braces avoid warning
456         { exec { $service } $service, 'installcheck', @{$self->{'args'}}; }
457         my $err = "could not execute $service; $!\n";
458         POSIX::write(2, $err, length($err));
459         exit 2;
460     }
461
462     # parent
463
464     # watch for the child to die
465     Amanda::MainLoop::call_on_child_termination($self->{'pid'},
466             sub { $self->_process_done(@_); });
467
468     # close the server socket
469     CORE::close(SERVER);
470
471     # connect to the child
472     $self->connect('main', $port);
473 }
474
475 sub _add_stream {
476     my $self = shift;
477     my ($name, $rfd, $wfd) = @_;
478
479     if (exists $self->{'stream_fds'}{$name}) {
480         die "stream $name already exists";
481     }
482
483     $self->{'stream_fds'}{$name} = [ $rfd, $wfd ];
484     $self->{'read_sources'}{$name} = undef;
485     $self->{'outstanding_writes'}{$name} = 0;
486     $self->{'close_after_write'}{$name} = 0;
487
488     # start an async read on every read_fd we set up, after making it not-blocking
489     if ($rfd != -1) {
490         my $async_read_cb;
491
492         Amanda::Util::set_blocking($rfd, 0);
493         $self->{'read_buf'}{$name} = '';
494         $self->{'got_eof'}{$name} = 0;
495
496         $async_read_cb = sub {
497             my ($err, $data) = @_;
498             die "on stream $name: $err" if $err;
499
500             # log it
501             $self->_log_data("<<", $name, $data);
502
503             # prep for next time
504             if ($data) {
505                 $self->{'read_sources'}{$name} =
506                     Amanda::MainLoop::async_read(
507                         fd => $rfd,
508                         async_read_cb => $async_read_cb);
509             } else {
510                 delete $self->{'read_sources'}{$name};
511                 $self->_do_close_read($name);
512             }
513
514             # add the data to the buffer, or signal EOF
515             if ($data) {
516                 $self->{'read_buf'}{$name} .= $data;
517             } else {
518                 $self->{'got_eof'}{$name} = 1;
519             }
520
521             # and call the user function
522             $self->_check_expectations($name);
523         };
524
525         $self->{'read_sources'}{$name} =
526             Amanda::MainLoop::async_read(
527                 fd => $rfd,
528                 async_read_cb => $async_read_cb);
529     }
530
531     # set all the write_fd's to non-blocking too.
532     if ($wfd != -1) {
533         Amanda::Util::set_blocking($wfd, 0);
534     }
535 }
536
537 sub _do_close_read {
538     my $self = shift;
539     my ($name) = @_;
540
541     my $fds = $self->{'stream_fds'}{$name};
542
543     if ($fds->[0] == -1) {
544         die "$name is already closed for reading";
545     }
546
547     debug("XX closing $name for reading");
548
549     # remove any ongoing reads
550     if ($self->{'read_sources'}{$name}) {
551         $self->{'read_sources'}{$name}->remove();
552         delete $self->{'read_sources'}{$name};
553     }
554
555     # if both fd's are the same, then this is probably a socket, so shut down
556     # the read side
557     if ($fds->[0] == $fds->[1]) {
558         # perl doesn't provide a fd-compatible shutdown, but luckily shudown
559         # affects dup'd file descriptors, too!  So create a new handle and shut
560         # it down.  When the handle is garbage collected, it will be closed,
561         # but that will not affect the original.  This will look strange in an
562         # strace, but it works without SWIGging shutdown()
563         shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0);
564     } else {
565         POSIX::close($fds->[0]);
566     }
567     $fds->[0] = -1;
568
569     if ($fds->[1] == -1) {
570         delete $self->{'stream_fds'}{$name};
571     }
572 }
573
574 sub _do_close_write {
575     my $self = shift;
576     my ($name, $for) = @_;
577
578     my $fds = $self->{'stream_fds'}{$name};
579
580     if ($fds->[1] == -1) {
581         die "$name is already closed for writing";
582     }
583
584     debug("XX closing $name for writing");
585
586     # if both fd's are the same, then this is probably a socket, so shut down
587     # the write side
588     if ($fds->[1] == $fds->[0]) {
589         # (see above)
590         shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1);
591     } else {
592         POSIX::close($fds->[1]);
593     }
594     $fds->[1] = -1;
595
596     if ($fds->[0] == -1) {
597         delete $self->{'stream_fds'}{$name};
598     }
599     delete $self->{'outstanding_writes'}{$name};
600     delete $self->{'close_after_write'}{$name};
601 }
602
603 sub _process_done {
604     my $self = shift;
605     my ($exitstatus) = @_;
606
607     debug("service exit: $exitstatus");
608
609     # delay this to the next trip around the MainLoop, in case data is available
610     # on any fd's
611     Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus);
612 }
613
614 sub _do_process_done {
615     my $self = shift;
616     my ($exitstatus) = @_;
617
618     $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1;
619
620     # defer with call_after if there are still read fd's open or data in a read
621     # buffer.  Since the process just died, presumably these will close in this
622     # trip around the MainLoop, so this will be a very short busywait.  The upper
623     # bound on the wait is 1 second.
624     if ($self->{'process_done_loops'} < 100) {
625         my $still_busy = 0;
626         for my $name (keys %{$self->{'stream_fds'}}) {
627             my $fds = $self->{'stream_fds'}{$name};
628             # if we're still expecting something on this stream..
629             if ($self->{'expectations'}{$name}) {
630                 $still_busy = 1;
631             }
632             # or the stream's not closed yet..
633             if ($fds->[0] != -1) {
634                 $still_busy = 1;
635             }
636         }
637         if ($still_busy) {
638             return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus);
639         }
640     }
641
642     # close all of the write_fd's.  If there are pending writes, they
643     # were going to get a SIGPIPE anyway.
644     for my $name (keys %{$self->{'stream_fds'}}) {
645         my $fds = $self->{'stream_fds'}{$name};
646         if ($fds->[1] != -1) {
647             $self->_do_close_write($name);
648         }
649     }
650
651     $self->{'process_done'}->($exitstatus);
652 }
653
654 sub _log_data {
655     my $self = shift;
656     my ($dir, $name, $data) = @_;
657
658     if ($data) {
659         if (length($data) < 300) {
660             my $printable = $data;
661             $printable =~ s/[^\r\n[:print:]]+//g;
662             $printable =~ s/\n/\\n/g;
663             $printable =~ s/\r/\\r/g;
664             debug("$dir$name: [$printable]");
665         } else {
666             debug(sprintf("$dir$name: %d bytes", length($data)));
667         }
668     } else {
669         debug("$dir$name: EOF");
670     }
671 }
672
673 sub _check_expectations {
674     my $self = shift;
675     my ($name) = @_;
676
677     my $expectations = $self->{'expectations'}{$name};
678     return unless defined $expectations and @$expectations;
679
680     my $cb = undef;
681     my @args = undef;
682     # if we got EOF and have no more pending data, look for a matching
683     # expectation
684     if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) {
685         for my $exp (@$expectations) {
686             if ($exp->[0] eq 'eof') {
687                 $cb = $exp->[1];
688                 @args = ();
689                 last;
690             } elsif ($exp->[0] eq 'bytes_to_eof') {
691                 $cb = $exp->[1];
692                 @args = ($exp->[2],); # byte count
693                 last;
694             }
695         }
696
697         if (!$cb) {
698             debug("Expected on $name: " . Dumper($expectations));
699             die "Unexpected EOF on $name";
700         }
701     } elsif ($self->{'read_buf'}{$name}) {
702         my $buf = $self->{'read_buf'}{$name};
703
704         for my $exp (@$expectations) {
705             if ($exp->[0] eq 'eof') {
706                 die "Expected EOF but got data on $name";
707             } elsif ($exp->[0] eq 'bytes_to_eof') {
708                 # store the ongoing byte count in the expectation itself
709                 $exp->[2] = ($exp->[2] || 0) + length($buf);
710                 $self->{'read_buf'}{$name} = '';
711                 # and if this stream *also* has EOF, call back
712                 if ($self->{'got_eof'}{$name}) {
713                     $cb = $exp->[1];
714                     @args = ($exp->[2],); # byte count
715                 }
716                 last;
717             } elsif ($exp->[0] eq 'bytes') {
718                 if (length($buf) >= $exp->[1]) {
719                     $cb = $exp->[2];
720                     @args = (substr($buf, 0, $exp->[1]),);
721                     $self->{'read_buf'}{$name} = substr($buf, $exp->[1]);
722                 }
723                 last; # done searching, even if we don't call a sub
724             } elsif ($exp->[0] eq 're') {
725                 if ($buf =~ $exp->[1]) {
726                     $cb = $exp->[2];
727                     @args = ($&,); # matched section of $buf
728                     $self->{'read_buf'}{$name} = $'; # remainder of $buf
729                     last;
730                 }
731             }
732         }
733     }
734
735     # if there's a callback to make, then remove the expectations *before*
736     # calling it
737     if ($cb) {
738         delete $self->{'expectations'}{$name};
739         $cb->(@args);
740     }
741 }
742
743 1;