7a97c378e51976e679e48a9819852ff751434e0d
[debian/amanda] / installcheck / Amanda_Xfer.pl
1 # Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 use Test::More tests => 46;
20 use File::Path;
21 use Data::Dumper;
22 use strict;
23 use warnings;
24
25 use lib "@amperldir@";
26 use Installcheck;
27 use Installcheck::Run;
28 use Installcheck::Mock;
29 use Amanda::Xfer qw( :constants );
30 use Amanda::Header;
31 use Amanda::Debug;
32 use Amanda::MainLoop;
33 use Amanda::Paths;
34 use Amanda::Config;
35 use Amanda::Constants;
36
37 # get Amanda::Device only when we're building for server
38 BEGIN {
39     use Amanda::Util;
40     if (Amanda::Util::built_with_component("server")) {
41         eval "use Amanda::Device;";
42         die $@ if $@;
43     }
44 }
45
46 # set up debugging so debug output doesn't interfere with test results
47 Amanda::Debug::dbopen("installcheck");
48 Installcheck::log_test_output();
49
50 # and disable Debug's die() and warn() overrides
51 Amanda::Debug::disable_die_override();
52
53 {
54     my $RANDOM_SEED = 0xD00D;
55
56     my $xfer = Amanda::Xfer->new([
57         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
58         Amanda::Xfer::Filter::Xor->new(0), # key of 0 -> no change, so random seeds match
59         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
60     ]);
61
62     pass("Creating a transfer doesn't crash"); # hey, it's a start..
63
64     my $got_msg = "(not received)";
65     $xfer->start(sub {
66         my ($src, $msg, $xfer) = @_;
67         if ($msg->{type} == $XMSG_ERROR) {
68             die $msg->{elt} . " failed: " . $msg->{message};
69         }
70         if ($msg->{type} == $XMSG_INFO) {
71             $got_msg = $msg->{message};
72         } elsif ($msg->{'type'} == $XMSG_DONE) {
73             Amanda::MainLoop::quit();
74         }
75     });
76     Amanda::MainLoop::run();
77     pass("A simple transfer runs to completion");
78     is($got_msg, "Is this thing on?",
79         "XMSG_INFO from Amanda::Xfer::Dest::Null has correct message");
80 }
81
82 {
83     my $RANDOM_SEED = 0xDEADBEEF;
84
85     my $xfer1 = Amanda::Xfer->new([
86         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
87         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
88     ]);
89     my $xfer2 = Amanda::Xfer->new([
90         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
91         Amanda::Xfer::Filter::Xor->new(0xf0),
92         Amanda::Xfer::Filter::Xor->new(0xf0),
93         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
94     ]);
95
96     my $cb = sub {
97         my ($src, $msg, $xfer) = @_;
98         if ($msg->{type} == $XMSG_ERROR) {
99             die $msg->{elt} . " failed: " . $msg->{message};
100         } elsif ($msg->{'type'} == $XMSG_DONE) {
101             if  ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
102              and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
103                 Amanda::MainLoop::quit();
104             }
105         }
106     };
107
108     $xfer1->start($cb);
109     $xfer2->start($cb);
110 }
111 # let the already-started transfers go out of scope before they 
112 # complete, as a memory management test..
113 Amanda::MainLoop::run();
114 pass("Two simultaneous transfers run to completion");
115
116 {
117     my $RANDOM_SEED = 0xD0DEEDAA;
118     my @elts;
119
120     # note that, because the Xor filter is flexible, assembling
121     # long pipelines can take an exponentially long time.  A 10-elt
122     # pipeline exercises the linking algorithm without wasting
123     # too many CPU cycles
124
125     push @elts, Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED);
126     for my $i (1 .. 4) {
127         push @elts, Amanda::Xfer::Filter::Xor->new($i);
128         push @elts, Amanda::Xfer::Filter::Xor->new($i);
129     }
130     push @elts, Amanda::Xfer::Dest::Null->new($RANDOM_SEED);
131     my $xfer = Amanda::Xfer->new(\@elts);
132
133     my $cb = sub {
134         my ($src, $msg, $xfer) = @_;
135         if ($msg->{type} == $XMSG_ERROR) {
136             die $msg->{elt} . " failed: " . $msg->{message};
137         } elsif ($msg->{'type'} == $XMSG_DONE) {
138             Amanda::MainLoop::quit();
139         }
140     };
141
142     $xfer->start($cb);
143
144     Amanda::MainLoop::run();
145     pass("One 10-element transfer runs to completion");
146 }
147
148
149 {
150     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
151     my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
152     my ($rfh, $wfh);
153
154     mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
155
156     # fill the file with some stuff
157     open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
158     for my $i (1 .. 100) { print $wfh "line $i\n"; }
159     close($wfh);
160
161     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");
162     open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
163
164     # now run a transfer out of it
165     my $xfer = Amanda::Xfer->new([
166         Amanda::Xfer::Source::Fd->new(fileno($rfh)),
167         Amanda::Xfer::Filter::Xor->new(0xde),
168         Amanda::Xfer::Filter::Xor->new(0xde),
169         Amanda::Xfer::Dest::Fd->new(fileno($wfh)),
170     ]);
171
172     my $cb = sub {
173         my ($src, $msg, $xfer) = @_;
174         if ($msg->{type} == $XMSG_ERROR) {
175             die $msg->{elt} . " failed: " . $msg->{message};
176         } elsif ($msg->{'type'} == $XMSG_DONE) {
177             Amanda::MainLoop::quit();
178         }
179     };
180
181     $xfer->start($cb);
182
183     Amanda::MainLoop::run();
184
185     close($wfh);
186     close($rfh);
187
188     # now verify the file contents are identical
189     open($rfh, "<", $read_filename);
190     my $src = do { local $/; <$rfh> };
191
192     open($rfh, "<", $write_filename);
193     my $dest = do { local $/; <$rfh> };
194
195     is($src, $dest, "Source::Fd and Dest::Fd read and write files");
196
197     unlink($read_filename);
198     unlink($write_filename);
199 }
200
201 {
202     my $RANDOM_SEED = 0x5EAF00D;
203
204     # build a transfer that will keep going forever
205     my $xfer = Amanda::Xfer->new([
206         Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
207         Amanda::Xfer::Filter::Xor->new(14),
208         Amanda::Xfer::Filter::Xor->new(14),
209         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
210     ]);
211
212     my $got_timeout = 0;
213     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
214         my ($src) = @_;
215         $got_timeout = 1;
216         $src->remove();
217         $xfer->cancel();
218     });
219     $xfer->start(sub {
220         my ($src, $msg, $xfer) = @_;
221         if ($msg->{type} == $XMSG_ERROR) {
222             die $msg->{elt} . " failed: " . $msg->{message};
223         } elsif ($msg->{'type'} == $XMSG_DONE) {
224             Amanda::MainLoop::quit();
225         }
226     });
227     Amanda::MainLoop::run();
228     ok($got_timeout, "A neverending transfer finishes after being cancelled");
229     # (note that this does not test all of the cancellation possibilities)
230 }
231
232 {
233     # build a transfer that will write to a read-only fd
234     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
235     my $rfh;
236
237     # create the file
238     open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
239
240     # open it for reading
241     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
242
243     my $xfer = Amanda::Xfer->new([
244         Amanda::Xfer::Source::Random->new(0, 1),
245         Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
246     ]);
247
248     my $got_error = 0;
249     $xfer->start(sub {
250         my ($src, $msg, $xfer) = @_;
251         if ($msg->{type} == $XMSG_ERROR) {
252             $got_error = 1;
253         } elsif ($msg->{'type'} == $XMSG_DONE) {
254             Amanda::MainLoop::quit();
255         }
256     });
257     Amanda::MainLoop::run();
258     ok($got_error, "A transfer with an error cancels itself after sending an error");
259
260     unlink($read_filename);
261 }
262
263 # test the Process filter
264 {
265     my $RANDOM_SEED = 0xD00D;
266
267     my $xfer = Amanda::Xfer->new([
268         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
269         Amanda::Xfer::Filter::Process->new(
270             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
271         Amanda::Xfer::Filter::Process->new(
272             [ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0),
273         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
274     ]);
275
276     $xfer->get_source()->set_callback(sub {
277         my ($src, $msg, $xfer) = @_;
278         if ($msg->{type} == $XMSG_ERROR) {
279             die $msg->{elt} . " failed: " . $msg->{message};
280         } elsif ($msg->{'type'} == $XMSG_DONE) {
281             $src->remove();
282             Amanda::MainLoop::quit();
283         }
284     });
285     $xfer->start();
286     Amanda::MainLoop::run();
287     pass("compress | uncompress gets back the original stream");
288 }
289
290 {
291     my $RANDOM_SEED = 0x5EAF00D;
292
293     # build a transfer that will keep going forever, using a source that
294     # cannot produce an EOF, so Filter::Process is forced to kill the
295     # compress process
296
297     open(my $zerofd, "<", "/dev/zero")
298         or die("could not open /dev/zero: $!");
299     my $xfer = Amanda::Xfer->new([
300         Amanda::Xfer::Source::Fd->new($zerofd),
301         Amanda::Xfer::Filter::Process->new(
302             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
303         Amanda::Xfer::Dest::Null->new(0),
304     ]);
305
306     my $got_timeout = 0;
307     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
308         my ($src) = @_;
309         $got_timeout = 1;
310         $src->remove();
311         $xfer->cancel();
312     });
313     $xfer->get_source()->set_callback(sub {
314         my ($src, $msg, $xfer) = @_;
315         if ($msg->{type} == $XMSG_ERROR) {
316             die $msg->{elt} . " failed: " . $msg->{message};
317         } elsif ($msg->{'type'} == $XMSG_DONE) {
318             $src->remove();
319             Amanda::MainLoop::quit();
320         }
321     });
322     $xfer->start();
323     Amanda::MainLoop::run();
324     ok($got_timeout, "Amanda::Xfer::Filter::Process can be cancelled");
325     # (note that this does not test all of the cancellation possibilities)
326 }
327
328 # Test Amanda::Xfer::Dest::Buffer
329 {
330     my $dest = Amanda::Xfer::Dest::Buffer->new(1025);
331     my $xfer = Amanda::Xfer->new([
332         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
333         $dest,
334     ]);
335
336     $xfer->get_source()->set_callback(sub {
337         my ($src, $msg, $xfer) = @_;
338         if ($msg->{type} == $XMSG_ERROR) {
339             die $msg->{elt} . " failed: " . $msg->{message};
340         } elsif ($msg->{'type'} == $XMSG_DONE) {
341             $src->remove();
342             Amanda::MainLoop::quit();
343         }
344     });
345     $xfer->start();
346     Amanda::MainLoop::run();
347
348     is($dest->get(), 'ABCDEFGH' x 128,
349         "buffer captures the right bytes");
350 }
351
352 # Test that Amanda::Xfer::Dest::Buffer terminates an xfer early
353 {
354     my $dest = Amanda::Xfer::Dest::Buffer->new(100);
355     my $xfer = Amanda::Xfer->new([
356         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
357         $dest,
358     ]);
359
360     my $got_err = 0;
361     $xfer->get_source()->set_callback(sub {
362         my ($src, $msg, $xfer) = @_;
363         if ($msg->{type} == $XMSG_ERROR) {
364             $got_err = 1;
365         } elsif ($msg->{'type'} == $XMSG_DONE) {
366             $src->remove();
367             Amanda::MainLoop::quit();
368         }
369     });
370     $xfer->start();
371     Amanda::MainLoop::run();
372
373     ok($got_err, "buffer stops the xfer if it doesn't have space");
374 }
375
376 SKIP: {
377     skip "not built with server", 25 unless Amanda::Util::built_with_component("server");
378
379     my $disk_cache_dir = "$Installcheck::TMP";
380     my $RANDOM_SEED = 0xFACADE;
381
382     # exercise device source and destination
383     {
384         my $RANDOM_SEED = 0xFACADE;
385         my $xfer;
386
387         my $quit_cb = make_cb(quit_cb => sub {
388             my ($src, $msg, $xfer) = @_;
389             if ($msg->{'type'} == $XMSG_ERROR) {
390                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
391             } elsif ($msg->{'type'} == $XMSG_DONE) {
392                 Amanda::MainLoop::quit();
393             }
394         });
395
396         # set up vtapes
397         my $testconf = Installcheck::Run::setup();
398         $testconf->write();
399
400         # set up a device for slot 1
401         my $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape(1));
402         die("Could not open VFS device: " . $device->error())
403             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
404
405         # write to it
406         my $hdr = Amanda::Header->new();
407         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
408         $hdr->{'name'} = "installcheck";
409         $hdr->{'disk'} = "/";
410         $hdr->{'datestamp'} = "20080102030405";
411         $hdr->{'program'} = "INSTALLCHECK";
412
413         $device->finish();
414         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
415         $device->start_file($hdr);
416
417         $xfer = Amanda::Xfer->new([
418             Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
419             Amanda::Xfer::Dest::Device->new($device, 0),
420         ]);
421
422         $xfer->start($quit_cb);
423
424         Amanda::MainLoop::run();
425         pass("write to a device (completed succesfully; data may not be correct)");
426
427         # finish up the file and device
428         ok(!$device->in_file(), "not in_file");
429         ok($device->finish(), "finish");
430
431         # now turn around and read from it
432         $device->start($Amanda::Device::ACCESS_READ, undef, undef);
433         $device->seek_file(1);
434
435         $xfer = Amanda::Xfer->new([
436             Amanda::Xfer::Source::Device->new($device),
437             Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
438         ]);
439
440         $xfer->start($quit_cb);
441
442         Amanda::MainLoop::run();
443         pass("read from a device succeeded, too, and data was correct");
444     }
445
446     # extra params:
447     #   cancel_after_partnum - after this partnum is completed, cancel the xfer
448     #   do_not_retry - do not retry a failed part - cancel the xfer instead
449     sub test_taper_dest {
450         my ($src, $dest_sub, $expected_messages, $msg_prefix, %params) = @_;
451         my $xfer;
452         my $device;
453         my $vtape_num = 1;
454         my @messages;
455
456         # set up vtapes
457         my $testconf = Installcheck::Run::setup();
458         $testconf->write();
459
460         my $hdr = Amanda::Header->new();
461         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
462         $hdr->{'name'} = "installcheck";
463         $hdr->{'disk'} = "/";
464         $hdr->{'datestamp'} = "20080102030405";
465         $hdr->{'program'} = "INSTALLCHECK";
466
467         # set up a device for the taper dest
468         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
469         die("Could not open VFS device: " . $device->error())
470             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
471         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
472         $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
473         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
474         my $dest = $dest_sub->($device);
475
476         # and create the xfer
477         $xfer = Amanda::Xfer->new([ $src, $dest ]);
478
479         my $start_new_part = sub {
480             my ($successful, $eof, $partnum, $eom) = @_;
481
482             if (exists $params{'cancel_after_partnum'}
483                     and $params{'cancel_after_partnum'} == $partnum) {
484                 push @messages, "CANCEL";
485                 $xfer->cancel();
486                 return;
487             }
488
489             if (!$device || $eom) {
490                 # set up a device and start writing a part to it
491                 $device->finish() if $device;
492                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
493                 die("Could not open VFS device: " . $device->error())
494                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
495                 $dest->use_device($device);
496                 $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
497                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
498                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
499             }
500
501             # bail out if we shouldn't retry this part
502             if (!$successful and $params{'do_not_retry'}) {
503                 push @messages, "NOT-RETRYING";
504                 $xfer->cancel();
505                 return;
506             }
507
508             if (!$eof) {
509                 if ($successful) {
510                     $dest->start_part(0, $hdr);
511                 } else {
512                     $dest->start_part(1, $hdr);
513                 }
514             }
515         };
516
517         $xfer->start(sub {
518             my ($src, $msg, $xfer) = @_;
519
520             if ($msg->{'type'} == $XMSG_ERROR) {
521                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
522             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
523                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
524                 push @messages, "EOM" if $msg->{'eom'};
525                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'}, $msg->{'eom'});
526             } elsif ($msg->{'type'} == $XMSG_DONE) {
527                 push @messages, "DONE";
528                 Amanda::MainLoop::quit();
529             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
530                 push @messages, "CANCELLED";
531             } else {
532                 push @messages, "$msg";
533             }
534         });
535
536         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
537         Amanda::MainLoop::run();
538
539         is_deeply([@messages],
540             $expected_messages,
541             "$msg_prefix: element produces the correct series of messages")
542         or diag(Dumper([@messages]));
543     }
544
545     sub run_recovery_source {
546         my ($dest, $files, $expected_messages, $finished_cb) = @_;
547         my $device;
548         my @filenums;
549         my @messages;
550         my $xfer;
551         my $dev;
552         my $src;
553
554         my $steps = define_steps
555             cb_ref => \$finished_cb;
556
557         step setup => sub {
558             # we need a device up front, so sneak a peek into @$files
559             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($files->[0]));
560             $src = Amanda::Xfer::Source::Recovery->new($dev);
561             $xfer = Amanda::Xfer->new([ $src, $dest ]);
562
563             $xfer->start($steps->{'got_xmsg'});
564             # got_xmsg will call got_ready when the element is ready
565         };
566
567         step got_ready => sub {
568             $steps->{'load_slot'}->();
569         };
570
571         step load_slot => sub {
572             if (!@$files) {
573                 return $src->start_part(undef);
574                 # (will trigger an XMSG_DONE; see below)
575             }
576
577             my $slot = shift @$files;
578             @filenums = @{ shift @$files };
579
580             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($slot));
581             if ($dev->status != $Amanda::Device::DEVICE_STATUS_SUCCESS) {
582                 die $dev->error_or_status();
583             }
584
585             $src->use_device($dev);
586
587             if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
588                 die $dev->error_or_status();
589             }
590
591             $steps->{'seek_file'}->();
592         };
593
594         step seek_file => sub {
595             if (!@filenums) {
596                 return $steps->{'load_slot'}->();
597             }
598
599             my $hdr = $dev->seek_file(shift @filenums);
600             if (!$hdr) {
601                 die $dev->error_or_status();
602             }
603
604             push @messages, "PART";
605
606             $src->start_part($dev);
607         };
608
609         step got_xmsg => sub {
610             my ($src, $msg, $xfer) = @_;
611
612             if ($msg->{'type'} == $XMSG_ERROR) {
613                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
614             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
615                 push @messages, "KB-" . ($msg->{'size'}/1024);
616                 $steps->{'seek_file'}->();
617             } elsif ($msg->{'type'} == $XMSG_DONE) {
618                 push @messages, "DONE";
619                 $steps->{'quit'}->();
620             } elsif ($msg->{'type'} == $XMSG_READY) {
621                 push @messages, "READY";
622                 $steps->{'got_ready'}->();
623             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
624                 push @messages, "CANCELLED";
625             }
626         };
627
628         step quit => sub {
629             is_deeply([@messages],
630                 $expected_messages,
631                 "files read back and verified successfully with Amanda::Xfer::Recovery::Source")
632             or diag(Dumper([@messages]));
633
634             $finished_cb->();
635         };
636     }
637
638     sub test_recovery_source {
639         run_recovery_source(@_, \&Amanda::MainLoop::quit);
640         Amanda::MainLoop::run();
641     }
642
643     my $holding_base = "$Installcheck::TMP/source-holding";
644     my $holding_file;
645     # create a sequence of holding chunks, each 2MB.
646     sub make_holding_files {
647         my ($nchunks) = @_;
648         my $block = 'a' x 32768;
649
650         rmtree($holding_base);
651         mkpath($holding_base);
652         for (my $i = 0; $i < $nchunks; $i++) {
653             my $filename = "$holding_base/file$i";
654             open(my $fh, ">", "$filename");
655
656             my $hdr = Amanda::Header->new();
657             $hdr->{'type'} = ($i == 0)?
658                 $Amanda::Header::F_DUMPFILE : $Amanda::Header::F_CONT_DUMPFILE;
659             $hdr->{'datestamp'} = "20070102030405";
660             $hdr->{'dumplevel'} = 0;
661             $hdr->{'compressed'} = 1;
662             $hdr->{'name'} = "localhost";
663             $hdr->{'disk'} = "/home";
664             $hdr->{'program'} = "INSTALLCHECK";
665             if ($i != $nchunks-1) {
666                 $hdr->{'cont_filename'} = "$holding_base/file" . ($i+1);
667             }
668
669             print $fh $hdr->to_string(32768,32768);
670
671             for (my $b = 0; $b < 64; $b++) {
672                 print $fh $block;
673             }
674             close($fh);
675         }
676
677         return "$holding_base/file0";
678     }
679
680     # first, test the simpler Splitter class
681     test_taper_dest(
682         Amanda::Xfer::Source::Random->new(1024*1951, $RANDOM_SEED),
683         sub {
684             my ($first_dev) = @_;
685             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
686                                                      520*1024, 0);
687         },
688         [ "PART-1-OK", "PART-2-OK", "PART-3-OK", "PART-4-OK",
689           "DONE" ],
690         "Amanda::Xfer::Dest::Taper::Splitter - simple splitting");
691     test_recovery_source(
692         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
693         [ 1 => [ 1, 2, 3, 4 ], ],
694         [
695           'READY',
696           'PART',
697           'KB-544',
698           'PART',
699           'KB-544',
700           'PART',
701           'KB-544',
702           'PART',
703           'KB-319',
704           'DONE'
705         ]);
706
707     test_taper_dest(
708         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
709         sub {
710             my ($first_dev) = @_;
711             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
712                                                      1024*1024, 0);
713         },
714         [ "PART-1-OK", "PART-2-OK", "PART-3-OK", "EOM",
715           "PART-4-OK",
716           "DONE" ],
717         "Amanda::Xfer::Dest::Taper::Splitter - splitting and spanning with LEOM");
718     test_recovery_source(
719         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
720         [ 1 => [ 1, 2, 3 ], 2 => [ 1, ], ],
721         [
722           'READY',
723           'PART',
724           'KB-1024',
725           'PART',
726           'KB-1024',
727           'PART',
728           'KB-288',
729           'PART',
730           'KB-838',
731           'DONE'
732         ]);
733
734     test_taper_dest(
735         Amanda::Xfer::Source::Random->new(1024*1024*1.5, $RANDOM_SEED),
736         sub {
737             my ($first_dev) = @_;
738             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
739                                                      0, 0);
740         },
741         [ "PART-1-OK",
742           "DONE" ],
743         "Amanda::Xfer::Dest::Taper::Splitter - no splitting");
744     test_recovery_source(
745         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
746         [ 1 => [ 1, ], ],
747         [
748           'READY',
749           'PART',
750           'KB-1536',
751           'DONE'
752         ]);
753
754     test_taper_dest(
755         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
756         sub {
757             my ($first_dev) = @_;
758             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
759                                                      2368*1024, 0);
760         },
761         [ "PART-1-OK", "PART-2-OK", "EOM",
762           "PART-3-OK",
763           "DONE" ],
764         "Amanda::Xfer::Dest::Taper::Splitter - LEOM hits in file 2 header");
765     test_recovery_source(
766         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
767         [ 1 => [ 1, 2 ], 2 => [ 1, ], ],
768         [
769           'READY',
770           'PART',
771           'KB-2368',
772           'PART',
773           'KB-0', # this wouldn't be in the catalog, but it's on the vtape
774           'PART',
775           'KB-806',
776           'DONE'
777         ]);
778
779     test_taper_dest(
780         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
781         sub {
782             my ($first_dev) = @_;
783             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
784                                                      2368*1024, 0);
785         },
786         [ "PART-1-OK", "PART-2-FAILED", "EOM",
787           "NOT-RETRYING", "CANCELLED", "DONE" ],
788         "Amanda::Xfer::Dest::Taper::Splitter - LEOM fails, PEOM => failure",
789         disable_leom => 1, do_not_retry => 1);
790
791     # run A::X::Dest::Taper::Cacher test in each of a few different cache permutations
792     test_taper_dest(
793         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
794         sub {
795             my ($first_dev) = @_;
796             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
797                                                      1024*1024, 1, undef),
798         },
799         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
800           "PART-3-OK", "PART-4-OK", "PART-5-OK",
801           "DONE" ],
802         "Amanda::Xfer::Dest::Taper::Cacher - mem cache");
803     test_recovery_source(
804         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
805         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
806         [
807           'READY',
808           'PART',
809           'KB-1024',
810           'PART',
811           'KB-1024',
812           'PART',
813           'KB-1024',
814           'PART',
815           'KB-1024',
816           'PART',
817           'KB-102',
818           'DONE'
819         ]);
820
821     test_taper_dest(
822         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
823         sub {
824             my ($first_dev) = @_;
825             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
826                                               1024*1024, 0, $disk_cache_dir),
827         },
828         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
829           "PART-3-OK", "PART-4-OK", "PART-5-OK",
830           "DONE" ],
831         "Amanda::Xfer::Dest::Taper::Cacher - disk cache");
832     test_recovery_source(
833         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
834         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
835         [
836           'READY',
837           'PART',
838           'KB-1024',
839           'PART',
840           'KB-1024',
841           'PART',
842           'KB-1024',
843           'PART',
844           'KB-1024',
845           'PART',
846           'KB-102',
847           'DONE'
848         ]);
849
850     test_taper_dest(
851         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
852         sub {
853             my ($first_dev) = @_;
854             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
855                                                     1024*1024, 0, undef),
856         },
857         [ "PART-1-OK", "PART-2-OK", "PART-3-OK",
858           "DONE" ],
859         "Amanda::Xfer::Dest::Taper::Cacher - no cache (no failed parts; exact multiple of part size)");
860     test_recovery_source(
861         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
862         [ 1 => [ 1, 2, 3 ], ],
863         [
864           'READY',
865           'PART',
866           'KB-1024',
867           'PART',
868           'KB-1024',
869           'PART',
870           'KB-0',
871           'DONE'
872         ]);
873
874     test_taper_dest(
875         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
876         sub {
877             my ($first_dev) = @_;
878             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
879         },
880         [ "PART-1-OK", "DONE" ],
881         "Amanda::Xfer::Dest::Taper::Cacher - no splitting (fits on volume)");
882     test_recovery_source(
883         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
884         [ 1 => [ 1 ], ],
885         [
886           'READY',
887           'PART',
888           'KB-2048',
889           'DONE'
890         ]);
891
892     test_taper_dest(
893         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
894         sub {
895             my ($first_dev) = @_;
896             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
897         },
898         [ "PART-1-FAILED", "EOM",
899           "NOT-RETRYING", "CANCELLED", "DONE" ],
900         "Amanda::Xfer::Dest::Taper::Cacher - no splitting (doesn't fit on volume -> fails)",
901         do_not_retry => 1);
902
903     test_taper_dest(
904         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
905         sub {
906             my ($first_dev) = @_;
907             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
908                                             1024*1024, 0, $disk_cache_dir),
909         },
910         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
911           "PART-3-OK", "PART-4-OK", "CANCEL",
912           "CANCELLED", "DONE" ],
913         "Amanda::Xfer::Dest::Taper::Cacher - cancellation after success",
914         cancel_after_partnum => 4);
915
916     # set up a few holding chunks and read from those
917
918     $holding_file = make_holding_files(3);
919
920     test_taper_dest(
921         Amanda::Xfer::Source::Holding->new($holding_file),
922         sub {
923             my ($first_dev) = @_;
924             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
925                                             1024*1024, 1);
926         },
927         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
928           "PART-3-OK", "PART-4-OK", "PART-5-FAILED", "EOM",
929           "PART-5-OK", "PART-6-OK", "PART-7-OK",
930           "DONE" ],
931         "Amanda::Xfer::Dest::Taper::Splitter - Amanda::Xfer::Source::Holding "
932         . "acts as a source and supplies cache_inform",
933         disable_leom => 1);
934
935     ##
936     # test the cache_inform method
937
938     sub test_taper_dest_splitter_cache_inform {
939         my %params = @_;
940         my $xfer;
941         my $device;
942         my $fh;
943         my $part_size = 1024*1024;
944         my $file_size = $part_size * 4 + 100 * 1024;
945         my $cache_file = "$Installcheck::TMP/cache_file";
946         my $vtape_num = 1;
947
948         # set up our "cache", cleverly using an Amanda::Xfer::Dest::Fd
949         open($fh, ">", "$cache_file") or die("Could not open '$cache_file' for writing");
950         $xfer = Amanda::Xfer->new([
951             Amanda::Xfer::Source::Random->new($file_size, $RANDOM_SEED),
952             Amanda::Xfer::Dest::Fd->new(fileno($fh)),
953         ]);
954
955         $xfer->start(sub {
956             my ($src, $msg, $xfer) = @_;
957             if ($msg->{'type'} == $XMSG_ERROR) {
958                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
959             } elsif ($msg->{'type'} == $XMSG_DONE) {
960                 Amanda::MainLoop::quit();
961             }
962         });
963         Amanda::MainLoop::run();
964         close($fh);
965
966         # create a list of holding chuunks, some slab-aligned, some part-aligned,
967         # some not
968         my @holding_chunks;
969         my $offset = 0;
970         my $do_chunk = sub {
971             my ($break) = @_;
972             die unless $break > $offset;
973             push @holding_chunks, [ $cache_file, $offset, $break - $offset ];
974             $offset = $break;
975         };
976         $do_chunk->(277);
977         $do_chunk->($part_size);
978         $do_chunk->($part_size+128*1024);
979         $do_chunk->($part_size*3);
980         $do_chunk->($part_size*3+1024);
981         $do_chunk->($part_size*3+1024*2);
982         $do_chunk->($part_size*3+1024*3);
983         $do_chunk->($part_size*4);
984         $do_chunk->($part_size*4 + 77);
985         $do_chunk->($file_size - 1);
986         $do_chunk->($file_size);
987
988         # set up vtapes
989         my $testconf = Installcheck::Run::setup();
990         $testconf->write();
991
992         my $hdr = Amanda::Header->new();
993         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
994         $hdr->{'name'} = "installcheck";
995         $hdr->{'disk'} = "/";
996         $hdr->{'datestamp'} = "20080102030405";
997         $hdr->{'program'} = "INSTALLCHECK";
998
999         # set up the cache file
1000         open($fh, "<", "$cache_file") or die("Could not open '$cache_file' for reading");
1001
1002         # set up a device for writing
1003         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
1004         die("Could not open VFS device: " . $device->error())
1005             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
1006         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
1007         $device->property_set("LEOM", 0);
1008         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
1009
1010         my $dest = Amanda::Xfer::Dest::Taper::Splitter->new($device, 128*1024,
1011                                                     1024*1024, 1);
1012         $xfer = Amanda::Xfer->new([
1013             Amanda::Xfer::Source::Fd->new(fileno($fh)),
1014             $dest,
1015         ]);
1016
1017         my $start_new_part = sub {
1018             my ($successful, $eof, $last_partnum) = @_;
1019
1020             if (!$device || !$successful) {
1021                 # set up a device and start writing a part to it
1022                 $device->finish() if $device;
1023                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
1024                 die("Could not open VFS device: " . $device->error())
1025                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
1026                 $dest->use_device($device);
1027                 $device->property_set("LEOM", 0);
1028                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
1029                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
1030             }
1031
1032             # feed enough chunks to cache_inform
1033             my $upto = ($last_partnum+2) * $part_size;
1034             while (@holding_chunks and $holding_chunks[0]->[1] < $upto) {
1035                 my ($filename, $offset, $length) = @{shift @holding_chunks};
1036                 $dest->cache_inform($filename, $offset, $length);
1037             }
1038
1039             if (!$eof) {
1040                 if ($successful) {
1041                     $dest->start_part(0, $hdr);
1042                 } else {
1043                     $dest->start_part(1, $hdr);
1044                 }
1045             }
1046         };
1047
1048         my @messages;
1049         $xfer->start(sub {
1050             my ($src, $msg, $xfer) = @_;
1051
1052             if ($msg->{'type'} == $XMSG_ERROR) {
1053                 push @messages, "ERROR: $msg->{message}";
1054             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
1055                 push @messages, "PART-" . ($msg->{'successful'}? "OK" : "FAILED");
1056                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
1057             } elsif ($msg->{'type'} == $XMSG_DONE) {
1058                 push @messages, "DONE";
1059                 Amanda::MainLoop::quit();
1060             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
1061                 push @messages, "CANCELLED";
1062             } else {
1063                 push @messages, $msg->{'type'};
1064             }
1065         });
1066
1067         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
1068         Amanda::MainLoop::run();
1069
1070         unlink($cache_file);
1071
1072         return @messages;
1073     }
1074
1075     is_deeply([ test_taper_dest_splitter_cache_inform() ],
1076         [ "PART-OK", "PART-OK", "PART-FAILED",
1077           "PART-OK", "PART-OK", "PART-OK",
1078           "DONE" ],
1079         "cache_inform: splitter element produces the correct series of messages");
1080
1081     rmtree($holding_base);
1082 }
1083
1084 # test Amanda::Xfer::Dest::Taper::DirectTCP; do it twice, once with a cancellation
1085 SKIP: {
1086     skip "not built with ndmp and server", 3 unless
1087         Amanda::Util::built_with_component("ndmp") and Amanda::Util::built_with_component("server");
1088
1089     my $RANDOM_SEED = 0xFACADE;
1090
1091     # make XDT output fairly verbose
1092     $Amanda::Config::debug_taper = 2;
1093
1094     my $ndmp = Installcheck::Mock::NdmpServer->new();
1095     my $ndmp_port = $ndmp->{'port'};
1096     my $drive = $ndmp->{'drive'};
1097
1098     my $mkdevice = sub {
1099         my $dev = Amanda::Device->new("ndmp:127.0.0.1:$ndmp_port\@$drive");
1100         die "can't create device" unless $dev->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS;
1101         $dev->property_set("verbose", 1) or die "can't set VERBOSE";
1102         $dev->property_set("ndmp_username", "ndmp") or die "can't set username";
1103         $dev->property_set("ndmp_password", "ndmp") or die "can't set password";
1104
1105         return $dev;
1106     };
1107
1108     my $hdr = Amanda::Header->new();
1109     $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
1110     $hdr->{'name'} = "installcheck";
1111     $hdr->{'disk'} = "/";
1112     $hdr->{'datestamp'} = "20080102030405";
1113     $hdr->{'program'} = "INSTALLCHECK";
1114
1115     for my $do_cancel (0, 'later', 'in_setup') {
1116         my $dev;
1117         my $xfer;
1118         my @messages;
1119
1120         # make a starting device
1121         $dev = $mkdevice->();
1122
1123         # and create the xfer
1124         my $src = Amanda::Xfer::Source::Random->new(32768*34-7, $RANDOM_SEED);
1125         # note we ask for slightly less than 15 blocks; the dest should round up
1126         my $dest = Amanda::Xfer::Dest::Taper::DirectTCP->new($dev, 32768*16-99);
1127         $xfer = Amanda::Xfer->new([ $src, $dest ]);
1128
1129         my $start_new_part; # forward declaration
1130         my $xmsg_cb = sub {
1131             my ($src, $msg, $xfer) = @_;
1132
1133             if ($msg->{'type'} == $XMSG_ERROR) {
1134                 # if this is an expected error, don't die
1135                 if ($do_cancel eq 'in_setup' and $msg->{'message'} =~ /operation not supported/) {
1136                     push @messages, "ERROR";
1137                 } else {
1138                     die $msg->{'elt'} . " failed: " . $msg->{'message'};
1139                 }
1140             } elsif ($msg->{'type'} == $XMSG_READY) {
1141                 push @messages, "READY";
1142
1143                 # get ourselves a new (albeit identical) device, just to prove that the connections
1144                 # are a little bit portable
1145                 $dev->finish();
1146                 $dev = $mkdevice->();
1147                 $dest->use_device($dev);
1148                 $dev->start($Amanda::Device::ACCESS_WRITE, "TESTCONF02", "20080102030406");
1149
1150                 $start_new_part->(1, 0); # start first part
1151             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
1152                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
1153                 if ($do_cancel and $msg->{'partnum'} == 2) {
1154                     $xfer->cancel();
1155                 } else {
1156                     $start_new_part->($msg->{'successful'}, $msg->{'eof'});
1157                 }
1158             } elsif ($msg->{'type'} == $XMSG_DONE) {
1159                 push @messages, "DONE";
1160                 Amanda::MainLoop::quit();
1161             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
1162                 push @messages, "CANCELLED";
1163             } else {
1164                 push @messages, "$msg";
1165             }
1166         };
1167
1168         # trigger an error in the xfer dest's setup method by putting the device
1169         # in an error state.  NDMP devices do not support append, so starting in
1170         # append mode should trigger the failure.
1171         if ($do_cancel eq 'in_setup') {
1172             if ($dev->start($Amanda::Device::ACCESS_APPEND, "MYLABEL", undef)) {
1173                 die "successfully started NDMP device in ACCESS_APPEND?!";
1174             }
1175         }
1176
1177         $xfer->start($xmsg_cb);
1178
1179         $start_new_part = sub {
1180             my ($successful, $eof) = @_;
1181
1182             die "this dest shouldn't have unsuccessful parts" unless $successful;
1183
1184             if (!$eof) {
1185                 $dest->start_part(0, $hdr);
1186             }
1187         };
1188
1189         Amanda::MainLoop::run();
1190
1191         $dev->finish();
1192
1193         if (!$do_cancel) {
1194             is_deeply([@messages],
1195                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'PART-3-OK', 'DONE' ],
1196                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages")
1197             or diag(Dumper([@messages]));
1198         } elsif ($do_cancel eq 'in_setup') {
1199             is_deeply([@messages],
1200                 [ 'ERROR', 'CANCELLED', 'DONE' ],
1201                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled during setup")
1202             or diag(Dumper([@messages]));
1203         } else {
1204             is_deeply([@messages],
1205                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'CANCELLED', 'DONE' ],
1206                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled in mid-xfer")
1207             or diag(Dumper([@messages]));
1208         }
1209     }
1210
1211     # Amanda::Xfer::Source::Recovery's directtcp functionality is not
1212     # tested here, as to do so would basically require re-implementing
1213     # Amanda::Recovery::Clerk; the xfer source is adequately tested by
1214     # the Amanda::Recovery::Clerk tests.
1215
1216     $ndmp->cleanup();
1217 }
1218
1219 # directtcp stuff
1220
1221 {
1222     my $RANDOM_SEED = 0x13131313; # 13 is bad luck, right?
1223
1224     # we want this to look like:
1225     # A: [ Random -> DirectTCPConnect ]
1226     #        --dtcp-->
1227     # B: [ DirectTCPListen -> filter -> DirectTCPListen ]
1228     #        --dtcp-->
1229     # C: [ DirectTCPConnect -> filter -> Null ]
1230     #
1231     # this tests both XFER_MECH_DIRECTTCP_CONNECT and
1232     # XFER_MECH_DIRECTTCP_LISTEN, as well as some of the glue
1233     # used to attach those to filters.
1234     #
1235     # that means we need to start transfer B, since it has all of the
1236     # addresses, before creating A or C.
1237
1238     my $done = { };
1239     my $handle_msg = sub {
1240         my ($letter, $src, $msg, $xfer) = @_;
1241         if ($msg->{type} == $XMSG_ERROR) {
1242             die $msg->{elt} . " failed: " . $msg->{message};
1243         } elsif ($msg->{'type'} == $XMSG_DONE) {
1244             $done->{$letter} = 1;
1245         }
1246         if ($done->{'A'} and $done->{'B'} and $done->{'C'}) {
1247             Amanda::MainLoop::quit();
1248         }
1249     };
1250
1251     my %cbs;
1252     for my $letter ('A', 'B', 'C') {
1253         $cbs{$letter} = sub { $handle_msg->($letter, @_); };
1254     }
1255
1256     my $src_listen = Amanda::Xfer::Source::DirectTCPListen->new();
1257     my $dst_listen = Amanda::Xfer::Dest::DirectTCPListen->new();
1258     my $xferB = Amanda::Xfer->new([
1259         $src_listen,
1260         Amanda::Xfer::Filter::Xor->new(0x13),
1261         $dst_listen
1262     ]);
1263
1264     $xferB->start($cbs{'B'});
1265
1266     my $xferA = Amanda::Xfer->new([
1267         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
1268         Amanda::Xfer::Dest::DirectTCPConnect->new($src_listen->get_addrs())
1269     ]);
1270
1271     $xferA->start($cbs{'A'});
1272
1273     my $xferC = Amanda::Xfer->new([
1274         Amanda::Xfer::Source::DirectTCPConnect->new($dst_listen->get_addrs()),
1275         Amanda::Xfer::Filter::Xor->new(0x13),
1276         Amanda::Xfer::Dest::Null->new($RANDOM_SEED)
1277     ]);
1278
1279     $xferC->start($cbs{'C'});
1280
1281     # let the already-started transfers go out of scope before they 
1282     # complete, as a memory management test..
1283     Amanda::MainLoop::run();
1284     pass("Three xfers interlinked via DirectTCP complete successfully");
1285 }
1286
1287 # try cancelling a DirectTCP xfer while it's waiting in accept()
1288 {
1289     my $xfer_src = Amanda::Xfer::Source::DirectTCPListen->new();
1290     my $xfer_dst = Amanda::Xfer::Dest::Null->new(0);
1291     my $xfer = Amanda::Xfer->new([ $xfer_src, $xfer_dst ]);
1292
1293     # start up the transfer, which starts a thread which will accept
1294     # soon after that.
1295     $xfer->start(sub {
1296         my ($src, $msg, $xfer) = @_;
1297         if ($msg->{'type'} == $XMSG_DONE) {
1298             Amanda::MainLoop::quit();
1299         }
1300     });
1301
1302     sleep(1);
1303
1304     # Now, ideally we'd wait until the accept() is running, maybe testing it
1305     # with a SYN or something like that.  This is not terribly critical,
1306     # because the element glue does not check for cancellation before it begins
1307     # accepting.
1308     $xfer->cancel();
1309
1310     Amanda::MainLoop::run();
1311     pass("A DirectTCP accept operation can be cancelled");
1312 }
1313
1314 # test element comparison
1315 {
1316     my $a = Amanda::Xfer::Filter::Xor->new(0);
1317     my $b = Amanda::Xfer::Filter::Xor->new(1);
1318     ok($a == $a, "elements compare equal to themselves");
1319     ok(!($a == $b), ".. and not to other elements");
1320     ok(!($a != $a), "elements do not compare != to themselves");
1321     ok($a != $b, ".. but are != to other elements");
1322 }