Imported Upstream version 3.3.3
[debian/amanda] / installcheck / Amanda_Xfer.pl
1 # Copyright (c) 2008-2012 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or
4 # modify it under the terms of the GNU General Public License
5 # as published by the Free Software Foundation; either version 2
6 # of the License, or (at your option) any later version.
7 #
8 # This program is distributed in the hope that it will be useful, but
9 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
10 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
11 # for more details.
12 #
13 # You should have received a copy of the GNU General Public License along
14 # with this program; if not, write to the Free Software Foundation, Inc.,
15 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
16 #
17 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
18 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
19
20 use Test::More tests => 46;
21 use File::Path;
22 use Data::Dumper;
23 use strict;
24 use warnings;
25
26 use lib "@amperldir@";
27 use Installcheck;
28 use Installcheck::Run;
29 use Installcheck::Mock;
30 use Amanda::Xfer qw( :constants );
31 use Amanda::Header;
32 use Amanda::Debug;
33 use Amanda::MainLoop;
34 use Amanda::Paths;
35 use Amanda::Config;
36 use Amanda::Constants;
37
38 # get Amanda::Device only when we're building for server
39 BEGIN {
40     use Amanda::Util;
41     if (Amanda::Util::built_with_component("server")) {
42         eval "use Amanda::Device;";
43         die $@ if $@;
44     }
45 }
46
47 # set up debugging so debug output doesn't interfere with test results
48 Amanda::Debug::dbopen("installcheck");
49 Installcheck::log_test_output();
50
51 # and disable Debug's die() and warn() overrides
52 Amanda::Debug::disable_die_override();
53
54 {
55     my $RANDOM_SEED = 0xD00D;
56
57     my $xfer = Amanda::Xfer->new([
58         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
59         Amanda::Xfer::Filter::Xor->new(0), # key of 0 -> no change, so random seeds match
60         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
61     ]);
62
63     pass("Creating a transfer doesn't crash"); # hey, it's a start..
64
65     my $got_msg = "(not received)";
66     $xfer->start(sub {
67         my ($src, $msg, $xfer) = @_;
68         if ($msg->{type} == $XMSG_ERROR) {
69             die $msg->{elt} . " failed: " . $msg->{message};
70         }
71         if ($msg->{type} == $XMSG_INFO) {
72             $got_msg = $msg->{message};
73         } elsif ($msg->{'type'} == $XMSG_DONE) {
74             Amanda::MainLoop::quit();
75         }
76     });
77     Amanda::MainLoop::run();
78     pass("A simple transfer runs to completion");
79     is($got_msg, "Is this thing on?",
80         "XMSG_INFO from Amanda::Xfer::Dest::Null has correct message");
81 }
82
83 {
84     my $RANDOM_SEED = 0xDEADBEEF;
85
86     my $xfer1 = Amanda::Xfer->new([
87         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
88         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
89     ]);
90     my $xfer2 = Amanda::Xfer->new([
91         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
92         Amanda::Xfer::Filter::Xor->new(0xf0),
93         Amanda::Xfer::Filter::Xor->new(0xf0),
94         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
95     ]);
96
97     my $cb = sub {
98         my ($src, $msg, $xfer) = @_;
99         if ($msg->{type} == $XMSG_ERROR) {
100             die $msg->{elt} . " failed: " . $msg->{message};
101         } elsif ($msg->{'type'} == $XMSG_DONE) {
102             if  ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
103              and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
104                 Amanda::MainLoop::quit();
105             }
106         }
107     };
108
109     $xfer1->start($cb);
110     $xfer2->start($cb);
111 }
112 # let the already-started transfers go out of scope before they 
113 # complete, as a memory management test..
114 Amanda::MainLoop::run();
115 pass("Two simultaneous transfers run to completion");
116
117 {
118     my $RANDOM_SEED = 0xD0DEEDAA;
119     my @elts;
120
121     # note that, because the Xor filter is flexible, assembling
122     # long pipelines can take an exponentially long time.  A 10-elt
123     # pipeline exercises the linking algorithm without wasting
124     # too many CPU cycles
125
126     push @elts, Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED);
127     for my $i (1 .. 4) {
128         push @elts, Amanda::Xfer::Filter::Xor->new($i);
129         push @elts, Amanda::Xfer::Filter::Xor->new($i);
130     }
131     push @elts, Amanda::Xfer::Dest::Null->new($RANDOM_SEED);
132     my $xfer = Amanda::Xfer->new(\@elts);
133
134     my $cb = sub {
135         my ($src, $msg, $xfer) = @_;
136         if ($msg->{type} == $XMSG_ERROR) {
137             die $msg->{elt} . " failed: " . $msg->{message};
138         } elsif ($msg->{'type'} == $XMSG_DONE) {
139             Amanda::MainLoop::quit();
140         }
141     };
142
143     $xfer->start($cb);
144
145     Amanda::MainLoop::run();
146     pass("One 10-element transfer runs to completion");
147 }
148
149
150 {
151     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
152     my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
153     my ($rfh, $wfh);
154
155     mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
156
157     # fill the file with some stuff
158     open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
159     for my $i (1 .. 100) { print $wfh "line $i\n"; }
160     close($wfh);
161
162     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");
163     open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
164
165     # now run a transfer out of it
166     my $xfer = Amanda::Xfer->new([
167         Amanda::Xfer::Source::Fd->new(fileno($rfh)),
168         Amanda::Xfer::Filter::Xor->new(0xde),
169         Amanda::Xfer::Filter::Xor->new(0xde),
170         Amanda::Xfer::Dest::Fd->new(fileno($wfh)),
171     ]);
172
173     my $cb = sub {
174         my ($src, $msg, $xfer) = @_;
175         if ($msg->{type} == $XMSG_ERROR) {
176             die $msg->{elt} . " failed: " . $msg->{message};
177         } elsif ($msg->{'type'} == $XMSG_DONE) {
178             Amanda::MainLoop::quit();
179         }
180     };
181
182     $xfer->start($cb);
183
184     Amanda::MainLoop::run();
185
186     close($wfh);
187     close($rfh);
188
189     # now verify the file contents are identical
190     open($rfh, "<", $read_filename);
191     my $src = do { local $/; <$rfh> };
192
193     open($rfh, "<", $write_filename);
194     my $dest = do { local $/; <$rfh> };
195
196     is($src, $dest, "Source::Fd and Dest::Fd read and write files");
197
198     unlink($read_filename);
199     unlink($write_filename);
200 }
201
202 {
203     my $RANDOM_SEED = 0x5EAF00D;
204
205     # build a transfer that will keep going forever
206     my $xfer = Amanda::Xfer->new([
207         Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
208         Amanda::Xfer::Filter::Xor->new(14),
209         Amanda::Xfer::Filter::Xor->new(14),
210         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
211     ]);
212
213     my $got_timeout = 0;
214     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
215         my ($src) = @_;
216         $got_timeout = 1;
217         $src->remove();
218         $xfer->cancel();
219     });
220     $xfer->start(sub {
221         my ($src, $msg, $xfer) = @_;
222         if ($msg->{type} == $XMSG_ERROR) {
223             die $msg->{elt} . " failed: " . $msg->{message};
224         } elsif ($msg->{'type'} == $XMSG_DONE) {
225             Amanda::MainLoop::quit();
226         }
227     });
228     Amanda::MainLoop::run();
229     ok($got_timeout, "A neverending transfer finishes after being cancelled");
230     # (note that this does not test all of the cancellation possibilities)
231 }
232
233 {
234     # build a transfer that will write to a read-only fd
235     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
236     my $rfh;
237
238     # create the file
239     open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
240
241     # open it for reading
242     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
243
244     my $xfer = Amanda::Xfer->new([
245         Amanda::Xfer::Source::Random->new(0, 1),
246         Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
247     ]);
248
249     my $got_error = 0;
250     $xfer->start(sub {
251         my ($src, $msg, $xfer) = @_;
252         if ($msg->{type} == $XMSG_ERROR) {
253             $got_error = 1;
254         } elsif ($msg->{'type'} == $XMSG_DONE) {
255             Amanda::MainLoop::quit();
256         }
257     });
258     Amanda::MainLoop::run();
259     ok($got_error, "A transfer with an error cancels itself after sending an error");
260
261     unlink($read_filename);
262 }
263
264 # test the Process filter
265 {
266     my $RANDOM_SEED = 0xD00D;
267
268     my $xfer = Amanda::Xfer->new([
269         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
270         Amanda::Xfer::Filter::Process->new(
271             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
272         Amanda::Xfer::Filter::Process->new(
273             [ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0),
274         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
275     ]);
276
277     $xfer->get_source()->set_callback(sub {
278         my ($src, $msg, $xfer) = @_;
279         if ($msg->{type} == $XMSG_ERROR) {
280             die $msg->{elt} . " failed: " . $msg->{message};
281         } elsif ($msg->{'type'} == $XMSG_DONE) {
282             $src->remove();
283             Amanda::MainLoop::quit();
284         }
285     });
286     $xfer->start();
287     Amanda::MainLoop::run();
288     pass("compress | uncompress gets back the original stream");
289 }
290
291 {
292     my $RANDOM_SEED = 0x5EAF00D;
293
294     # build a transfer that will keep going forever, using a source that
295     # cannot produce an EOF, so Filter::Process is forced to kill the
296     # compress process
297
298     open(my $zerofd, "<", "/dev/zero")
299         or die("could not open /dev/zero: $!");
300     my $xfer = Amanda::Xfer->new([
301         Amanda::Xfer::Source::Fd->new($zerofd),
302         Amanda::Xfer::Filter::Process->new(
303             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
304         Amanda::Xfer::Dest::Null->new(0),
305     ]);
306
307     my $got_timeout = 0;
308     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
309         my ($src) = @_;
310         $got_timeout = 1;
311         $src->remove();
312         $xfer->cancel();
313     });
314     $xfer->get_source()->set_callback(sub {
315         my ($src, $msg, $xfer) = @_;
316         if ($msg->{type} == $XMSG_ERROR) {
317             die $msg->{elt} . " failed: " . $msg->{message};
318         } elsif ($msg->{'type'} == $XMSG_DONE) {
319             $src->remove();
320             Amanda::MainLoop::quit();
321         }
322     });
323     $xfer->start();
324     Amanda::MainLoop::run();
325     ok($got_timeout, "Amanda::Xfer::Filter::Process can be cancelled");
326     # (note that this does not test all of the cancellation possibilities)
327 }
328
329 # Test Amanda::Xfer::Dest::Buffer
330 {
331     my $dest = Amanda::Xfer::Dest::Buffer->new(1025);
332     my $xfer = Amanda::Xfer->new([
333         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
334         $dest,
335     ]);
336
337     $xfer->get_source()->set_callback(sub {
338         my ($src, $msg, $xfer) = @_;
339         if ($msg->{type} == $XMSG_ERROR) {
340             die $msg->{elt} . " failed: " . $msg->{message};
341         } elsif ($msg->{'type'} == $XMSG_DONE) {
342             $src->remove();
343             Amanda::MainLoop::quit();
344         }
345     });
346     $xfer->start();
347     Amanda::MainLoop::run();
348
349     is($dest->get(), 'ABCDEFGH' x 128,
350         "buffer captures the right bytes");
351 }
352
353 # Test that Amanda::Xfer::Dest::Buffer terminates an xfer early
354 {
355     my $dest = Amanda::Xfer::Dest::Buffer->new(100);
356     my $xfer = Amanda::Xfer->new([
357         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
358         $dest,
359     ]);
360
361     my $got_err = 0;
362     $xfer->get_source()->set_callback(sub {
363         my ($src, $msg, $xfer) = @_;
364         if ($msg->{type} == $XMSG_ERROR) {
365             $got_err = 1;
366         } elsif ($msg->{'type'} == $XMSG_DONE) {
367             $src->remove();
368             Amanda::MainLoop::quit();
369         }
370     });
371     $xfer->start();
372     Amanda::MainLoop::run();
373
374     ok($got_err, "buffer stops the xfer if it doesn't have space");
375 }
376
377 SKIP: {
378     skip "not built with server", 25 unless Amanda::Util::built_with_component("server");
379
380     my $disk_cache_dir = "$Installcheck::TMP";
381     my $RANDOM_SEED = 0xFACADE;
382
383     # exercise device source and destination
384     {
385         my $RANDOM_SEED = 0xFACADE;
386         my $xfer;
387
388         my $quit_cb = make_cb(quit_cb => sub {
389             my ($src, $msg, $xfer) = @_;
390             if ($msg->{'type'} == $XMSG_ERROR) {
391                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
392             } elsif ($msg->{'type'} == $XMSG_DONE) {
393                 Amanda::MainLoop::quit();
394             }
395         });
396
397         # set up vtapes
398         my $testconf = Installcheck::Run::setup();
399         $testconf->write();
400
401         # set up a device for slot 1
402         my $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape(1));
403         die("Could not open VFS device: " . $device->error())
404             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
405
406         # write to it
407         my $hdr = Amanda::Header->new();
408         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
409         $hdr->{'name'} = "installcheck";
410         $hdr->{'disk'} = "/";
411         $hdr->{'datestamp'} = "20080102030405";
412         $hdr->{'program'} = "INSTALLCHECK";
413
414         $device->finish();
415         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
416         $device->start_file($hdr);
417
418         $xfer = Amanda::Xfer->new([
419             Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
420             Amanda::Xfer::Dest::Device->new($device, 0),
421         ]);
422
423         $xfer->start($quit_cb);
424
425         Amanda::MainLoop::run();
426         pass("write to a device (completed succesfully; data may not be correct)");
427
428         # finish up the file and device
429         ok(!$device->in_file(), "not in_file");
430         ok($device->finish(), "finish");
431
432         # now turn around and read from it
433         $device->start($Amanda::Device::ACCESS_READ, undef, undef);
434         $device->seek_file(1);
435
436         $xfer = Amanda::Xfer->new([
437             Amanda::Xfer::Source::Device->new($device),
438             Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
439         ]);
440
441         $xfer->start($quit_cb);
442
443         Amanda::MainLoop::run();
444         pass("read from a device succeeded, too, and data was correct");
445     }
446
447     # extra params:
448     #   cancel_after_partnum - after this partnum is completed, cancel the xfer
449     #   do_not_retry - do not retry a failed part - cancel the xfer instead
450     sub test_taper_dest {
451         my ($src, $dest_sub, $expected_messages, $msg_prefix, %params) = @_;
452         my $xfer;
453         my $device;
454         my $vtape_num = 1;
455         my @messages;
456
457         # set up vtapes
458         my $testconf = Installcheck::Run::setup();
459         $testconf->write();
460
461         my $hdr = Amanda::Header->new();
462         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
463         $hdr->{'name'} = "installcheck";
464         $hdr->{'disk'} = "/";
465         $hdr->{'datestamp'} = "20080102030405";
466         $hdr->{'program'} = "INSTALLCHECK";
467
468         # set up a device for the taper dest
469         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
470         die("Could not open VFS device: " . $device->error())
471             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
472         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
473         $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
474         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
475         my $dest = $dest_sub->($device);
476
477         # and create the xfer
478         $xfer = Amanda::Xfer->new([ $src, $dest ]);
479
480         my $start_new_part = sub {
481             my ($successful, $eof, $partnum, $eom) = @_;
482
483             if (exists $params{'cancel_after_partnum'}
484                     and $params{'cancel_after_partnum'} == $partnum) {
485                 push @messages, "CANCEL";
486                 $xfer->cancel();
487                 return;
488             }
489
490             if (!$device || $eom) {
491                 # set up a device and start writing a part to it
492                 $device->finish() if $device;
493                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
494                 die("Could not open VFS device: " . $device->error())
495                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
496                 $dest->use_device($device);
497                 $device->property_set("LEOM", $params{'disable_leom'}? 0 : 1);
498                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
499                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
500             }
501
502             # bail out if we shouldn't retry this part
503             if (!$successful and $params{'do_not_retry'}) {
504                 push @messages, "NOT-RETRYING";
505                 $xfer->cancel();
506                 return;
507             }
508
509             if (!$eof) {
510                 if ($successful) {
511                     $dest->start_part(0, $hdr);
512                 } else {
513                     $dest->start_part(1, $hdr);
514                 }
515             }
516         };
517
518         $xfer->start(sub {
519             my ($src, $msg, $xfer) = @_;
520
521             if ($msg->{'type'} == $XMSG_ERROR) {
522                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
523             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
524                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
525                 push @messages, "EOM" if $msg->{'eom'};
526                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'}, $msg->{'eom'});
527             } elsif ($msg->{'type'} == $XMSG_DONE) {
528                 push @messages, "DONE";
529                 Amanda::MainLoop::quit();
530             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
531                 push @messages, "CANCELLED";
532             } else {
533                 push @messages, "$msg";
534             }
535         });
536
537         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
538         Amanda::MainLoop::run();
539
540         is_deeply([@messages],
541             $expected_messages,
542             "$msg_prefix: element produces the correct series of messages")
543         or diag(Dumper([@messages]));
544     }
545
546     sub run_recovery_source {
547         my ($dest, $files, $expected_messages, $finished_cb) = @_;
548         my $device;
549         my @filenums;
550         my @messages;
551         my $xfer;
552         my $dev;
553         my $src;
554
555         my $steps = define_steps
556             cb_ref => \$finished_cb;
557
558         step setup => sub {
559             # we need a device up front, so sneak a peek into @$files
560             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($files->[0]));
561             $src = Amanda::Xfer::Source::Recovery->new($dev);
562             $xfer = Amanda::Xfer->new([ $src, $dest ]);
563
564             $xfer->start($steps->{'got_xmsg'});
565             # got_xmsg will call got_ready when the element is ready
566         };
567
568         step got_ready => sub {
569             $steps->{'load_slot'}->();
570         };
571
572         step load_slot => sub {
573             if (!@$files) {
574                 return $src->start_part(undef);
575                 # (will trigger an XMSG_DONE; see below)
576             }
577
578             my $slot = shift @$files;
579             @filenums = @{ shift @$files };
580
581             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($slot));
582             if ($dev->status != $Amanda::Device::DEVICE_STATUS_SUCCESS) {
583                 die $dev->error_or_status();
584             }
585
586             $src->use_device($dev);
587
588             if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
589                 die $dev->error_or_status();
590             }
591
592             $steps->{'seek_file'}->();
593         };
594
595         step seek_file => sub {
596             if (!@filenums) {
597                 return $steps->{'load_slot'}->();
598             }
599
600             my $hdr = $dev->seek_file(shift @filenums);
601             if (!$hdr) {
602                 die $dev->error_or_status();
603             }
604
605             push @messages, "PART";
606
607             $src->start_part($dev);
608         };
609
610         step got_xmsg => sub {
611             my ($src, $msg, $xfer) = @_;
612
613             if ($msg->{'type'} == $XMSG_ERROR) {
614                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
615             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
616                 push @messages, "KB-" . ($msg->{'size'}/1024);
617                 $steps->{'seek_file'}->();
618             } elsif ($msg->{'type'} == $XMSG_DONE) {
619                 push @messages, "DONE";
620                 $steps->{'quit'}->();
621             } elsif ($msg->{'type'} == $XMSG_READY) {
622                 push @messages, "READY";
623                 $steps->{'got_ready'}->();
624             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
625                 push @messages, "CANCELLED";
626             }
627         };
628
629         step quit => sub {
630             is_deeply([@messages],
631                 $expected_messages,
632                 "files read back and verified successfully with Amanda::Xfer::Recovery::Source")
633             or diag(Dumper([@messages]));
634
635             $finished_cb->();
636         };
637     }
638
639     sub test_recovery_source {
640         run_recovery_source(@_, \&Amanda::MainLoop::quit);
641         Amanda::MainLoop::run();
642     }
643
644     my $holding_base = "$Installcheck::TMP/source-holding";
645     my $holding_file;
646     # create a sequence of holding chunks, each 2MB.
647     sub make_holding_files {
648         my ($nchunks) = @_;
649         my $block = 'a' x 32768;
650
651         rmtree($holding_base);
652         mkpath($holding_base);
653         for (my $i = 0; $i < $nchunks; $i++) {
654             my $filename = "$holding_base/file$i";
655             open(my $fh, ">", "$filename");
656
657             my $hdr = Amanda::Header->new();
658             $hdr->{'type'} = ($i == 0)?
659                 $Amanda::Header::F_DUMPFILE : $Amanda::Header::F_CONT_DUMPFILE;
660             $hdr->{'datestamp'} = "20070102030405";
661             $hdr->{'dumplevel'} = 0;
662             $hdr->{'compressed'} = 1;
663             $hdr->{'name'} = "localhost";
664             $hdr->{'disk'} = "/home";
665             $hdr->{'program'} = "INSTALLCHECK";
666             if ($i != $nchunks-1) {
667                 $hdr->{'cont_filename'} = "$holding_base/file" . ($i+1);
668             }
669
670             print $fh $hdr->to_string(32768,32768);
671
672             for (my $b = 0; $b < 64; $b++) {
673                 print $fh $block;
674             }
675             close($fh);
676         }
677
678         return "$holding_base/file0";
679     }
680
681     # first, test the simpler Splitter class
682     test_taper_dest(
683         Amanda::Xfer::Source::Random->new(1024*1951, $RANDOM_SEED),
684         sub {
685             my ($first_dev) = @_;
686             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
687                                                      520*1024, 0);
688         },
689         [ "PART-1-OK", "PART-2-OK", "PART-3-OK", "PART-4-OK",
690           "DONE" ],
691         "Amanda::Xfer::Dest::Taper::Splitter - simple splitting");
692     test_recovery_source(
693         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
694         [ 1 => [ 1, 2, 3, 4 ], ],
695         [
696           'READY',
697           'PART',
698           'KB-544',
699           'PART',
700           'KB-544',
701           'PART',
702           'KB-544',
703           'PART',
704           'KB-319',
705           'DONE'
706         ]);
707
708     test_taper_dest(
709         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
710         sub {
711             my ($first_dev) = @_;
712             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
713                                                      1024*1024, 0);
714         },
715         [ "PART-1-OK", "PART-2-OK", "PART-3-OK", "EOM",
716           "PART-4-OK",
717           "DONE" ],
718         "Amanda::Xfer::Dest::Taper::Splitter - splitting and spanning with LEOM");
719     test_recovery_source(
720         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
721         [ 1 => [ 1, 2, 3 ], 2 => [ 1, ], ],
722         [
723           'READY',
724           'PART',
725           'KB-1024',
726           'PART',
727           'KB-1024',
728           'PART',
729           'KB-288',
730           'PART',
731           'KB-838',
732           'DONE'
733         ]);
734
735     test_taper_dest(
736         Amanda::Xfer::Source::Random->new(1024*1024*1.5, $RANDOM_SEED),
737         sub {
738             my ($first_dev) = @_;
739             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
740                                                      0, 0);
741         },
742         [ "PART-1-OK",
743           "DONE" ],
744         "Amanda::Xfer::Dest::Taper::Splitter - no splitting");
745     test_recovery_source(
746         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
747         [ 1 => [ 1, ], ],
748         [
749           'READY',
750           'PART',
751           'KB-1536',
752           'DONE'
753         ]);
754
755     test_taper_dest(
756         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
757         sub {
758             my ($first_dev) = @_;
759             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
760                                                      2368*1024, 0);
761         },
762         [ "PART-1-OK", "PART-2-OK", "EOM",
763           "PART-2-OK",
764           "DONE" ],
765         "Amanda::Xfer::Dest::Taper::Splitter - LEOM hits in file 2 header");
766     test_recovery_source(
767         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
768         [ 1 => [ 1, 2 ], 2 => [ 1, ], ],
769         [
770           'READY',
771           'PART',
772           'KB-2368',
773           'PART',
774           'KB-0', # this wouldn't be in the catalog, but it's on the vtape
775           'PART',
776           'KB-806',
777           'DONE'
778         ]);
779
780     test_taper_dest(
781         Amanda::Xfer::Source::Random->new(1024*1024*3.1, $RANDOM_SEED),
782         sub {
783             my ($first_dev) = @_;
784             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
785                                                      2368*1024, 0);
786         },
787         [ "PART-1-OK", "PART-2-FAILED", "EOM",
788           "NOT-RETRYING", "CANCELLED", "DONE" ],
789         "Amanda::Xfer::Dest::Taper::Splitter - LEOM fails, PEOM => failure",
790         disable_leom => 1, do_not_retry => 1);
791
792     # run A::X::Dest::Taper::Cacher test in each of a few different cache permutations
793     test_taper_dest(
794         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
795         sub {
796             my ($first_dev) = @_;
797             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
798                                                      1024*1024, 1, undef),
799         },
800         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
801           "PART-3-OK", "PART-4-OK", "PART-5-OK",
802           "DONE" ],
803         "Amanda::Xfer::Dest::Taper::Cacher - mem cache");
804     test_recovery_source(
805         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
806         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
807         [
808           'READY',
809           'PART',
810           'KB-1024',
811           'PART',
812           'KB-1024',
813           'PART',
814           'KB-1024',
815           'PART',
816           'KB-1024',
817           'PART',
818           'KB-102',
819           'DONE'
820         ]);
821
822     test_taper_dest(
823         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
824         sub {
825             my ($first_dev) = @_;
826             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
827                                               1024*1024, 0, $disk_cache_dir),
828         },
829         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
830           "PART-3-OK", "PART-4-OK", "PART-5-OK",
831           "DONE" ],
832         "Amanda::Xfer::Dest::Taper::Cacher - disk cache");
833     test_recovery_source(
834         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
835         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
836         [
837           'READY',
838           'PART',
839           'KB-1024',
840           'PART',
841           'KB-1024',
842           'PART',
843           'KB-1024',
844           'PART',
845           'KB-1024',
846           'PART',
847           'KB-102',
848           'DONE'
849         ]);
850
851     test_taper_dest(
852         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
853         sub {
854             my ($first_dev) = @_;
855             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
856                                                     1024*1024, 0, undef),
857         },
858         [ "PART-1-OK", "PART-2-OK", "PART-3-OK",
859           "DONE" ],
860         "Amanda::Xfer::Dest::Taper::Cacher - no cache (no failed parts; exact multiple of part size)");
861     test_recovery_source(
862         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
863         [ 1 => [ 1, 2, 3 ], ],
864         [
865           'READY',
866           'PART',
867           'KB-1024',
868           'PART',
869           'KB-1024',
870           'PART',
871           'KB-0',
872           'DONE'
873         ]);
874
875     test_taper_dest(
876         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
877         sub {
878             my ($first_dev) = @_;
879             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
880         },
881         [ "PART-1-OK", "DONE" ],
882         "Amanda::Xfer::Dest::Taper::Cacher - no splitting (fits on volume)");
883     test_recovery_source(
884         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
885         [ 1 => [ 1 ], ],
886         [
887           'READY',
888           'PART',
889           'KB-2048',
890           'DONE'
891         ]);
892
893     test_taper_dest(
894         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
895         sub {
896             my ($first_dev) = @_;
897             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024, 0, 0, undef),
898         },
899         [ "PART-1-FAILED", "EOM",
900           "NOT-RETRYING", "CANCELLED", "DONE" ],
901         "Amanda::Xfer::Dest::Taper::Cacher - no splitting (doesn't fit on volume -> fails)",
902         do_not_retry => 1);
903
904     test_taper_dest(
905         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
906         sub {
907             my ($first_dev) = @_;
908             Amanda::Xfer::Dest::Taper::Cacher->new($first_dev, 128*1024,
909                                             1024*1024, 0, $disk_cache_dir),
910         },
911         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
912           "PART-3-OK", "PART-4-OK", "CANCEL",
913           "CANCELLED", "DONE" ],
914         "Amanda::Xfer::Dest::Taper::Cacher - cancellation after success",
915         cancel_after_partnum => 4);
916
917     # set up a few holding chunks and read from those
918
919     $holding_file = make_holding_files(3);
920
921     test_taper_dest(
922         Amanda::Xfer::Source::Holding->new($holding_file),
923         sub {
924             my ($first_dev) = @_;
925             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
926                                             1024*1024, 1);
927         },
928         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED", "EOM",
929           "PART-3-OK", "PART-4-OK", "PART-5-FAILED", "EOM",
930           "PART-5-OK", "PART-6-OK", "PART-7-OK",
931           "DONE" ],
932         "Amanda::Xfer::Dest::Taper::Splitter - Amanda::Xfer::Source::Holding "
933         . "acts as a source and supplies cache_inform",
934         disable_leom => 1);
935
936     ##
937     # test the cache_inform method
938
939     sub test_taper_dest_splitter_cache_inform {
940         my %params = @_;
941         my $xfer;
942         my $device;
943         my $fh;
944         my $part_size = 1024*1024;
945         my $file_size = $part_size * 4 + 100 * 1024;
946         my $cache_file = "$Installcheck::TMP/cache_file";
947         my $vtape_num = 1;
948
949         # set up our "cache", cleverly using an Amanda::Xfer::Dest::Fd
950         open($fh, ">", "$cache_file") or die("Could not open '$cache_file' for writing");
951         $xfer = Amanda::Xfer->new([
952             Amanda::Xfer::Source::Random->new($file_size, $RANDOM_SEED),
953             Amanda::Xfer::Dest::Fd->new(fileno($fh)),
954         ]);
955
956         $xfer->start(sub {
957             my ($src, $msg, $xfer) = @_;
958             if ($msg->{'type'} == $XMSG_ERROR) {
959                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
960             } elsif ($msg->{'type'} == $XMSG_DONE) {
961                 Amanda::MainLoop::quit();
962             }
963         });
964         Amanda::MainLoop::run();
965         close($fh);
966
967         # create a list of holding chuunks, some slab-aligned, some part-aligned,
968         # some not
969         my @holding_chunks;
970         my $offset = 0;
971         my $do_chunk = sub {
972             my ($break) = @_;
973             die unless $break > $offset;
974             push @holding_chunks, [ $cache_file, $offset, $break - $offset ];
975             $offset = $break;
976         };
977         $do_chunk->(277);
978         $do_chunk->($part_size);
979         $do_chunk->($part_size+128*1024);
980         $do_chunk->($part_size*3);
981         $do_chunk->($part_size*3+1024);
982         $do_chunk->($part_size*3+1024*2);
983         $do_chunk->($part_size*3+1024*3);
984         $do_chunk->($part_size*4);
985         $do_chunk->($part_size*4 + 77);
986         $do_chunk->($file_size - 1);
987         $do_chunk->($file_size);
988
989         # set up vtapes
990         my $testconf = Installcheck::Run::setup();
991         $testconf->write();
992
993         my $hdr = Amanda::Header->new();
994         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
995         $hdr->{'name'} = "installcheck";
996         $hdr->{'disk'} = "/";
997         $hdr->{'datestamp'} = "20080102030405";
998         $hdr->{'program'} = "INSTALLCHECK";
999
1000         # set up the cache file
1001         open($fh, "<", "$cache_file") or die("Could not open '$cache_file' for reading");
1002
1003         # set up a device for writing
1004         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
1005         die("Could not open VFS device: " . $device->error())
1006             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
1007         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
1008         $device->property_set("LEOM", 0);
1009         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
1010
1011         my $dest = Amanda::Xfer::Dest::Taper::Splitter->new($device, 128*1024,
1012                                                     1024*1024, 1);
1013         $xfer = Amanda::Xfer->new([
1014             Amanda::Xfer::Source::Fd->new(fileno($fh)),
1015             $dest,
1016         ]);
1017
1018         my $start_new_part = sub {
1019             my ($successful, $eof, $last_partnum) = @_;
1020
1021             if (!$device || !$successful) {
1022                 # set up a device and start writing a part to it
1023                 $device->finish() if $device;
1024                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
1025                 die("Could not open VFS device: " . $device->error())
1026                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
1027                 $dest->use_device($device);
1028                 $device->property_set("LEOM", 0);
1029                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
1030                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
1031             }
1032
1033             # feed enough chunks to cache_inform
1034             my $upto = ($last_partnum+2) * $part_size;
1035             while (@holding_chunks and $holding_chunks[0]->[1] < $upto) {
1036                 my ($filename, $offset, $length) = @{shift @holding_chunks};
1037                 $dest->cache_inform($filename, $offset, $length);
1038             }
1039
1040             if (!$eof) {
1041                 if ($successful) {
1042                     $dest->start_part(0, $hdr);
1043                 } else {
1044                     $dest->start_part(1, $hdr);
1045                 }
1046             }
1047         };
1048
1049         my @messages;
1050         $xfer->start(sub {
1051             my ($src, $msg, $xfer) = @_;
1052
1053             if ($msg->{'type'} == $XMSG_ERROR) {
1054                 push @messages, "ERROR: $msg->{message}";
1055             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
1056                 push @messages, "PART-" . ($msg->{'successful'}? "OK" : "FAILED");
1057                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
1058             } elsif ($msg->{'type'} == $XMSG_DONE) {
1059                 push @messages, "DONE";
1060                 Amanda::MainLoop::quit();
1061             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
1062                 push @messages, "CANCELLED";
1063             } else {
1064                 push @messages, $msg->{'type'};
1065             }
1066         });
1067
1068         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
1069         Amanda::MainLoop::run();
1070
1071         unlink($cache_file);
1072
1073         return @messages;
1074     }
1075
1076     is_deeply([ test_taper_dest_splitter_cache_inform() ],
1077         [ "PART-OK", "PART-OK", "PART-FAILED",
1078           "PART-OK", "PART-OK", "PART-OK",
1079           "DONE" ],
1080         "cache_inform: splitter element produces the correct series of messages");
1081
1082     rmtree($holding_base);
1083 }
1084
1085 # test Amanda::Xfer::Dest::Taper::DirectTCP; do it twice, once with a cancellation
1086 SKIP: {
1087     skip "not built with ndmp and server", 3 unless
1088         Amanda::Util::built_with_component("ndmp") and Amanda::Util::built_with_component("server");
1089
1090     my $RANDOM_SEED = 0xFACADE;
1091
1092     # make XDT output fairly verbose
1093     $Amanda::Config::debug_taper = 2;
1094
1095     my $ndmp = Installcheck::Mock::NdmpServer->new();
1096     my $ndmp_port = $ndmp->{'port'};
1097     my $drive = $ndmp->{'drive'};
1098
1099     my $mkdevice = sub {
1100         my $dev = Amanda::Device->new("ndmp:127.0.0.1:$ndmp_port\@$drive");
1101         die "can't create device" unless $dev->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS;
1102         $dev->property_set("verbose", 1) or die "can't set VERBOSE";
1103         $dev->property_set("ndmp_username", "ndmp") or die "can't set username";
1104         $dev->property_set("ndmp_password", "ndmp") or die "can't set password";
1105
1106         return $dev;
1107     };
1108
1109     my $hdr = Amanda::Header->new();
1110     $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
1111     $hdr->{'name'} = "installcheck";
1112     $hdr->{'disk'} = "/";
1113     $hdr->{'datestamp'} = "20080102030405";
1114     $hdr->{'program'} = "INSTALLCHECK";
1115
1116     for my $do_cancel (0, 'later', 'in_setup') {
1117         my $dev;
1118         my $xfer;
1119         my @messages;
1120
1121         # make a starting device
1122         $dev = $mkdevice->();
1123
1124         # and create the xfer
1125         my $src = Amanda::Xfer::Source::Random->new(32768*34-7, $RANDOM_SEED);
1126         # note we ask for slightly less than 15 blocks; the dest should round up
1127         my $dest = Amanda::Xfer::Dest::Taper::DirectTCP->new($dev, 32768*16-99);
1128         $xfer = Amanda::Xfer->new([ $src, $dest ]);
1129
1130         my $start_new_part; # forward declaration
1131         my $xmsg_cb = sub {
1132             my ($src, $msg, $xfer) = @_;
1133
1134             if ($msg->{'type'} == $XMSG_ERROR) {
1135                 # if this is an expected error, don't die
1136                 if ($do_cancel eq 'in_setup' and $msg->{'message'} =~ /operation not supported/) {
1137                     push @messages, "ERROR";
1138                 } else {
1139                     die $msg->{'elt'} . " failed: " . $msg->{'message'};
1140                 }
1141             } elsif ($msg->{'type'} == $XMSG_READY) {
1142                 push @messages, "READY";
1143
1144                 # get ourselves a new (albeit identical) device, just to prove that the connections
1145                 # are a little bit portable
1146                 $dev->finish();
1147                 $dev = $mkdevice->();
1148                 $dest->use_device($dev);
1149                 $dev->start($Amanda::Device::ACCESS_WRITE, "TESTCONF02", "20080102030406");
1150
1151                 $start_new_part->(1, 0); # start first part
1152             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
1153                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
1154                 if ($do_cancel and $msg->{'partnum'} == 2) {
1155                     $xfer->cancel();
1156                 } else {
1157                     $start_new_part->($msg->{'successful'}, $msg->{'eof'});
1158                 }
1159             } elsif ($msg->{'type'} == $XMSG_DONE) {
1160                 push @messages, "DONE";
1161                 Amanda::MainLoop::quit();
1162             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
1163                 push @messages, "CANCELLED";
1164             } else {
1165                 push @messages, "$msg";
1166             }
1167         };
1168
1169         # trigger an error in the xfer dest's setup method by putting the device
1170         # in an error state.  NDMP devices do not support append, so starting in
1171         # append mode should trigger the failure.
1172         if ($do_cancel eq 'in_setup') {
1173             if ($dev->start($Amanda::Device::ACCESS_APPEND, "MYLABEL", undef)) {
1174                 die "successfully started NDMP device in ACCESS_APPEND?!";
1175             }
1176         }
1177
1178         $xfer->start($xmsg_cb);
1179
1180         $start_new_part = sub {
1181             my ($successful, $eof) = @_;
1182
1183             die "this dest shouldn't have unsuccessful parts" unless $successful;
1184
1185             if (!$eof) {
1186                 $dest->start_part(0, $hdr);
1187             }
1188         };
1189
1190         Amanda::MainLoop::run();
1191
1192         $dev->finish();
1193
1194         if (!$do_cancel) {
1195             is_deeply([@messages],
1196                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'PART-3-OK', 'DONE' ],
1197                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages")
1198             or diag(Dumper([@messages]));
1199         } elsif ($do_cancel eq 'in_setup') {
1200             is_deeply([@messages],
1201                 [ 'ERROR', 'CANCELLED', 'DONE' ],
1202                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled during setup")
1203             or diag(Dumper([@messages]));
1204         } else {
1205             is_deeply([@messages],
1206                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'CANCELLED', 'DONE' ],
1207                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled in mid-xfer")
1208             or diag(Dumper([@messages]));
1209         }
1210     }
1211
1212     # Amanda::Xfer::Source::Recovery's directtcp functionality is not
1213     # tested here, as to do so would basically require re-implementing
1214     # Amanda::Recovery::Clerk; the xfer source is adequately tested by
1215     # the Amanda::Recovery::Clerk tests.
1216
1217     $ndmp->cleanup();
1218 }
1219
1220 # directtcp stuff
1221
1222 {
1223     my $RANDOM_SEED = 0x13131313; # 13 is bad luck, right?
1224
1225     # we want this to look like:
1226     # A: [ Random -> DirectTCPConnect ]
1227     #        --dtcp-->
1228     # B: [ DirectTCPListen -> filter -> DirectTCPListen ]
1229     #        --dtcp-->
1230     # C: [ DirectTCPConnect -> filter -> Null ]
1231     #
1232     # this tests both XFER_MECH_DIRECTTCP_CONNECT and
1233     # XFER_MECH_DIRECTTCP_LISTEN, as well as some of the glue
1234     # used to attach those to filters.
1235     #
1236     # that means we need to start transfer B, since it has all of the
1237     # addresses, before creating A or C.
1238
1239     my $done = { };
1240     my $handle_msg = sub {
1241         my ($letter, $src, $msg, $xfer) = @_;
1242         if ($msg->{type} == $XMSG_ERROR) {
1243             die $msg->{elt} . " failed: " . $msg->{message};
1244         } elsif ($msg->{'type'} == $XMSG_DONE) {
1245             $done->{$letter} = 1;
1246         }
1247         if ($done->{'A'} and $done->{'B'} and $done->{'C'}) {
1248             Amanda::MainLoop::quit();
1249         }
1250     };
1251
1252     my %cbs;
1253     for my $letter ('A', 'B', 'C') {
1254         $cbs{$letter} = sub { $handle_msg->($letter, @_); };
1255     }
1256
1257     my $src_listen = Amanda::Xfer::Source::DirectTCPListen->new();
1258     my $dst_listen = Amanda::Xfer::Dest::DirectTCPListen->new();
1259     my $xferB = Amanda::Xfer->new([
1260         $src_listen,
1261         Amanda::Xfer::Filter::Xor->new(0x13),
1262         $dst_listen
1263     ]);
1264
1265     $xferB->start($cbs{'B'});
1266
1267     my $xferA = Amanda::Xfer->new([
1268         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
1269         Amanda::Xfer::Dest::DirectTCPConnect->new($src_listen->get_addrs())
1270     ]);
1271
1272     $xferA->start($cbs{'A'});
1273
1274     my $xferC = Amanda::Xfer->new([
1275         Amanda::Xfer::Source::DirectTCPConnect->new($dst_listen->get_addrs()),
1276         Amanda::Xfer::Filter::Xor->new(0x13),
1277         Amanda::Xfer::Dest::Null->new($RANDOM_SEED)
1278     ]);
1279
1280     $xferC->start($cbs{'C'});
1281
1282     # let the already-started transfers go out of scope before they 
1283     # complete, as a memory management test..
1284     Amanda::MainLoop::run();
1285     pass("Three xfers interlinked via DirectTCP complete successfully");
1286 }
1287
1288 # try cancelling a DirectTCP xfer while it's waiting in accept()
1289 {
1290     my $xfer_src = Amanda::Xfer::Source::DirectTCPListen->new();
1291     my $xfer_dst = Amanda::Xfer::Dest::Null->new(0);
1292     my $xfer = Amanda::Xfer->new([ $xfer_src, $xfer_dst ]);
1293
1294     # start up the transfer, which starts a thread which will accept
1295     # soon after that.
1296     $xfer->start(sub {
1297         my ($src, $msg, $xfer) = @_;
1298         if ($msg->{'type'} == $XMSG_DONE) {
1299             Amanda::MainLoop::quit();
1300         }
1301     });
1302
1303     sleep(1);
1304
1305     # Now, ideally we'd wait until the accept() is running, maybe testing it
1306     # with a SYN or something like that.  This is not terribly critical,
1307     # because the element glue does not check for cancellation before it begins
1308     # accepting.
1309     $xfer->cancel();
1310
1311     Amanda::MainLoop::run();
1312     pass("A DirectTCP accept operation can be cancelled");
1313 }
1314
1315 # test element comparison
1316 {
1317     my $a = Amanda::Xfer::Filter::Xor->new(0);
1318     my $b = Amanda::Xfer::Filter::Xor->new(1);
1319     ok($a == $a, "elements compare equal to themselves");
1320     ok(!($a == $b), ".. and not to other elements");
1321     ok(!($a != $a), "elements do not compare != to themselves");
1322     ok($a != $b, ".. but are != to other elements");
1323 }