Imported Upstream version 3.1.0
[debian/amanda] / installcheck / Amanda_Xfer.pl
index 86a33a27ea4e35086e61d643ded1be492ac11bbc..da845b378b067e451d2ada4ef289d5e54bda9ddc 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2005-2008 Zmanda Inc.  All Rights Reserved.
+# Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
 #
 # This program is free software; you can redistribute it and/or modify it
 # under the terms of the GNU General Public License version 2 as published
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
 #
-# Contact information: Zmanda Inc, 465 S Mathlida Ave, Suite 300
+# Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
 
-use Test::More tests => 12;
+use Test::More tests => 37;
 use File::Path;
+use Data::Dumper;
 use strict;
 
 use lib "@amperldir@";
+use Installcheck;
 use Installcheck::Run;
+use Installcheck::Mock;
 use Amanda::Xfer qw( :constants );
-use Amanda::Device qw( :constants );
-use Amanda::Types;
+use Amanda::Header;
 use Amanda::Debug;
 use Amanda::MainLoop;
 use Amanda::Paths;
 use Amanda::Config;
+use Amanda::Constants;
+
+# get Amanda::Device only when we're building for server
+BEGIN {
+    use Amanda::Util;
+    if (Amanda::Util::built_with_component("server")) {
+       eval "use Amanda::Device;";
+       die $@ if $@;
+    }
+}
 
 # set up debugging so debug output doesn't interfere with test results
 Amanda::Debug::dbopen("installcheck");
+Installcheck::log_test_output();
 
 # and disable Debug's die() and warn() overrides
 Amanda::Debug::disable_die_override();
 
-# initialize configuration for the device API
-Amanda::Config::config_init(0, undef);
-
 {
     my $RANDOM_SEED = 0xD00D;
 
@@ -51,20 +61,17 @@ Amanda::Config::config_init(0, undef);
     pass("Creating a transfer doesn't crash"); # hey, it's a start..
 
     my $got_msg = "(not received)";
-    $xfer->get_source()->set_callback(sub {
+    $xfer->start(sub {
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
        }
        if ($msg->{type} == $XMSG_INFO) {
            $got_msg = $msg->{message};
-       }
-       elsif ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
-           $src->remove();
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            Amanda::MainLoop::quit();
        }
     });
-    $xfer->start();
     Amanda::MainLoop::run();
     pass("A simple transfer runs to completion");
     is($got_msg, "Is this thing on?",
@@ -89,20 +96,16 @@ Amanda::Config::config_init(0, undef);
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
-       }
-       if  ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
-        and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
-           $xfer1->get_source()->remove();
-           $xfer2->get_source()->remove();
-           Amanda::MainLoop::quit();
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
+           if  ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
+            and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
+               Amanda::MainLoop::quit();
+           }
        }
     };
 
-    $xfer1->get_source()->set_callback($cb);
-    $xfer2->get_source()->set_callback($cb);
-
-    $xfer1->start();
-    $xfer2->start();
+    $xfer1->start($cb);
+    $xfer2->start($cb);
 }
 # let the already-started transfers go out of scope before they 
 # complete, as a memory management test..
@@ -130,15 +133,12 @@ pass("Two simultaneous transfers run to completion");
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
-       }
-       if ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
-           $xfer->get_source()->remove();
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            Amanda::MainLoop::quit();
        }
     };
 
-    $xfer->get_source()->set_callback($cb);
-    $xfer->start();
+    $xfer->start($cb);
 
     Amanda::MainLoop::run();
     pass("One 10-element transfer runs to completion");
@@ -146,11 +146,11 @@ pass("Two simultaneous transfers run to completion");
 
 
 {
-    my $read_filename = "$Amanda::Paths::AMANDA_TMPDIR/xfer-junk-src.tmp";
-    my $write_filename = "$Amanda::Paths::AMANDA_TMPDIR/xfer-junk-dest.tmp";
+    my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
+    my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
     my ($rfh, $wfh);
 
-    mkdir($Amanda::Paths::AMANDA_TMPDIR) unless (-e $Amanda::Paths::AMANDA_TMPDIR);
+    mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
 
     # fill the file with some stuff
     open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
@@ -172,15 +172,12 @@ pass("Two simultaneous transfers run to completion");
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
-       }
-       if ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
-           $xfer->get_source()->remove();
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            Amanda::MainLoop::quit();
        }
     };
 
-    $xfer->get_source()->set_callback($cb);
-    $xfer->start();
+    $xfer->start($cb);
 
     Amanda::MainLoop::run();
 
@@ -200,82 +197,109 @@ pass("Two simultaneous transfers run to completion");
     unlink($write_filename);
 }
 
-# exercise device source and destination
 {
-    my $RANDOM_SEED = 0xFACADE;
-    my $xfer;
+    my $RANDOM_SEED = 0x5EAF00D;
 
-    my $quit_cb = sub {
+    # build a transfer that will keep going forever
+    my $xfer = Amanda::Xfer->new([
+       Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
+       Amanda::Xfer::Filter::Xor->new(14),
+       Amanda::Xfer::Filter::Xor->new(14),
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+    ]);
+
+    my $got_timeout = 0;
+    Amanda::MainLoop::timeout_source(200)->set_callback(sub {
+       my ($src) = @_;
+       $got_timeout = 1;
+       $src->remove();
+       $xfer->cancel();
+    });
+    $xfer->start(sub {
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
-       }
-       if ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
-           $xfer->get_source()->remove();
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            Amanda::MainLoop::quit();
        }
-    };
-
-    # set up vtapes
-    my $testconf = Installcheck::Run::setup();
-    $testconf->write();
+    });
+    Amanda::MainLoop::run();
+    ok($got_timeout, "A neverending transfer finishes after being cancelled");
+    # (note that this does not test all of the cancellation possibilities)
+}
 
-    # set up a device for slot 1
-    my $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape(1));
-    die("Could not open VFS device: " . $device->error())
-       unless ($device->status() == $DEVICE_STATUS_SUCCESS);
+{
+    # build a transfer that will write to a read-only fd
+    my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
+    my $rfh;
 
-    # write to it
-    my $hdr = Amanda::Types::dumpfile_t->new();
-    $hdr->{type} = $Amanda::Types::F_DUMPFILE;
-    $hdr->{name} = "installcheck";
-    $hdr->{disk} = "/";
-    $hdr->{datestamp} = "20080102030405";
+    # create the file
+    open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
 
-    $device->finish();
-    $device->start($ACCESS_WRITE, "TESTCONF01", "20080102030405");
-    $device->start_file($hdr);
+    # open it for reading
+    open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
 
-    $xfer = Amanda::Xfer->new([
-       Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
-       Amanda::Xfer::Dest::Device->new($device, $device->block_size() * 10),
+    my $xfer = Amanda::Xfer->new([
+       Amanda::Xfer::Source::Random->new(0, 1),
+       Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
     ]);
 
-    $xfer->get_source()->set_callback($quit_cb);
-    $xfer->start();
-
+    my $got_error = 0;
+    $xfer->start(sub {
+       my ($src, $msg, $xfer) = @_;
+       if ($msg->{type} == $XMSG_ERROR) {
+           $got_error = 1;
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
+           Amanda::MainLoop::quit();
+       }
+    });
     Amanda::MainLoop::run();
-    pass("write to a device (completed succesfully; data may not be correct)");
+    ok($got_error, "A transfer with an error cancels itself after sending an error");
 
-    # finish up the file and device
-    ok(!$device->in_file(), "not in_file");
-    ok($device->finish(), "finish");
+    unlink($read_filename);
+}
 
-    # now turn around and read from it
-    $device->start($ACCESS_READ, undef, undef);
-    $device->seek_file(1);
+# test the Process filter
+{
+    my $RANDOM_SEED = 0xD00D;
 
-    $xfer = Amanda::Xfer->new([
-       Amanda::Xfer::Source::Device->new($device),
+    my $xfer = Amanda::Xfer->new([
+       Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
+       Amanda::Xfer::Filter::Process->new(
+           [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
+       Amanda::Xfer::Filter::Process->new(
+           [ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0),
        Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
     ]);
 
-    $xfer->get_source()->set_callback($quit_cb);
+    $xfer->get_source()->set_callback(sub {
+       my ($src, $msg, $xfer) = @_;
+       if ($msg->{type} == $XMSG_ERROR) {
+           die $msg->{elt} . " failed: " . $msg->{message};
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
+           $src->remove();
+           Amanda::MainLoop::quit();
+       }
+    });
     $xfer->start();
-
     Amanda::MainLoop::run();
-    pass("read from a device succeeded, too, and data was correct");
+    pass("compress | uncompress gets back the original stream");
 }
 
 {
     my $RANDOM_SEED = 0x5EAF00D;
 
-    # build a transfer that will keep going forever
+    # build a transfer that will keep going forever, using a source that
+    # cannot produce an EOF, so Filter::Process is forced to kill the
+    # compress process
+
+    open(my $zerofd, "<", "/dev/zero")
+       or die("could not open /dev/zero: $!");
     my $xfer = Amanda::Xfer->new([
-       Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
-       Amanda::Xfer::Filter::Xor->new(14),
-       Amanda::Xfer::Filter::Xor->new(14),
-       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       Amanda::Xfer::Source::Fd->new($zerofd),
+       Amanda::Xfer::Filter::Process->new(
+           [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
+       Amanda::Xfer::Dest::Null->new(0),
     ]);
 
     my $got_timeout = 0;
@@ -289,46 +313,868 @@ pass("Two simultaneous transfers run to completion");
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
            die $msg->{elt} . " failed: " . $msg->{message};
-       }
-       if ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            $src->remove();
            Amanda::MainLoop::quit();
        }
     });
     $xfer->start();
     Amanda::MainLoop::run();
-    ok($got_timeout, "A neverending transfer finishes after being cancelled");
+    ok($got_timeout, "Amanda::Xfer::Filter::Process can be cancelled");
     # (note that this does not test all of the cancellation possibilities)
 }
 
+# Test Amanda::Xfer::Dest::Buffer
 {
-    # build a transfer that will write to a read-only fd
-    my $read_filename = "$Amanda::Paths::AMANDA_TMPDIR/xfer-junk-src.tmp";
-    my $rfh;
+    my $dest = Amanda::Xfer::Dest::Buffer->new(1025);
+    my $xfer = Amanda::Xfer->new([
+       Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
+       $dest,
+    ]);
 
-    # create the file
-    open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
+    $xfer->get_source()->set_callback(sub {
+       my ($src, $msg, $xfer) = @_;
+       if ($msg->{type} == $XMSG_ERROR) {
+           die $msg->{elt} . " failed: " . $msg->{message};
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
+           $src->remove();
+           Amanda::MainLoop::quit();
+       }
+    });
+    $xfer->start();
+    Amanda::MainLoop::run();
 
-    # open it for reading
-    open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
+    is($dest->get(), 'ABCDEFGH' x 128,
+       "buffer captures the right bytes");
+}
 
+# Test that Amanda::Xfer::Dest::Buffer terminates an xfer early
+{
+    my $dest = Amanda::Xfer::Dest::Buffer->new(100);
     my $xfer = Amanda::Xfer->new([
-       Amanda::Xfer::Source::Random->new(0, 1),
-       Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
+       Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
+       $dest,
     ]);
 
-    my $got_error = 0;
+    my $got_err = 0;
     $xfer->get_source()->set_callback(sub {
        my ($src, $msg, $xfer) = @_;
        if ($msg->{type} == $XMSG_ERROR) {
-           $got_error = 1;
-       }
-       if ($xfer->get_status() == $Amanda::Xfer::XFER_DONE) {
+           $got_err = 1;
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
            $src->remove();
            Amanda::MainLoop::quit();
        }
     });
     $xfer->start();
     Amanda::MainLoop::run();
-    ok($got_error, "A transfer with an error cancels itself after sending an error");
+
+    ok($got_err, "buffer stops the xfer if it doesn't have space");
+}
+
+SKIP: {
+    skip "not built with server", 17 unless Amanda::Util::built_with_component("server");
+
+    my $disk_cache_dir = "$Installcheck::TMP";
+    my $RANDOM_SEED = 0xFACADE;
+
+    # exercise device source and destination
+    {
+       my $RANDOM_SEED = 0xFACADE;
+       my $xfer;
+
+       my $quit_cb = make_cb(quit_cb => sub {
+           my ($src, $msg, $xfer) = @_;
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               die $msg->{'elt'} . " failed: " . $msg->{'message'};
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               Amanda::MainLoop::quit();
+           }
+       });
+
+       # set up vtapes
+       my $testconf = Installcheck::Run::setup();
+       $testconf->write();
+
+       # set up a device for slot 1
+       my $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape(1));
+       die("Could not open VFS device: " . $device->error())
+           unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
+
+       # write to it
+       my $hdr = Amanda::Header->new();
+       $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
+       $hdr->{'name'} = "installcheck";
+       $hdr->{'disk'} = "/";
+       $hdr->{'datestamp'} = "20080102030405";
+       $hdr->{'program'} = "INSTALLCHECK";
+
+       $device->finish();
+       $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
+       $device->start_file($hdr);
+
+       $xfer = Amanda::Xfer->new([
+           Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
+           Amanda::Xfer::Dest::Device->new($device, $device->block_size() * 10),
+       ]);
+
+       $xfer->start($quit_cb);
+
+       Amanda::MainLoop::run();
+       pass("write to a device (completed succesfully; data may not be correct)");
+
+       # finish up the file and device
+       ok(!$device->in_file(), "not in_file");
+       ok($device->finish(), "finish");
+
+       # now turn around and read from it
+       $device->start($Amanda::Device::ACCESS_READ, undef, undef);
+       $device->seek_file(1);
+
+       $xfer = Amanda::Xfer->new([
+           Amanda::Xfer::Source::Device->new($device),
+           Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       ]);
+
+       $xfer->start($quit_cb);
+
+       Amanda::MainLoop::run();
+       pass("read from a device succeeded, too, and data was correct");
+    }
+
+    # extra params:
+    #   cancel_after_partnum - after this partnum is completed, cancel the xfer
+    #   do_not_retry - do not retry a failed part - cancel the xfer instead
+    sub test_taper_dest {
+       my ($src, $dest_sub, $expected_messages, $msg_prefix, %params) = @_;
+       my $xfer;
+       my $device;
+       my $vtape_num = 1;
+       my @messages;
+
+       # set up vtapes
+       my $testconf = Installcheck::Run::setup();
+       $testconf->write();
+
+       my $hdr = Amanda::Header->new();
+       $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
+       $hdr->{'name'} = "installcheck";
+       $hdr->{'disk'} = "/";
+       $hdr->{'datestamp'} = "20080102030405";
+       $hdr->{'program'} = "INSTALLCHECK";
+
+       # set up a device for the taper dest
+       $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
+       die("Could not open VFS device: " . $device->error())
+           unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
+       $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
+       $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
+       my $dest = $dest_sub->($device);
+
+       # and create the xfer
+       $xfer = Amanda::Xfer->new([ $src, $dest ]);
+
+       my $start_new_part = sub {
+           my ($successful, $eof, $partnum) = @_;
+
+           if (exists $params{'cancel_after_partnum'}
+                   and $params{'cancel_after_partnum'} == $partnum) {
+               push @messages, "CANCEL";
+               $xfer->cancel();
+               return;
+           }
+
+           if (!$device || !$successful) {
+               # set up a device and start writing a part to it
+               $device->finish() if $device;
+               $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
+               die("Could not open VFS device: " . $device->error())
+                   unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
+               $dest->use_device($device);
+               $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
+               $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
+           }
+
+           # bail out if we shouldn't retry this part
+           if (!$successful and $params{'do_not_retry'}) {
+               push @messages, "NOT-RETRYING";
+               $xfer->cancel();
+               return;
+           }
+
+           if (!$eof) {
+               if ($successful) {
+                   $dest->start_part(0, $hdr);
+               } else {
+                   $dest->start_part(1, $hdr);
+               }
+           }
+       };
+
+       $xfer->start(sub {
+           my ($src, $msg, $xfer) = @_;
+
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               die $msg->{'elt'} . " failed: " . $msg->{'message'};
+           } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
+               push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
+               $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               push @messages, "DONE";
+               Amanda::MainLoop::quit();
+           } elsif ($msg->{'type'} == $XMSG_CANCEL) {
+               push @messages, "CANCELLED";
+           } else {
+               push @messages, "$msg";
+           }
+       });
+
+       Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
+       Amanda::MainLoop::run();
+
+       is_deeply([@messages],
+           $expected_messages,
+           "$msg_prefix: element produces the correct series of messages")
+       or diag(Dumper([@messages]));
+    }
+
+    sub run_recovery_source {
+       my ($dest, $files, $expected_messages, $finished_cb) = @_;
+       my $device;
+       my @filenums;
+       my @messages;
+       my $xfer;
+       my $dev;
+       my $src;
+
+       my $steps = define_steps
+           cb_ref => \$finished_cb;
+
+       step setup => sub {
+           # we need a device up front, so sneak a peek into @$files
+           $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($files->[0]));
+           $src = Amanda::Xfer::Source::Recovery->new($dev);
+           $xfer = Amanda::Xfer->new([ $src, $dest ]);
+
+           $xfer->start($steps->{'got_xmsg'});
+           # got_xmsg will call got_ready when the element is ready
+       };
+
+       step got_ready => sub {
+           $steps->{'load_slot'}->();
+       };
+
+       step load_slot => sub {
+           if (!@$files) {
+               return $src->start_part(undef);
+               # (will trigger an XMSG_DONE; see below)
+           }
+
+           my $slot = shift @$files;
+           @filenums = @{ shift @$files };
+
+           $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($slot));
+           if ($dev->status != $Amanda::Device::DEVICE_STATUS_SUCCESS) {
+               die $dev->error_or_status();
+           }
+
+           $src->use_device($dev);
+
+           if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
+               die $dev->error_or_status();
+           }
+
+           $steps->{'seek_file'}->();
+       };
+
+       step seek_file => sub {
+           if (!@filenums) {
+               return $steps->{'load_slot'}->();
+           }
+
+           my $hdr = $dev->seek_file(shift @filenums);
+           if (!$hdr) {
+               die $dev->error_or_status();
+           }
+
+           push @messages, "PART";
+
+           $src->start_part($dev);
+       };
+
+       step got_xmsg => sub {
+           my ($src, $msg, $xfer) = @_;
+
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               die $msg->{'elt'} . " failed: " . $msg->{'message'};
+           } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
+               push @messages, "KB-" . ($msg->{'size'}/1024);
+               $steps->{'seek_file'}->();
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               push @messages, "DONE";
+               $steps->{'quit'}->();
+           } elsif ($msg->{'type'} == $XMSG_READY) {
+               push @messages, "READY";
+               $steps->{'got_ready'}->();
+           } elsif ($msg->{'type'} == $XMSG_CANCEL) {
+               push @messages, "CANCELLED";
+           }
+       };
+
+       step quit => sub {
+           is_deeply([@messages],
+               $expected_messages,
+               "files read back and verified successfully with Amanda::Xfer::Recovery::Source")
+           or diag(Dumper([@messages]));
+
+           $finished_cb->();
+       };
+    }
+
+    sub test_recovery_source {
+       run_recovery_source(@_, \&Amanda::MainLoop::quit);
+       Amanda::MainLoop::run();
+    }
+
+    my $holding_base = "$Installcheck::TMP/source-holding";
+    my $holding_file;
+    # create a sequence of holding chunks, each 2MB.
+    sub make_holding_files {
+       my ($nchunks) = @_;
+       my $block = 'a' x 32768;
+
+       rmtree($holding_base);
+       mkpath($holding_base);
+       for (my $i = 0; $i < $nchunks; $i++) {
+           my $filename = "$holding_base/file$i";
+           open(my $fh, ">", "$filename");
+
+           my $hdr = Amanda::Header->new();
+           $hdr->{'type'} = ($i == 0)?
+               $Amanda::Header::F_DUMPFILE : $Amanda::Header::F_CONT_DUMPFILE;
+           $hdr->{'datestamp'} = "20070102030405";
+           $hdr->{'dumplevel'} = 0;
+           $hdr->{'compressed'} = 1;
+           $hdr->{'name'} = "localhost";
+           $hdr->{'disk'} = "/home";
+           $hdr->{'program'} = "INSTALLCHECK";
+           if ($i != $nchunks-1) {
+               $hdr->{'cont_filename'} = "$holding_base/file" . ($i+1);
+           }
+
+           print $fh $hdr->to_string(32768,32768);
+
+           for (my $b = 0; $b < 64; $b++) {
+               print $fh $block;
+           }
+           close($fh);
+       }
+
+       return "$holding_base/file0";
+    }
+
+    # run this test in each of a few different cache permutations
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
+                                                    1024*1024, 1, undef),
+       },
+       [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
+         "PART-3-OK", "PART-4-OK", "PART-5-OK",
+         "DONE" ],
+       "mem cache");
+    test_recovery_source(
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
+       [
+         'READY',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-102',
+         'DONE'
+       ]);
+
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
+                                             1024*1024, 0, $disk_cache_dir),
+       },
+       [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
+         "PART-3-OK", "PART-4-OK", "PART-5-OK",
+         "DONE" ],
+       "disk cache");
+    test_recovery_source(
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
+       [
+         'READY',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-102',
+         'DONE'
+       ]);
+
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
+                                                   1024*1024, 0, undef),
+       },
+       [ "PART-1-OK", "PART-2-OK", "PART-3-OK",
+         "DONE" ],
+       "no cache (no failed parts; exact multiple of part size)");
+    test_recovery_source(
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       [ 1 => [ 1, 2, 3 ], ],
+       [
+         'READY',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-1024',
+         'PART',
+         'KB-0',
+         'DONE'
+       ]);
+
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024, 0, 0, undef),
+       },
+       [ "PART-1-OK", "DONE" ],
+       "no splitting (fits on volume)");
+    test_recovery_source(
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
+       [ 1 => [ 1 ], ],
+       [
+         'READY',
+         'PART',
+         'KB-2048',
+         'DONE'
+       ]);
+
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024, 0, 0, undef),
+       },
+       [ "PART-1-FAILED", "NOT-RETRYING", "CANCELLED", "DONE" ],
+       "no splitting (doesn't fit on volume -> fails)",
+       do_not_retry => 1);
+
+    test_taper_dest(
+       Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
+                                           1024*1024, 0, $disk_cache_dir),
+       },
+       [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
+         "PART-3-OK", "PART-4-OK", "CANCEL",
+         "CANCELLED", "DONE" ],
+       "cancellation after success",
+       cancel_after_partnum => 4);
+
+    # set up a few holding chunks and read from those
+    $holding_file = make_holding_files(3);
+    test_taper_dest(
+       Amanda::Xfer::Source::Holding->new($holding_file),
+       sub {
+           my ($first_dev) = @_;
+           Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
+                                           1024*1024, 0, undef),
+       },
+       [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
+         "PART-3-OK", "PART-4-OK", "PART-5-FAILED",
+         "PART-5-OK", "PART-6-OK", "PART-7-OK",
+         "DONE" ],
+       "Amanda::Xfer::Source::Holding acts as a source and supplies cache_inform");
+
+    ##
+    # test the cache_inform method
+
+    sub test_taper_dest_cache_inform {
+       my %params = @_;
+       my $xfer;
+       my $device;
+       my $fh;
+       my $part_size = 1024*1024;
+       my $file_size = $part_size * 4 + 100 * 1024;
+       my $cache_file = "$Installcheck::TMP/cache_file";
+       my $vtape_num = 1;
+
+       # set up our "cache", cleverly using an Amanda::Xfer::Dest::Fd
+       open($fh, ">", "$cache_file") or die("Could not open '$cache_file' for writing");
+       $xfer = Amanda::Xfer->new([
+           Amanda::Xfer::Source::Random->new($file_size, $RANDOM_SEED),
+           Amanda::Xfer::Dest::Fd->new(fileno($fh)),
+       ]);
+
+       $xfer->start(sub {
+           my ($src, $msg, $xfer) = @_;
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               die $msg->{'elt'} . " failed: " . $msg->{'message'};
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               Amanda::MainLoop::quit();
+           }
+       });
+       Amanda::MainLoop::run();
+       close($fh);
+
+       # create a list of holding chuunks, some slab-aligned, some part-aligned,
+       # some not
+       my @holding_chunks;
+       if (!$params{'omit_chunks'}) {
+           my $offset = 0;
+           my $do_chunk = sub {
+               my ($break) = @_;
+               die unless $break > $offset;
+               push @holding_chunks, [ $cache_file, $offset, $break - $offset ];
+               $offset = $break;
+           };
+           $do_chunk->(277);
+           $do_chunk->($part_size);
+           $do_chunk->($part_size+128*1024);
+           $do_chunk->($part_size*3);
+           $do_chunk->($part_size*3+1024);
+           $do_chunk->($part_size*3+1024*2);
+           $do_chunk->($part_size*3+1024*3);
+           $do_chunk->($part_size*4);
+           $do_chunk->($part_size*4 + 77);
+           $do_chunk->($file_size - 1);
+           $do_chunk->($file_size);
+       }
+
+       # set up vtapes
+       my $testconf = Installcheck::Run::setup();
+       $testconf->write();
+
+       my $hdr = Amanda::Header->new();
+       $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
+       $hdr->{'name'} = "installcheck";
+       $hdr->{'disk'} = "/";
+       $hdr->{'datestamp'} = "20080102030405";
+       $hdr->{'program'} = "INSTALLCHECK";
+
+       # set up the cache file
+       open($fh, "<", "$cache_file") or die("Could not open '$cache_file' for reading");
+
+       # set up a device for writing
+       $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
+       die("Could not open VFS device: " . $device->error())
+           unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
+       $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
+       $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
+
+       my $dest = Amanda::Xfer::Dest::Taper::Splitter->new($device, 128*1024,
+                                                   1024*1024, 0, undef);
+       $xfer = Amanda::Xfer->new([
+           Amanda::Xfer::Source::Fd->new(fileno($fh)),
+           $dest,
+       ]);
+
+       my $start_new_part = sub {
+           my ($successful, $eof, $last_partnum) = @_;
+
+           if (!$device || !$successful) {
+               # set up a device and start writing a part to it
+               $device->finish() if $device;
+               $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
+               die("Could not open VFS device: " . $device->error())
+                   unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
+               $dest->use_device($device);
+               $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
+               $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
+           }
+
+           # feed enough chunks to cache_inform
+           my $upto = ($last_partnum+2) * $part_size;
+           while (@holding_chunks and $holding_chunks[0]->[1] < $upto) {
+               my ($filename, $offset, $length) = @{shift @holding_chunks};
+               $dest->cache_inform($filename, $offset, $length);
+           }
+
+           if (!$eof) {
+               if ($successful) {
+                   $dest->start_part(0, $hdr);
+               } else {
+                   $dest->start_part(1, $hdr);
+               }
+           }
+       };
+
+       my @messages;
+       $xfer->start(sub {
+           my ($src, $msg, $xfer) = @_;
+
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               push @messages, "ERROR: $msg->{message}";
+           } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
+               push @messages, "PART-" . ($msg->{'successful'}? "OK" : "FAILED");
+               $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               push @messages, "DONE";
+               Amanda::MainLoop::quit();
+           } elsif ($msg->{'type'} == $XMSG_CANCEL) {
+               push @messages, "CANCELLED";
+           } else {
+               push @messages, $msg->{'type'};
+           }
+       });
+
+       Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
+       Amanda::MainLoop::run();
+
+       unlink($cache_file);
+
+       return @messages;
+    }
+
+    is_deeply([ test_taper_dest_cache_inform() ],
+       [ "PART-OK", "PART-OK", "PART-FAILED",
+         "PART-OK", "PART-OK", "PART-OK",
+         "DONE" ],
+       "cache_inform: element produces the correct series of messages");
+
+    is_deeply([ test_taper_dest_cache_inform(omit_chunks => 1) ],
+       [ "PART-OK", "PART-OK", "PART-FAILED",
+         "ERROR: Failed part was not cached; cannot retry", "CANCELLED",
+         "DONE" ],
+       "cache_inform: element produces the correct series of messages when a chunk is missing");
+
+    rmtree($holding_base);
+}
+
+# test Amanda::Xfer::Dest::Taper::DirectTCP; do it twice, once with a cancellation
+SKIP: {
+    skip "not built with ndmp and server", 3 unless
+       Amanda::Util::built_with_component("ndmp") and Amanda::Util::built_with_component("server");
+
+    my $RANDOM_SEED = 0xFACADE;
+
+    # make XDT output fairly verbose
+    $Amanda::Config::debug_taper = 2;
+
+    my $ndmp = Installcheck::Mock::NdmpServer->new();
+    my $ndmp_port = $ndmp->{'port'};
+    my $drive = $ndmp->{'drive'};
+
+    my $mkdevice = sub {
+       my $dev = Amanda::Device->new("ndmp:127.0.0.1:$ndmp_port\@$drive");
+       die "can't create device" unless $dev->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS;
+       $dev->property_set("verbose", 1) or die "can't set VERBOSE";
+       $dev->property_set("ndmp_username", "ndmp") or die "can't set username";
+       $dev->property_set("ndmp_password", "ndmp") or die "can't set password";
+
+       return $dev;
+    };
+
+    my $hdr = Amanda::Header->new();
+    $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
+    $hdr->{'name'} = "installcheck";
+    $hdr->{'disk'} = "/";
+    $hdr->{'datestamp'} = "20080102030405";
+    $hdr->{'program'} = "INSTALLCHECK";
+
+    for my $do_cancel (0, 'later', 'in_setup') {
+       my $dev;
+       my $xfer;
+       my @messages;
+
+       # make a starting device
+       $dev = $mkdevice->();
+
+       # and create the xfer
+       my $src = Amanda::Xfer::Source::Random->new(32768*34-7, $RANDOM_SEED);
+       my $dest = Amanda::Xfer::Dest::Taper::DirectTCP->new($dev, 32768*16);
+       $xfer = Amanda::Xfer->new([ $src, $dest ]);
+
+       my $start_new_part; # forward declaration
+       my $xmsg_cb = sub {
+           my ($src, $msg, $xfer) = @_;
+
+           if ($msg->{'type'} == $XMSG_ERROR) {
+               # if this is an expected error, don't die
+               if ($do_cancel eq 'in_setup' and $msg->{'message'} =~ /operation not supported/) {
+                   push @messages, "ERROR";
+               } else {
+                   die $msg->{'elt'} . " failed: " . $msg->{'message'};
+               }
+           } elsif ($msg->{'type'} == $XMSG_READY) {
+               push @messages, "READY";
+
+               # get ourselves a new (albeit identical) device, just to prove that the connections
+               # are a little bit portable
+               $dev->finish();
+               $dev = $mkdevice->();
+               $dest->use_device($dev);
+               $dev->start($Amanda::Device::ACCESS_WRITE, "TESTCONF02", "20080102030406");
+
+               $start_new_part->(1, 0); # start first part
+           } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
+               push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
+               if ($do_cancel and $msg->{'partnum'} == 2) {
+                   $xfer->cancel();
+               } else {
+                   $start_new_part->($msg->{'successful'}, $msg->{'eof'});
+               }
+           } elsif ($msg->{'type'} == $XMSG_DONE) {
+               push @messages, "DONE";
+               Amanda::MainLoop::quit();
+           } elsif ($msg->{'type'} == $XMSG_CANCEL) {
+               push @messages, "CANCELLED";
+           } else {
+               push @messages, "$msg";
+           }
+       };
+
+       # trigger an error in the xfer dest's setup method by putting the device
+       # in an error state.  NDMP devices do not support append, so starting in
+       # append mode should trigger the failure.
+       if ($do_cancel eq 'in_setup') {
+           if ($dev->start($Amanda::Device::ACCESS_APPEND, "MYLABEL", undef)) {
+               die "successfully started NDMP device in ACCESS_APPEND?!";
+           }
+       }
+
+       $xfer->start($xmsg_cb);
+
+       $start_new_part = sub {
+           my ($successful, $eof) = @_;
+
+           die "this dest shouldn't have unsuccessful parts" unless $successful;
+
+           if (!$eof) {
+               $dest->start_part(0, $hdr);
+           }
+       };
+
+       Amanda::MainLoop::run();
+
+       $dev->finish();
+
+       if (!$do_cancel) {
+           is_deeply([@messages],
+               [ 'READY', 'PART-1-OK', 'PART-2-OK', 'PART-3-OK', 'DONE' ],
+               "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages")
+           or diag(Dumper([@messages]));
+       } elsif ($do_cancel eq 'in_setup') {
+           is_deeply([@messages],
+               [ 'ERROR', 'CANCELLED', 'DONE' ],
+               "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled during setup")
+           or diag(Dumper([@messages]));
+       } else {
+           is_deeply([@messages],
+               [ 'READY', 'PART-1-OK', 'PART-2-OK', 'CANCELLED', 'DONE' ],
+               "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled in mid-xfer")
+           or diag(Dumper([@messages]));
+       }
+    }
+
+    # Amanda::Xfer::Source::Recovery's directtcp functionality is not
+    # tested here, as to do so would basically require re-implementing
+    # Amanda::Recovery::Clerk; the xfer source is adequately tested by
+    # the Amanda::Recovery::Clerk tests.
+
+    $ndmp->cleanup();
+}
+
+# directtcp stuff
+
+{
+    my $RANDOM_SEED = 0x13131313; # 13 is bad luck, right?
+
+    # we want this to look like:
+    # A: [ Random -> DirectTCPConnect ]
+    #        --dtcp-->
+    # B: [ DirectTCPListen -> filter -> DirectTCPListen ]
+    #        --dtcp-->
+    # C: [ DirectTCPConnect -> filter -> Null ]
+    #
+    # this tests both XFER_MECH_DIRECTTCP_CONNECT and
+    # XFER_MECH_DIRECTTCP_LISTEN, as well as some of the glue
+    # used to attach those to filters.
+    #
+    # that means we need to start transfer B, since it has all of the
+    # addresses, before creating A or C.
+
+    my $done = { };
+    my $handle_msg = sub {
+       my ($letter, $src, $msg, $xfer) = @_;
+       if ($msg->{type} == $XMSG_ERROR) {
+           die $msg->{elt} . " failed: " . $msg->{message};
+       } elsif ($msg->{'type'} == $XMSG_DONE) {
+           $done->{$letter} = 1;
+       }
+       if ($done->{'A'} and $done->{'B'} and $done->{'C'}) {
+           Amanda::MainLoop::quit();
+       }
+    };
+
+    my %cbs;
+    for my $letter ('A', 'B', 'C') {
+       $cbs{$letter} = sub { $handle_msg->($letter, @_); };
+    }
+
+    my $src_listen = Amanda::Xfer::Source::DirectTCPListen->new();
+    my $dst_listen = Amanda::Xfer::Dest::DirectTCPListen->new();
+    my $xferB = Amanda::Xfer->new([
+       $src_listen,
+       Amanda::Xfer::Filter::Xor->new(0x13),
+       $dst_listen
+    ]);
+
+    $xferB->start($cbs{'B'});
+
+    my $xferA = Amanda::Xfer->new([
+       Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
+       Amanda::Xfer::Dest::DirectTCPConnect->new($src_listen->get_addrs())
+    ]);
+
+    $xferA->start($cbs{'A'});
+
+    my $xferC = Amanda::Xfer->new([
+       Amanda::Xfer::Source::DirectTCPConnect->new($dst_listen->get_addrs()),
+       Amanda::Xfer::Filter::Xor->new(0x13),
+       Amanda::Xfer::Dest::Null->new($RANDOM_SEED)
+    ]);
+
+    $xferC->start($cbs{'C'});
+
+    # let the already-started transfers go out of scope before they 
+    # complete, as a memory management test..
+    Amanda::MainLoop::run();
+    pass("Three xfers interlinked via DirectTCP complete successfully");
+}
+
+# test element comparison
+{
+    my $a = Amanda::Xfer::Filter::Xor->new(0);
+    my $b = Amanda::Xfer::Filter::Xor->new(1);
+    ok($a == $a, "elements compare equal to themselves");
+    ok(!($a == $b), ".. and not to other elements");
+    ok(!($a != $a), "elements do not compare != to themselves");
+    ok($a != $b, ".. but are != to other elements");
 }