Imported Upstream version 3.3.3
[debian/amanda] / installcheck / Installcheck / ClientService.pm
1 # Copyright (c) 2010-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 package Installcheck::ClientService;
21
22 =head1 NAME
23
24 Installcheck::ClientService - a harness for testing client services like
25 sendbackup or selfcheck.
26
27 =head1 SYNOPSIS
28
29     use Installcheck::ClientService;
30
31     # fire up a fake amandad
32     my $service;
33     my $process_done = sub {
34         my ($wait_status) = @_;
35         Amanda::MainLoop::quit();
36     };
37     $service = Installcheck::ClientService->new(
38                             service => 'amindexd',
39                             emulate => 'amandad',
40                             auth => 'bsdtcp',
41                             auth_peer => 'localhost',
42                             process_done => $process_done);
43     # or
44     $service = Installcheck::ClientService->new(
45                             service => 'amindexd',
46                             emulate => 'inetd',
47                             args => [ @args ],
48                             process_done => $process_done);
49     $service->expect('main',
50         [ re => qr/^CONNECT (\d+)\n/, $handle_connect ],
51         [ re => qr/^ERROR (.*)\r\n/, $handle_error ]);
52     $service->expect('stream1',
53         [ eof => $handle_eof ]);
54     $service->expect('stream2',
55         [ header => $handle_header ]);
56     $service->expect('stream3',
57         [ data => $handle_data ]);
58     Amanda::MainLoop::run();
59
60 =head1 DESCRIPTION
61
62 The C<Installcheck::ClientService> class re-implements the service-facing side
63 of amandad and inetd.  It strips away all of the service-specific hacks and the
64 security API portions.  It handles multiple, simultaneous, named, bidirectional
65 data streams with an expect-like interface.
66
67 When emulating amandad, the service is run with the usual high-numbered file
68 descriptors pre-piped, and with 'amandad' in C<argv[1]> and the C<auth>
69 parameter (which defaults to 'bsdtcp') in C<argv[2]>.  The service's stdout and
70 stdin are connected to the 'main' stream, and stderr is available as 'stderr'.
71 The three bidirectional streams on the high-numbered pipes are available as
72 'stream1', 'stream2', and 'stream3'.  You should send a request packet on the
73 'main' stream and close it for writing, and read the reply from 'main'.  Note
74 that you should omit the 'SERVICE' line of the request, as it is ordinarily
75 consumed by amandad itself.
76
77 When emulating inetd, the service is run with a TCP socket attached to its
78 stdin and stdout, and 'installcheck' in C<argv[1]>.  Additional arguments can
79 be provided in the C<args> parameter.  The TCP socket is available as stream
80 'main'.
81
82 =head2 Constructor
83
84 See the SYNOPSIS for examples.  The constructor's C<service> parameter gives
85 the name of the service to run.  The C<emulate> parameter determines how the
86 service is invoked.  The C<args> and C<auth> parameters are described above.
87 The C<process_done> parameter gives a sub which is called with the service's
88 wait status when the service exits and all of its file descriptors have been
89 drained.  The C<auth_peer> parameter gives the value for
90 C<$AMANDA_AUTHENTICATED_PEER> when emulating amandad.
91
92 =head2 Killing Subprocess
93
94 To kill the subprocess, call
95
96   $service->kill();
97
98 this will send a SIGINT.  Process termination proceeds as normal -
99 C<process_done> will be called.
100
101 =head2 Handling Streams
102
103 Streams have simple strings as names; the standard names are described in the
104 DESCRIPTION section.
105
106 To send data on a stream, use C<send>:
107
108     $service->send('main', 'Hello, service!\n');
109
110 Note that this method does not block until the data is sent.
111
112 To close a stream, use C<close>.  It takes a stream name and direction, and
113 only closes that direction.  For TCP connections, this means half-open
114 connections, while for file descriptors only one of the descriptors is closed.
115
116     $service->close('data', 'w'); # close for reading
117     $service->close('data', 'r'); # close for writing
118     $service->close('data', 'b'); # close for both
119
120 When emulating inetd, the C<connect> method can open a new connection to the
121 service, given a port number and a name for the new stream:
122
123     $service->connect('index', $idx_port);
124
125 =head2 Handling Incoming Data
126
127 The read side of each stream has a set of I<expectations>: expected events and
128 subs to call when those events occur.  Each expectation comes in the form of an
129 arrayref, and starts with a string indicating its type.  The simplest is a
130 regular expression:
131
132     [ re => qr/^200 OK/, $got_ok ]
133
134 In this case the C<$got_ok> sub will be called with the matched text.  An
135 expected EOF is written
136
137     [ eof => $got_eof ]
138
139 To capture a stream of data, and call C<$got_data> on EOF with the number of
140 bytes consumed, use
141
142     [ bytes_to_eof => $got_eof ]
143
144 To capture a specific amount of data - in this case 32k - and pass it to
145 C<$got_header>, use
146
147     [ bytes => 32768, $got_header ]
148
149 The set of expectations for a stream is set with the C<expect> method.  This
150 method completely replaces any previous expectations.
151
152     $service->expect('data',
153         [ re => qr/^200 OK.*\n/, $got_ok ],
154         [ re => qr/^4\d\d .*\n/, $got_err ]);
155
156 =cut
157
158 use base qw( Exporter );
159 use warnings;
160 use strict;
161 use Amanda::Constants;
162 use Amanda::MainLoop;
163 use Amanda::Paths;
164 use Amanda::Util;
165 use Amanda::Debug qw( debug );
166 use POSIX qw( :fcntl_h );
167 use POSIX;
168 use Data::Dumper;
169 use IO::Handle;
170 use Socket;
171
172 use constant DATA_FD_OFFSET => $Amanda::Constants::DATA_FD_OFFSET;
173 use constant DATA_FD_COUNT => $Amanda::Constants::DATA_FD_COUNT;
174 our @EXPORT_OK = qw(DATA_FD_OFFSET DATA_FD_COUNT);
175 our %EXPORT_TAGS = ( constants => [ @EXPORT_OK ] );
176
177 sub new {
178     my $class = shift;
179     my %params = @_;
180
181     my $self = bless {
182         emulate => $params{'emulate'},
183         service => $params{'service'},
184         process_done => $params{'process_done'},
185         auth => $params{'auth'} || 'bsdtcp',
186         args => $params{'args'} || [],
187         auth_peer => $params{'auth_peer'},
188
189         # all hashes keyed by stream name
190         stream_fds => {},
191         outstanding_writes => {},
192         close_after_write => {},
193
194         read_buf => {},
195         got_eof => {},
196
197         expectations => {},
198     }, $class;
199
200     if ($self->{'emulate'} eq 'amandad') {
201         $self->_start_process_amandad();
202     } elsif ($self->{'emulate'} eq 'inetd') {
203         $self->_start_process_inetd();
204     } else {
205         die "invalid 'emulate' parameter";
206     }
207
208     return $self;
209 }
210
211 sub send {
212     my $self = shift;
213     my ($name, $data) = @_;
214
215     my $fd = $self->{'stream_fds'}{$name}[1];
216     die "stream '$name' is not writable"
217         unless defined $fd and $fd != -1;
218
219     return if $data eq '';
220
221     $self->{'outstanding_writes'}{$name}++;
222     Amanda::MainLoop::async_write(
223         fd => $fd,
224         data => $data,
225         async_write_cb => sub {
226             my ($err, $bytes_written) = @_;
227             die "on stream $name: $err" if $err;
228
229             $self->_log_data(">>", $name, $data);
230
231             $self->{'outstanding_writes'}{$name}--;
232             if ($self->{'close_after_write'}{$name}
233                     and $self->{'outstanding_writes'}{$name} == 0) {
234                 $self->_do_close_write($name);
235             }
236         });
237 }
238
239 sub connect {
240     my $self = shift;
241     my ($name, $port) = @_;
242
243     socket(my $child, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
244         or die "error creating connect socket: $!";
245     connect($child, sockaddr_in($port, inet_aton("127.0.0.1")))
246         or die "error connecting: $!";
247
248     # get our own fd for the socket that Perl won't close for us, and
249     # close the perl socket
250     my $fd = dup(fileno($child));
251     close($child);
252
253     $self->_add_stream($name, $fd, $fd);
254 }
255
256 sub close {
257     my $self = shift;
258     my ($name, $for) = @_;
259
260     die "stream '$name' does not exist"
261         unless exists $self->{'stream_fds'}{$name};
262
263     # translate 'b'oth into 'r'ead and 'w'rite
264     if ($for eq 'b') {
265         $self->close($name, 'r');
266         $self->close($name, 'w');
267         return;
268     }
269
270     if ($for eq 'w') {
271         if ($self->{'outstanding_writes'}{$name}) {
272             # close when the writes are done
273             $self->{'close_after_write'}{$name} = 1;
274         } else {
275             $self->_do_close_write($name);
276         }
277     } else {
278         $self->_do_close_read($name);
279     }
280 }
281
282 sub expect {
283     my $self = shift;
284     my ($name, @expectations) = @_;
285
286     for my $exp (@expectations) {
287         # set up a byte counter for bytes_to_eof
288         if ($exp->[0] eq 'bytes_to_eof') {
289             $exp->[2] = 0;
290         }
291     }
292
293     $self->{'expectations'}{$name} = [ @expectations ];
294
295     $self->_check_expectations($name);
296 }
297
298 sub kill {
299     my $self = shift;
300
301     kill 'INT', $self->{'pid'};
302 }
303
304 # private methods
305
306 sub _start_process_amandad {
307     my $self = shift;
308     my $i;
309
310     my $service = "$amlibexecdir/$self->{service}";
311     die "service '$service' does not exist" unless -x $service;
312
313     # we'll need some pipes:
314     my ($stdin_c, $stdin_p) = POSIX::pipe();
315     my ($stdout_p, $stdout_c) = POSIX::pipe();
316     my ($stderr_p, $stderr_c) = POSIX::pipe();
317     my @data_fdpairs;
318     for ($i = 0; $i < DATA_FD_COUNT; $i++) {
319         my ($in_c, $in_p) = POSIX::pipe();
320         my ($out_p, $out_c) = POSIX::pipe();
321         push @data_fdpairs, [ $in_c, $in_p, $out_p, $out_c ];
322     }
323
324     # fork and execute!
325     $self->{'pid'} = POSIX::fork();
326     die "could not fork: $!" if (!defined $self->{'pid'} || $self->{'pid'} < 0);
327     if ($self->{'pid'} == 0) {
328         # child
329
330         my $fd;
331         my $fdpair;
332
333         # First, close all of the fd's we don't need.
334         POSIX::close($stdin_p);
335         POSIX::close($stdout_p);
336         POSIX::close($stderr_p);
337         for $fdpair (@data_fdpairs) {
338             my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
339             POSIX::close($in_p);
340             POSIX::close($out_p);
341         }
342
343         # dup our in/out fd's appropriately
344         POSIX::dup2($stdin_c, 0);
345         POSIX::dup2($stdout_c, 1);
346         POSIX::dup2($stderr_c, 2);
347         POSIX::close($stdin_c);
348         POSIX::close($stdout_c);
349         POSIX::close($stderr_c);
350
351         # then make sure everything is greater than the highest
352         # fd we'll need
353         my @fds_to_close;
354         for $fdpair (@data_fdpairs) {
355             my ($in_c, $in_p, $out_p, $out_c) = @$fdpair;
356             while ($in_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
357                 push @fds_to_close, $in_c;
358                 $in_c = POSIX::dup($in_c);
359             }
360             while ($out_c < DATA_FD_OFFSET + DATA_FD_COUNT * 2) {
361                 push @fds_to_close, $out_c;
362                 $out_c = POSIX::dup($out_c);
363             }
364             $fdpair->[0] = $in_c;
365             $fdpair->[3] = $out_c;
366         }
367
368         # close all of the leftovers
369         for $fd (@fds_to_close) {
370             POSIX::close($fd);
371         }
372
373         # and now use dup2 to move everything to its final location (whew!)
374         for ($i = 0; $i < DATA_FD_COUNT; $i++) {
375             my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
376             POSIX::dup2($out_c, DATA_FD_OFFSET + $i*2);
377             POSIX::dup2($in_c, DATA_FD_OFFSET + $i*2 + 1);
378             POSIX::close($out_c);
379             POSIX::close($in_c);
380         }
381
382         delete $ENV{'AMANDA_AUTHENTICATED_PEER'};
383         $ENV{'AMANDA_AUTHENTICATED_PEER'} = $self->{'auth_peer'} if $self->{'auth_peer'};
384
385         # finally, execute!
386         # braces avoid warning
387         { exec { $service } $service, 'amandad', $self->{'auth'}; }
388         my $err = "could not execute $service; $!\n";
389         POSIX::write(2, $err, length($err));
390         exit 2;
391     }
392
393     # parent
394
395     # watch for the child to die
396     Amanda::MainLoop::call_on_child_termination($self->{'pid'},
397             sub { $self->_process_done(@_); });
398
399     # close all of the fd's we don't need, and make notes of the fd's
400     # we want to keep around
401
402     POSIX::close($stdin_c);
403     POSIX::close($stdout_c);
404     $self->_add_stream('main', $stdout_p, $stdin_p);
405
406     POSIX::close($stderr_c);
407     $self->_add_stream('stderr', $stderr_p, -1);
408
409     for ($i = 0; $i < DATA_FD_COUNT; $i++) {
410         my ($in_c, $in_p, $out_p, $out_c) = @{$data_fdpairs[$i]};
411         POSIX::close($in_c);
412         POSIX::close($out_c);
413
414         $self->_add_stream('stream'.($i+1), $out_p, $in_p);
415     }
416 }
417
418 sub _start_process_inetd {
419     my $self = shift;
420     my $i;
421
422     # figure out the service
423     my $service = "$amlibexecdir/$self->{service}";
424     die "service '$service' does not exist" unless -x $service;
425
426     # set up and bind a listening socket on localhost
427     socket(SERVER, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
428         or die "creating socket: $!";
429     bind(SERVER, sockaddr_in(0, inet_aton("127.0.0.1")))
430         or die "binding socket: $!";
431     listen(SERVER, 1);
432     my ($port, $addr) = sockaddr_in(getsockname(SERVER));
433
434     # fork and execute!
435     $self->{'pid'} = POSIX::fork();
436     die "could not fork: $!" if ($self->{'pid'} < 0);
437     if ($self->{'pid'} == 0) {
438         # child
439
440         # send stderr to debug
441         Amanda::Debug::debug_dup_stderr_to_debug();
442
443         # wait for a connection on the socket, waiting a long time
444         # but not forever..
445         alarm 60*60*24; # one day
446         my $paddr = accept(CLIENT, SERVER);
447         CORE::close(SERVER);
448         alarm 0;
449
450         # dup that into stdio
451         POSIX::dup2(fileno(CLIENT), 0);
452         POSIX::dup2(fileno(CLIENT), 1);
453         CORE::close(CLIENT);
454
455         # finally, execute!
456         # braces avoid warning
457         { exec { $service } $service, 'installcheck', @{$self->{'args'}}; }
458         my $err = "could not execute $service; $!\n";
459         POSIX::write(2, $err, length($err));
460         exit 2;
461     }
462
463     # parent
464
465     # watch for the child to die
466     Amanda::MainLoop::call_on_child_termination($self->{'pid'},
467             sub { $self->_process_done(@_); });
468
469     # close the server socket
470     CORE::close(SERVER);
471
472     # connect to the child
473     $self->connect('main', $port);
474 }
475
476 sub _add_stream {
477     my $self = shift;
478     my ($name, $rfd, $wfd) = @_;
479
480     if (exists $self->{'stream_fds'}{$name}) {
481         die "stream $name already exists";
482     }
483
484     $self->{'stream_fds'}{$name} = [ $rfd, $wfd ];
485     $self->{'read_sources'}{$name} = undef;
486     $self->{'outstanding_writes'}{$name} = 0;
487     $self->{'close_after_write'}{$name} = 0;
488
489     # start an async read on every read_fd we set up, after making it not-blocking
490     if ($rfd != -1) {
491         my $async_read_cb;
492
493         Amanda::Util::set_blocking($rfd, 0);
494         $self->{'read_buf'}{$name} = '';
495         $self->{'got_eof'}{$name} = 0;
496
497         $async_read_cb = sub {
498             my ($err, $data) = @_;
499             die "on stream $name: $err" if $err;
500
501             # log it
502             $self->_log_data("<<", $name, $data);
503
504             # prep for next time
505             if ($data) {
506                 $self->{'read_sources'}{$name} =
507                     Amanda::MainLoop::async_read(
508                         fd => $rfd,
509                         async_read_cb => $async_read_cb);
510             } else {
511                 delete $self->{'read_sources'}{$name};
512                 $self->_do_close_read($name);
513             }
514
515             # add the data to the buffer, or signal EOF
516             if ($data) {
517                 $self->{'read_buf'}{$name} .= $data;
518             } else {
519                 $self->{'got_eof'}{$name} = 1;
520             }
521
522             # and call the user function
523             $self->_check_expectations($name);
524         };
525
526         $self->{'read_sources'}{$name} =
527             Amanda::MainLoop::async_read(
528                 fd => $rfd,
529                 async_read_cb => $async_read_cb);
530     }
531
532     # set all the write_fd's to non-blocking too.
533     if ($wfd != -1) {
534         Amanda::Util::set_blocking($wfd, 0);
535     }
536 }
537
538 sub _do_close_read {
539     my $self = shift;
540     my ($name) = @_;
541
542     my $fds = $self->{'stream_fds'}{$name};
543
544     if ($fds->[0] == -1) {
545         die "$name is already closed for reading";
546     }
547
548     debug("XX closing $name for reading");
549
550     # remove any ongoing reads
551     if ($self->{'read_sources'}{$name}) {
552         $self->{'read_sources'}{$name}->remove();
553         delete $self->{'read_sources'}{$name};
554     }
555
556     # if both fd's are the same, then this is probably a socket, so shut down
557     # the read side
558     if ($fds->[0] == $fds->[1]) {
559         # perl doesn't provide a fd-compatible shutdown, but luckily shudown
560         # affects dup'd file descriptors, too!  So create a new handle and shut
561         # it down.  When the handle is garbage collected, it will be closed,
562         # but that will not affect the original.  This will look strange in an
563         # strace, but it works without SWIGging shutdown()
564         shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[0]), "r"), 0);
565     } else {
566         POSIX::close($fds->[0]);
567     }
568     $fds->[0] = -1;
569
570     if ($fds->[1] == -1) {
571         delete $self->{'stream_fds'}{$name};
572     }
573 }
574
575 sub _do_close_write {
576     my $self = shift;
577     my ($name, $for) = @_;
578
579     my $fds = $self->{'stream_fds'}{$name};
580
581     if ($fds->[1] == -1) {
582         die "$name is already closed for writing";
583     }
584
585     debug("XX closing $name for writing");
586
587     # if both fd's are the same, then this is probably a socket, so shut down
588     # the write side
589     if ($fds->[1] == $fds->[0]) {
590         # (see above)
591         shutdown(IO::Handle->new_from_fd(POSIX::dup($fds->[1]), "w"), 1);
592     } else {
593         POSIX::close($fds->[1]);
594     }
595     $fds->[1] = -1;
596
597     if ($fds->[0] == -1) {
598         delete $self->{'stream_fds'}{$name};
599     }
600     delete $self->{'outstanding_writes'}{$name};
601     delete $self->{'close_after_write'}{$name};
602 }
603
604 sub _process_done {
605     my $self = shift;
606     my ($exitstatus) = @_;
607
608     debug("service exit: $exitstatus");
609
610     # delay this to the next trip around the MainLoop, in case data is available
611     # on any fd's
612     Amanda::MainLoop::call_later(\&_do_process_done, $self, $exitstatus);
613 }
614
615 sub _do_process_done {
616     my $self = shift;
617     my ($exitstatus) = @_;
618
619     $self->{'process_done_loops'} = ($self->{'process_done_loops'} || 0) + 1;
620
621     # defer with call_after if there are still read fd's open or data in a read
622     # buffer.  Since the process just died, presumably these will close in this
623     # trip around the MainLoop, so this will be a very short busywait.  The upper
624     # bound on the wait is 1 second.
625     if ($self->{'process_done_loops'} < 100) {
626         my $still_busy = 0;
627         for my $name (keys %{$self->{'stream_fds'}}) {
628             my $fds = $self->{'stream_fds'}{$name};
629             # if we're still expecting something on this stream..
630             if ($self->{'expectations'}{$name}) {
631                 $still_busy = 1;
632             }
633             # or the stream's not closed yet..
634             if ($fds->[0] != -1) {
635                 $still_busy = 1;
636             }
637         }
638         if ($still_busy) {
639             return Amanda::MainLoop::call_after(10, \&_do_process_done, $self, $exitstatus);
640         }
641     }
642
643     # close all of the write_fd's.  If there are pending writes, they
644     # were going to get a SIGPIPE anyway.
645     for my $name (keys %{$self->{'stream_fds'}}) {
646         my $fds = $self->{'stream_fds'}{$name};
647         if ($fds->[1] != -1) {
648             $self->_do_close_write($name);
649         }
650     }
651
652     $self->{'process_done'}->($exitstatus);
653 }
654
655 sub _log_data {
656     my $self = shift;
657     my ($dir, $name, $data) = @_;
658
659     if ($data) {
660         if (length($data) < 300) {
661             my $printable = $data;
662             $printable =~ s/[^\r\n[:print:]]+//g;
663             $printable =~ s/\n/\\n/g;
664             $printable =~ s/\r/\\r/g;
665             debug("$dir$name: [$printable]");
666         } else {
667             debug(sprintf("$dir$name: %d bytes", length($data)));
668         }
669     } else {
670         debug("$dir$name: EOF");
671     }
672 }
673
674 sub _check_expectations {
675     my $self = shift;
676     my ($name) = @_;
677
678     my $expectations = $self->{'expectations'}{$name};
679     return unless defined $expectations and @$expectations;
680
681     my $cb = undef;
682     my @args = undef;
683     # if we got EOF and have no more pending data, look for a matching
684     # expectation
685     if ($self->{'got_eof'}{$name} and !$self->{'read_buf'}{$name}) {
686         for my $exp (@$expectations) {
687             if ($exp->[0] eq 'eof') {
688                 $cb = $exp->[1];
689                 @args = ();
690                 last;
691             } elsif ($exp->[0] eq 'bytes_to_eof') {
692                 $cb = $exp->[1];
693                 @args = ($exp->[2],); # byte count
694                 last;
695             }
696         }
697
698         if (!$cb) {
699             debug("Expected on $name: " . Dumper($expectations));
700             die "Unexpected EOF on $name";
701         }
702     } elsif ($self->{'read_buf'}{$name}) {
703         my $buf = $self->{'read_buf'}{$name};
704
705         for my $exp (@$expectations) {
706             if ($exp->[0] eq 'eof') {
707                 die "Expected EOF but got data on $name";
708             } elsif ($exp->[0] eq 'bytes_to_eof') {
709                 # store the ongoing byte count in the expectation itself
710                 $exp->[2] = ($exp->[2] || 0) + length($buf);
711                 $self->{'read_buf'}{$name} = '';
712                 # and if this stream *also* has EOF, call back
713                 if ($self->{'got_eof'}{$name}) {
714                     $cb = $exp->[1];
715                     @args = ($exp->[2],); # byte count
716                 }
717                 last;
718             } elsif ($exp->[0] eq 'bytes') {
719                 if (length($buf) >= $exp->[1]) {
720                     $cb = $exp->[2];
721                     @args = (substr($buf, 0, $exp->[1]),);
722                     $self->{'read_buf'}{$name} = substr($buf, $exp->[1]);
723                 }
724                 last; # done searching, even if we don't call a sub
725             } elsif ($exp->[0] eq 're') {
726                 if ($buf =~ $exp->[1]) {
727                     $cb = $exp->[2];
728                     @args = ($&,); # matched section of $buf
729                     $self->{'read_buf'}{$name} = $'; # remainder of $buf
730                     last;
731                 }
732             }
733         }
734     }
735
736     # if there's a callback to make, then remove the expectations *before*
737     # calling it
738     if ($cb) {
739         delete $self->{'expectations'}{$name};
740         $cb->(@args);
741     }
742 }
743
744 1;