Imported Upstream version 3.1.0
[debian/amanda] / installcheck / Amanda_Xfer.pl
1 # Copyright (c) 2008, 2009, 2010 Zmanda, Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc, 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 use Test::More tests => 37;
20 use File::Path;
21 use Data::Dumper;
22 use strict;
23
24 use lib "@amperldir@";
25 use Installcheck;
26 use Installcheck::Run;
27 use Installcheck::Mock;
28 use Amanda::Xfer qw( :constants );
29 use Amanda::Header;
30 use Amanda::Debug;
31 use Amanda::MainLoop;
32 use Amanda::Paths;
33 use Amanda::Config;
34 use Amanda::Constants;
35
36 # get Amanda::Device only when we're building for server
37 BEGIN {
38     use Amanda::Util;
39     if (Amanda::Util::built_with_component("server")) {
40         eval "use Amanda::Device;";
41         die $@ if $@;
42     }
43 }
44
45 # set up debugging so debug output doesn't interfere with test results
46 Amanda::Debug::dbopen("installcheck");
47 Installcheck::log_test_output();
48
49 # and disable Debug's die() and warn() overrides
50 Amanda::Debug::disable_die_override();
51
52 {
53     my $RANDOM_SEED = 0xD00D;
54
55     my $xfer = Amanda::Xfer->new([
56         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
57         Amanda::Xfer::Filter::Xor->new(0), # key of 0 -> no change, so random seeds match
58         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
59     ]);
60
61     pass("Creating a transfer doesn't crash"); # hey, it's a start..
62
63     my $got_msg = "(not received)";
64     $xfer->start(sub {
65         my ($src, $msg, $xfer) = @_;
66         if ($msg->{type} == $XMSG_ERROR) {
67             die $msg->{elt} . " failed: " . $msg->{message};
68         }
69         if ($msg->{type} == $XMSG_INFO) {
70             $got_msg = $msg->{message};
71         } elsif ($msg->{'type'} == $XMSG_DONE) {
72             Amanda::MainLoop::quit();
73         }
74     });
75     Amanda::MainLoop::run();
76     pass("A simple transfer runs to completion");
77     is($got_msg, "Is this thing on?",
78         "XMSG_INFO from Amanda::Xfer::Dest::Null has correct message");
79 }
80
81 {
82     my $RANDOM_SEED = 0xDEADBEEF;
83
84     my $xfer1 = Amanda::Xfer->new([
85         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
86         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
87     ]);
88     my $xfer2 = Amanda::Xfer->new([
89         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
90         Amanda::Xfer::Filter::Xor->new(0xf0),
91         Amanda::Xfer::Filter::Xor->new(0xf0),
92         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
93     ]);
94
95     my $cb = sub {
96         my ($src, $msg, $xfer) = @_;
97         if ($msg->{type} == $XMSG_ERROR) {
98             die $msg->{elt} . " failed: " . $msg->{message};
99         } elsif ($msg->{'type'} == $XMSG_DONE) {
100             if  ($xfer1->get_status() == $Amanda::Xfer::XFER_DONE
101              and $xfer2->get_status() == $Amanda::Xfer::XFER_DONE) {
102                 Amanda::MainLoop::quit();
103             }
104         }
105     };
106
107     $xfer1->start($cb);
108     $xfer2->start($cb);
109 }
110 # let the already-started transfers go out of scope before they 
111 # complete, as a memory management test..
112 Amanda::MainLoop::run();
113 pass("Two simultaneous transfers run to completion");
114
115 {
116     my $RANDOM_SEED = 0xD0DEEDAA;
117     my @elts;
118
119     # note that, because the Xor filter is flexible, assembling
120     # long pipelines can take an exponentially long time.  A 10-elt
121     # pipeline exercises the linking algorithm without wasting
122     # too many CPU cycles
123
124     push @elts, Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED);
125     for my $i (1 .. 4) {
126         push @elts, Amanda::Xfer::Filter::Xor->new($i);
127         push @elts, Amanda::Xfer::Filter::Xor->new($i);
128     }
129     push @elts, Amanda::Xfer::Dest::Null->new($RANDOM_SEED);
130     my $xfer = Amanda::Xfer->new(\@elts);
131
132     my $cb = sub {
133         my ($src, $msg, $xfer) = @_;
134         if ($msg->{type} == $XMSG_ERROR) {
135             die $msg->{elt} . " failed: " . $msg->{message};
136         } elsif ($msg->{'type'} == $XMSG_DONE) {
137             Amanda::MainLoop::quit();
138         }
139     };
140
141     $xfer->start($cb);
142
143     Amanda::MainLoop::run();
144     pass("One 10-element transfer runs to completion");
145 }
146
147
148 {
149     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
150     my $write_filename = "$Installcheck::TMP/xfer-junk-dest.tmp";
151     my ($rfh, $wfh);
152
153     mkdir($Installcheck::TMP) unless (-e $Installcheck::TMP);
154
155     # fill the file with some stuff
156     open($wfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
157     for my $i (1 .. 100) { print $wfh "line $i\n"; }
158     close($wfh);
159
160     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");
161     open($wfh, ">", "$write_filename") or die("Could not open '$write_filename' for writing");
162
163     # now run a transfer out of it
164     my $xfer = Amanda::Xfer->new([
165         Amanda::Xfer::Source::Fd->new(fileno($rfh)),
166         Amanda::Xfer::Filter::Xor->new(0xde),
167         Amanda::Xfer::Filter::Xor->new(0xde),
168         Amanda::Xfer::Dest::Fd->new(fileno($wfh)),
169     ]);
170
171     my $cb = sub {
172         my ($src, $msg, $xfer) = @_;
173         if ($msg->{type} == $XMSG_ERROR) {
174             die $msg->{elt} . " failed: " . $msg->{message};
175         } elsif ($msg->{'type'} == $XMSG_DONE) {
176             Amanda::MainLoop::quit();
177         }
178     };
179
180     $xfer->start($cb);
181
182     Amanda::MainLoop::run();
183
184     close($wfh);
185     close($rfh);
186
187     # now verify the file contents are identical
188     open($rfh, "<", $read_filename);
189     my $src = do { local $/; <$rfh> };
190
191     open($rfh, "<", $write_filename);
192     my $dest = do { local $/; <$rfh> };
193
194     is($src, $dest, "Source::Fd and Dest::Fd read and write files");
195
196     unlink($read_filename);
197     unlink($write_filename);
198 }
199
200 {
201     my $RANDOM_SEED = 0x5EAF00D;
202
203     # build a transfer that will keep going forever
204     my $xfer = Amanda::Xfer->new([
205         Amanda::Xfer::Source::Random->new(0, $RANDOM_SEED),
206         Amanda::Xfer::Filter::Xor->new(14),
207         Amanda::Xfer::Filter::Xor->new(14),
208         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
209     ]);
210
211     my $got_timeout = 0;
212     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
213         my ($src) = @_;
214         $got_timeout = 1;
215         $src->remove();
216         $xfer->cancel();
217     });
218     $xfer->start(sub {
219         my ($src, $msg, $xfer) = @_;
220         if ($msg->{type} == $XMSG_ERROR) {
221             die $msg->{elt} . " failed: " . $msg->{message};
222         } elsif ($msg->{'type'} == $XMSG_DONE) {
223             Amanda::MainLoop::quit();
224         }
225     });
226     Amanda::MainLoop::run();
227     ok($got_timeout, "A neverending transfer finishes after being cancelled");
228     # (note that this does not test all of the cancellation possibilities)
229 }
230
231 {
232     # build a transfer that will write to a read-only fd
233     my $read_filename = "$Installcheck::TMP/xfer-junk-src.tmp";
234     my $rfh;
235
236     # create the file
237     open($rfh, ">", $read_filename) or die("Could not open '$read_filename' for writing");
238
239     # open it for reading
240     open($rfh, "<", $read_filename) or die("Could not open '$read_filename' for reading");;
241
242     my $xfer = Amanda::Xfer->new([
243         Amanda::Xfer::Source::Random->new(0, 1),
244         Amanda::Xfer::Dest::Fd->new(fileno($rfh)),
245     ]);
246
247     my $got_error = 0;
248     $xfer->start(sub {
249         my ($src, $msg, $xfer) = @_;
250         if ($msg->{type} == $XMSG_ERROR) {
251             $got_error = 1;
252         } elsif ($msg->{'type'} == $XMSG_DONE) {
253             Amanda::MainLoop::quit();
254         }
255     });
256     Amanda::MainLoop::run();
257     ok($got_error, "A transfer with an error cancels itself after sending an error");
258
259     unlink($read_filename);
260 }
261
262 # test the Process filter
263 {
264     my $RANDOM_SEED = 0xD00D;
265
266     my $xfer = Amanda::Xfer->new([
267         Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
268         Amanda::Xfer::Filter::Process->new(
269             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
270         Amanda::Xfer::Filter::Process->new(
271             [ $Amanda::Constants::UNCOMPRESS_PATH, $Amanda::Constants::UNCOMPRESS_OPT ], 0),
272         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
273     ]);
274
275     $xfer->get_source()->set_callback(sub {
276         my ($src, $msg, $xfer) = @_;
277         if ($msg->{type} == $XMSG_ERROR) {
278             die $msg->{elt} . " failed: " . $msg->{message};
279         } elsif ($msg->{'type'} == $XMSG_DONE) {
280             $src->remove();
281             Amanda::MainLoop::quit();
282         }
283     });
284     $xfer->start();
285     Amanda::MainLoop::run();
286     pass("compress | uncompress gets back the original stream");
287 }
288
289 {
290     my $RANDOM_SEED = 0x5EAF00D;
291
292     # build a transfer that will keep going forever, using a source that
293     # cannot produce an EOF, so Filter::Process is forced to kill the
294     # compress process
295
296     open(my $zerofd, "<", "/dev/zero")
297         or die("could not open /dev/zero: $!");
298     my $xfer = Amanda::Xfer->new([
299         Amanda::Xfer::Source::Fd->new($zerofd),
300         Amanda::Xfer::Filter::Process->new(
301             [ $Amanda::Constants::COMPRESS_PATH, $Amanda::Constants::COMPRESS_BEST_OPT ], 0),
302         Amanda::Xfer::Dest::Null->new(0),
303     ]);
304
305     my $got_timeout = 0;
306     Amanda::MainLoop::timeout_source(200)->set_callback(sub {
307         my ($src) = @_;
308         $got_timeout = 1;
309         $src->remove();
310         $xfer->cancel();
311     });
312     $xfer->get_source()->set_callback(sub {
313         my ($src, $msg, $xfer) = @_;
314         if ($msg->{type} == $XMSG_ERROR) {
315             die $msg->{elt} . " failed: " . $msg->{message};
316         } elsif ($msg->{'type'} == $XMSG_DONE) {
317             $src->remove();
318             Amanda::MainLoop::quit();
319         }
320     });
321     $xfer->start();
322     Amanda::MainLoop::run();
323     ok($got_timeout, "Amanda::Xfer::Filter::Process can be cancelled");
324     # (note that this does not test all of the cancellation possibilities)
325 }
326
327 # Test Amanda::Xfer::Dest::Buffer
328 {
329     my $dest = Amanda::Xfer::Dest::Buffer->new(1025);
330     my $xfer = Amanda::Xfer->new([
331         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
332         $dest,
333     ]);
334
335     $xfer->get_source()->set_callback(sub {
336         my ($src, $msg, $xfer) = @_;
337         if ($msg->{type} == $XMSG_ERROR) {
338             die $msg->{elt} . " failed: " . $msg->{message};
339         } elsif ($msg->{'type'} == $XMSG_DONE) {
340             $src->remove();
341             Amanda::MainLoop::quit();
342         }
343     });
344     $xfer->start();
345     Amanda::MainLoop::run();
346
347     is($dest->get(), 'ABCDEFGH' x 128,
348         "buffer captures the right bytes");
349 }
350
351 # Test that Amanda::Xfer::Dest::Buffer terminates an xfer early
352 {
353     my $dest = Amanda::Xfer::Dest::Buffer->new(100);
354     my $xfer = Amanda::Xfer->new([
355         Amanda::Xfer::Source::Pattern->new(1024, "ABCDEFGH"),
356         $dest,
357     ]);
358
359     my $got_err = 0;
360     $xfer->get_source()->set_callback(sub {
361         my ($src, $msg, $xfer) = @_;
362         if ($msg->{type} == $XMSG_ERROR) {
363             $got_err = 1;
364         } elsif ($msg->{'type'} == $XMSG_DONE) {
365             $src->remove();
366             Amanda::MainLoop::quit();
367         }
368     });
369     $xfer->start();
370     Amanda::MainLoop::run();
371
372     ok($got_err, "buffer stops the xfer if it doesn't have space");
373 }
374
375 SKIP: {
376     skip "not built with server", 17 unless Amanda::Util::built_with_component("server");
377
378     my $disk_cache_dir = "$Installcheck::TMP";
379     my $RANDOM_SEED = 0xFACADE;
380
381     # exercise device source and destination
382     {
383         my $RANDOM_SEED = 0xFACADE;
384         my $xfer;
385
386         my $quit_cb = make_cb(quit_cb => sub {
387             my ($src, $msg, $xfer) = @_;
388             if ($msg->{'type'} == $XMSG_ERROR) {
389                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
390             } elsif ($msg->{'type'} == $XMSG_DONE) {
391                 Amanda::MainLoop::quit();
392             }
393         });
394
395         # set up vtapes
396         my $testconf = Installcheck::Run::setup();
397         $testconf->write();
398
399         # set up a device for slot 1
400         my $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape(1));
401         die("Could not open VFS device: " . $device->error())
402             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
403
404         # write to it
405         my $hdr = Amanda::Header->new();
406         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
407         $hdr->{'name'} = "installcheck";
408         $hdr->{'disk'} = "/";
409         $hdr->{'datestamp'} = "20080102030405";
410         $hdr->{'program'} = "INSTALLCHECK";
411
412         $device->finish();
413         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
414         $device->start_file($hdr);
415
416         $xfer = Amanda::Xfer->new([
417             Amanda::Xfer::Source::Random->new(1024*1024, $RANDOM_SEED),
418             Amanda::Xfer::Dest::Device->new($device, $device->block_size() * 10),
419         ]);
420
421         $xfer->start($quit_cb);
422
423         Amanda::MainLoop::run();
424         pass("write to a device (completed succesfully; data may not be correct)");
425
426         # finish up the file and device
427         ok(!$device->in_file(), "not in_file");
428         ok($device->finish(), "finish");
429
430         # now turn around and read from it
431         $device->start($Amanda::Device::ACCESS_READ, undef, undef);
432         $device->seek_file(1);
433
434         $xfer = Amanda::Xfer->new([
435             Amanda::Xfer::Source::Device->new($device),
436             Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
437         ]);
438
439         $xfer->start($quit_cb);
440
441         Amanda::MainLoop::run();
442         pass("read from a device succeeded, too, and data was correct");
443     }
444
445     # extra params:
446     #   cancel_after_partnum - after this partnum is completed, cancel the xfer
447     #   do_not_retry - do not retry a failed part - cancel the xfer instead
448     sub test_taper_dest {
449         my ($src, $dest_sub, $expected_messages, $msg_prefix, %params) = @_;
450         my $xfer;
451         my $device;
452         my $vtape_num = 1;
453         my @messages;
454
455         # set up vtapes
456         my $testconf = Installcheck::Run::setup();
457         $testconf->write();
458
459         my $hdr = Amanda::Header->new();
460         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
461         $hdr->{'name'} = "installcheck";
462         $hdr->{'disk'} = "/";
463         $hdr->{'datestamp'} = "20080102030405";
464         $hdr->{'program'} = "INSTALLCHECK";
465
466         # set up a device for the taper dest
467         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
468         die("Could not open VFS device: " . $device->error())
469             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
470         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
471         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
472         my $dest = $dest_sub->($device);
473
474         # and create the xfer
475         $xfer = Amanda::Xfer->new([ $src, $dest ]);
476
477         my $start_new_part = sub {
478             my ($successful, $eof, $partnum) = @_;
479
480             if (exists $params{'cancel_after_partnum'}
481                     and $params{'cancel_after_partnum'} == $partnum) {
482                 push @messages, "CANCEL";
483                 $xfer->cancel();
484                 return;
485             }
486
487             if (!$device || !$successful) {
488                 # set up a device and start writing a part to it
489                 $device->finish() if $device;
490                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
491                 die("Could not open VFS device: " . $device->error())
492                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
493                 $dest->use_device($device);
494                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
495                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
496             }
497
498             # bail out if we shouldn't retry this part
499             if (!$successful and $params{'do_not_retry'}) {
500                 push @messages, "NOT-RETRYING";
501                 $xfer->cancel();
502                 return;
503             }
504
505             if (!$eof) {
506                 if ($successful) {
507                     $dest->start_part(0, $hdr);
508                 } else {
509                     $dest->start_part(1, $hdr);
510                 }
511             }
512         };
513
514         $xfer->start(sub {
515             my ($src, $msg, $xfer) = @_;
516
517             if ($msg->{'type'} == $XMSG_ERROR) {
518                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
519             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
520                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
521                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
522             } elsif ($msg->{'type'} == $XMSG_DONE) {
523                 push @messages, "DONE";
524                 Amanda::MainLoop::quit();
525             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
526                 push @messages, "CANCELLED";
527             } else {
528                 push @messages, "$msg";
529             }
530         });
531
532         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
533         Amanda::MainLoop::run();
534
535         is_deeply([@messages],
536             $expected_messages,
537             "$msg_prefix: element produces the correct series of messages")
538         or diag(Dumper([@messages]));
539     }
540
541     sub run_recovery_source {
542         my ($dest, $files, $expected_messages, $finished_cb) = @_;
543         my $device;
544         my @filenums;
545         my @messages;
546         my $xfer;
547         my $dev;
548         my $src;
549
550         my $steps = define_steps
551             cb_ref => \$finished_cb;
552
553         step setup => sub {
554             # we need a device up front, so sneak a peek into @$files
555             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($files->[0]));
556             $src = Amanda::Xfer::Source::Recovery->new($dev);
557             $xfer = Amanda::Xfer->new([ $src, $dest ]);
558
559             $xfer->start($steps->{'got_xmsg'});
560             # got_xmsg will call got_ready when the element is ready
561         };
562
563         step got_ready => sub {
564             $steps->{'load_slot'}->();
565         };
566
567         step load_slot => sub {
568             if (!@$files) {
569                 return $src->start_part(undef);
570                 # (will trigger an XMSG_DONE; see below)
571             }
572
573             my $slot = shift @$files;
574             @filenums = @{ shift @$files };
575
576             $dev = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($slot));
577             if ($dev->status != $Amanda::Device::DEVICE_STATUS_SUCCESS) {
578                 die $dev->error_or_status();
579             }
580
581             $src->use_device($dev);
582
583             if (!$dev->start($Amanda::Device::ACCESS_READ, undef, undef)) {
584                 die $dev->error_or_status();
585             }
586
587             $steps->{'seek_file'}->();
588         };
589
590         step seek_file => sub {
591             if (!@filenums) {
592                 return $steps->{'load_slot'}->();
593             }
594
595             my $hdr = $dev->seek_file(shift @filenums);
596             if (!$hdr) {
597                 die $dev->error_or_status();
598             }
599
600             push @messages, "PART";
601
602             $src->start_part($dev);
603         };
604
605         step got_xmsg => sub {
606             my ($src, $msg, $xfer) = @_;
607
608             if ($msg->{'type'} == $XMSG_ERROR) {
609                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
610             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
611                 push @messages, "KB-" . ($msg->{'size'}/1024);
612                 $steps->{'seek_file'}->();
613             } elsif ($msg->{'type'} == $XMSG_DONE) {
614                 push @messages, "DONE";
615                 $steps->{'quit'}->();
616             } elsif ($msg->{'type'} == $XMSG_READY) {
617                 push @messages, "READY";
618                 $steps->{'got_ready'}->();
619             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
620                 push @messages, "CANCELLED";
621             }
622         };
623
624         step quit => sub {
625             is_deeply([@messages],
626                 $expected_messages,
627                 "files read back and verified successfully with Amanda::Xfer::Recovery::Source")
628             or diag(Dumper([@messages]));
629
630             $finished_cb->();
631         };
632     }
633
634     sub test_recovery_source {
635         run_recovery_source(@_, \&Amanda::MainLoop::quit);
636         Amanda::MainLoop::run();
637     }
638
639     my $holding_base = "$Installcheck::TMP/source-holding";
640     my $holding_file;
641     # create a sequence of holding chunks, each 2MB.
642     sub make_holding_files {
643         my ($nchunks) = @_;
644         my $block = 'a' x 32768;
645
646         rmtree($holding_base);
647         mkpath($holding_base);
648         for (my $i = 0; $i < $nchunks; $i++) {
649             my $filename = "$holding_base/file$i";
650             open(my $fh, ">", "$filename");
651
652             my $hdr = Amanda::Header->new();
653             $hdr->{'type'} = ($i == 0)?
654                 $Amanda::Header::F_DUMPFILE : $Amanda::Header::F_CONT_DUMPFILE;
655             $hdr->{'datestamp'} = "20070102030405";
656             $hdr->{'dumplevel'} = 0;
657             $hdr->{'compressed'} = 1;
658             $hdr->{'name'} = "localhost";
659             $hdr->{'disk'} = "/home";
660             $hdr->{'program'} = "INSTALLCHECK";
661             if ($i != $nchunks-1) {
662                 $hdr->{'cont_filename'} = "$holding_base/file" . ($i+1);
663             }
664
665             print $fh $hdr->to_string(32768,32768);
666
667             for (my $b = 0; $b < 64; $b++) {
668                 print $fh $block;
669             }
670             close($fh);
671         }
672
673         return "$holding_base/file0";
674     }
675
676     # run this test in each of a few different cache permutations
677     test_taper_dest(
678         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
679         sub {
680             my ($first_dev) = @_;
681             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
682                                                      1024*1024, 1, undef),
683         },
684         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
685           "PART-3-OK", "PART-4-OK", "PART-5-OK",
686           "DONE" ],
687         "mem cache");
688     test_recovery_source(
689         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
690         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
691         [
692           'READY',
693           'PART',
694           'KB-1024',
695           'PART',
696           'KB-1024',
697           'PART',
698           'KB-1024',
699           'PART',
700           'KB-1024',
701           'PART',
702           'KB-102',
703           'DONE'
704         ]);
705
706     test_taper_dest(
707         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
708         sub {
709             my ($first_dev) = @_;
710             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
711                                               1024*1024, 0, $disk_cache_dir),
712         },
713         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
714           "PART-3-OK", "PART-4-OK", "PART-5-OK",
715           "DONE" ],
716         "disk cache");
717     test_recovery_source(
718         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
719         [ 1 => [ 1, 2 ], 2 => [ 1, 2, 3 ], ],
720         [
721           'READY',
722           'PART',
723           'KB-1024',
724           'PART',
725           'KB-1024',
726           'PART',
727           'KB-1024',
728           'PART',
729           'KB-1024',
730           'PART',
731           'KB-102',
732           'DONE'
733         ]);
734
735     test_taper_dest(
736         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
737         sub {
738             my ($first_dev) = @_;
739             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
740                                                     1024*1024, 0, undef),
741         },
742         [ "PART-1-OK", "PART-2-OK", "PART-3-OK",
743           "DONE" ],
744         "no cache (no failed parts; exact multiple of part size)");
745     test_recovery_source(
746         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
747         [ 1 => [ 1, 2, 3 ], ],
748         [
749           'READY',
750           'PART',
751           'KB-1024',
752           'PART',
753           'KB-1024',
754           'PART',
755           'KB-0',
756           'DONE'
757         ]);
758
759     test_taper_dest(
760         Amanda::Xfer::Source::Random->new(1024*1024*2, $RANDOM_SEED),
761         sub {
762             my ($first_dev) = @_;
763             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024, 0, 0, undef),
764         },
765         [ "PART-1-OK", "DONE" ],
766         "no splitting (fits on volume)");
767     test_recovery_source(
768         Amanda::Xfer::Dest::Null->new($RANDOM_SEED),
769         [ 1 => [ 1 ], ],
770         [
771           'READY',
772           'PART',
773           'KB-2048',
774           'DONE'
775         ]);
776
777     test_taper_dest(
778         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
779         sub {
780             my ($first_dev) = @_;
781             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024, 0, 0, undef),
782         },
783         [ "PART-1-FAILED", "NOT-RETRYING", "CANCELLED", "DONE" ],
784         "no splitting (doesn't fit on volume -> fails)",
785         do_not_retry => 1);
786
787     test_taper_dest(
788         Amanda::Xfer::Source::Random->new(1024*1024*4.1, $RANDOM_SEED),
789         sub {
790             my ($first_dev) = @_;
791             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
792                                             1024*1024, 0, $disk_cache_dir),
793         },
794         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
795           "PART-3-OK", "PART-4-OK", "CANCEL",
796           "CANCELLED", "DONE" ],
797         "cancellation after success",
798         cancel_after_partnum => 4);
799
800     # set up a few holding chunks and read from those
801     $holding_file = make_holding_files(3);
802     test_taper_dest(
803         Amanda::Xfer::Source::Holding->new($holding_file),
804         sub {
805             my ($first_dev) = @_;
806             Amanda::Xfer::Dest::Taper::Splitter->new($first_dev, 128*1024,
807                                             1024*1024, 0, undef),
808         },
809         [ "PART-1-OK", "PART-2-OK", "PART-3-FAILED",
810           "PART-3-OK", "PART-4-OK", "PART-5-FAILED",
811           "PART-5-OK", "PART-6-OK", "PART-7-OK",
812           "DONE" ],
813         "Amanda::Xfer::Source::Holding acts as a source and supplies cache_inform");
814
815     ##
816     # test the cache_inform method
817
818     sub test_taper_dest_cache_inform {
819         my %params = @_;
820         my $xfer;
821         my $device;
822         my $fh;
823         my $part_size = 1024*1024;
824         my $file_size = $part_size * 4 + 100 * 1024;
825         my $cache_file = "$Installcheck::TMP/cache_file";
826         my $vtape_num = 1;
827
828         # set up our "cache", cleverly using an Amanda::Xfer::Dest::Fd
829         open($fh, ">", "$cache_file") or die("Could not open '$cache_file' for writing");
830         $xfer = Amanda::Xfer->new([
831             Amanda::Xfer::Source::Random->new($file_size, $RANDOM_SEED),
832             Amanda::Xfer::Dest::Fd->new(fileno($fh)),
833         ]);
834
835         $xfer->start(sub {
836             my ($src, $msg, $xfer) = @_;
837             if ($msg->{'type'} == $XMSG_ERROR) {
838                 die $msg->{'elt'} . " failed: " . $msg->{'message'};
839             } elsif ($msg->{'type'} == $XMSG_DONE) {
840                 Amanda::MainLoop::quit();
841             }
842         });
843         Amanda::MainLoop::run();
844         close($fh);
845
846         # create a list of holding chuunks, some slab-aligned, some part-aligned,
847         # some not
848         my @holding_chunks;
849         if (!$params{'omit_chunks'}) {
850             my $offset = 0;
851             my $do_chunk = sub {
852                 my ($break) = @_;
853                 die unless $break > $offset;
854                 push @holding_chunks, [ $cache_file, $offset, $break - $offset ];
855                 $offset = $break;
856             };
857             $do_chunk->(277);
858             $do_chunk->($part_size);
859             $do_chunk->($part_size+128*1024);
860             $do_chunk->($part_size*3);
861             $do_chunk->($part_size*3+1024);
862             $do_chunk->($part_size*3+1024*2);
863             $do_chunk->($part_size*3+1024*3);
864             $do_chunk->($part_size*4);
865             $do_chunk->($part_size*4 + 77);
866             $do_chunk->($file_size - 1);
867             $do_chunk->($file_size);
868         }
869
870         # set up vtapes
871         my $testconf = Installcheck::Run::setup();
872         $testconf->write();
873
874         my $hdr = Amanda::Header->new();
875         $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
876         $hdr->{'name'} = "installcheck";
877         $hdr->{'disk'} = "/";
878         $hdr->{'datestamp'} = "20080102030405";
879         $hdr->{'program'} = "INSTALLCHECK";
880
881         # set up the cache file
882         open($fh, "<", "$cache_file") or die("Could not open '$cache_file' for reading");
883
884         # set up a device for writing
885         $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
886         die("Could not open VFS device: " . $device->error())
887             unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
888         $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
889         $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
890
891         my $dest = Amanda::Xfer::Dest::Taper::Splitter->new($device, 128*1024,
892                                                     1024*1024, 0, undef);
893         $xfer = Amanda::Xfer->new([
894             Amanda::Xfer::Source::Fd->new(fileno($fh)),
895             $dest,
896         ]);
897
898         my $start_new_part = sub {
899             my ($successful, $eof, $last_partnum) = @_;
900
901             if (!$device || !$successful) {
902                 # set up a device and start writing a part to it
903                 $device->finish() if $device;
904                 $device = Amanda::Device->new("file:" . Installcheck::Run::load_vtape($vtape_num++));
905                 die("Could not open VFS device: " . $device->error())
906                     unless ($device->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS);
907                 $dest->use_device($device);
908                 $device->start($Amanda::Device::ACCESS_WRITE, "TESTCONF01", "20080102030405");
909                 $device->property_set("MAX_VOLUME_USAGE", 1024*1024*2.5);
910             }
911
912             # feed enough chunks to cache_inform
913             my $upto = ($last_partnum+2) * $part_size;
914             while (@holding_chunks and $holding_chunks[0]->[1] < $upto) {
915                 my ($filename, $offset, $length) = @{shift @holding_chunks};
916                 $dest->cache_inform($filename, $offset, $length);
917             }
918
919             if (!$eof) {
920                 if ($successful) {
921                     $dest->start_part(0, $hdr);
922                 } else {
923                     $dest->start_part(1, $hdr);
924                 }
925             }
926         };
927
928         my @messages;
929         $xfer->start(sub {
930             my ($src, $msg, $xfer) = @_;
931
932             if ($msg->{'type'} == $XMSG_ERROR) {
933                 push @messages, "ERROR: $msg->{message}";
934             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
935                 push @messages, "PART-" . ($msg->{'successful'}? "OK" : "FAILED");
936                 $start_new_part->($msg->{'successful'}, $msg->{'eof'}, $msg->{'partnum'});
937             } elsif ($msg->{'type'} == $XMSG_DONE) {
938                 push @messages, "DONE";
939                 Amanda::MainLoop::quit();
940             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
941                 push @messages, "CANCELLED";
942             } else {
943                 push @messages, $msg->{'type'};
944             }
945         });
946
947         Amanda::MainLoop::call_later(sub { $start_new_part->(1, 0, -1); });
948         Amanda::MainLoop::run();
949
950         unlink($cache_file);
951
952         return @messages;
953     }
954
955     is_deeply([ test_taper_dest_cache_inform() ],
956         [ "PART-OK", "PART-OK", "PART-FAILED",
957           "PART-OK", "PART-OK", "PART-OK",
958           "DONE" ],
959         "cache_inform: element produces the correct series of messages");
960
961     is_deeply([ test_taper_dest_cache_inform(omit_chunks => 1) ],
962         [ "PART-OK", "PART-OK", "PART-FAILED",
963           "ERROR: Failed part was not cached; cannot retry", "CANCELLED",
964           "DONE" ],
965         "cache_inform: element produces the correct series of messages when a chunk is missing");
966
967     rmtree($holding_base);
968 }
969
970 # test Amanda::Xfer::Dest::Taper::DirectTCP; do it twice, once with a cancellation
971 SKIP: {
972     skip "not built with ndmp and server", 3 unless
973         Amanda::Util::built_with_component("ndmp") and Amanda::Util::built_with_component("server");
974
975     my $RANDOM_SEED = 0xFACADE;
976
977     # make XDT output fairly verbose
978     $Amanda::Config::debug_taper = 2;
979
980     my $ndmp = Installcheck::Mock::NdmpServer->new();
981     my $ndmp_port = $ndmp->{'port'};
982     my $drive = $ndmp->{'drive'};
983
984     my $mkdevice = sub {
985         my $dev = Amanda::Device->new("ndmp:127.0.0.1:$ndmp_port\@$drive");
986         die "can't create device" unless $dev->status() == $Amanda::Device::DEVICE_STATUS_SUCCESS;
987         $dev->property_set("verbose", 1) or die "can't set VERBOSE";
988         $dev->property_set("ndmp_username", "ndmp") or die "can't set username";
989         $dev->property_set("ndmp_password", "ndmp") or die "can't set password";
990
991         return $dev;
992     };
993
994     my $hdr = Amanda::Header->new();
995     $hdr->{'type'} = $Amanda::Header::F_DUMPFILE;
996     $hdr->{'name'} = "installcheck";
997     $hdr->{'disk'} = "/";
998     $hdr->{'datestamp'} = "20080102030405";
999     $hdr->{'program'} = "INSTALLCHECK";
1000
1001     for my $do_cancel (0, 'later', 'in_setup') {
1002         my $dev;
1003         my $xfer;
1004         my @messages;
1005
1006         # make a starting device
1007         $dev = $mkdevice->();
1008
1009         # and create the xfer
1010         my $src = Amanda::Xfer::Source::Random->new(32768*34-7, $RANDOM_SEED);
1011         my $dest = Amanda::Xfer::Dest::Taper::DirectTCP->new($dev, 32768*16);
1012         $xfer = Amanda::Xfer->new([ $src, $dest ]);
1013
1014         my $start_new_part; # forward declaration
1015         my $xmsg_cb = sub {
1016             my ($src, $msg, $xfer) = @_;
1017
1018             if ($msg->{'type'} == $XMSG_ERROR) {
1019                 # if this is an expected error, don't die
1020                 if ($do_cancel eq 'in_setup' and $msg->{'message'} =~ /operation not supported/) {
1021                     push @messages, "ERROR";
1022                 } else {
1023                     die $msg->{'elt'} . " failed: " . $msg->{'message'};
1024                 }
1025             } elsif ($msg->{'type'} == $XMSG_READY) {
1026                 push @messages, "READY";
1027
1028                 # get ourselves a new (albeit identical) device, just to prove that the connections
1029                 # are a little bit portable
1030                 $dev->finish();
1031                 $dev = $mkdevice->();
1032                 $dest->use_device($dev);
1033                 $dev->start($Amanda::Device::ACCESS_WRITE, "TESTCONF02", "20080102030406");
1034
1035                 $start_new_part->(1, 0); # start first part
1036             } elsif ($msg->{'type'} == $XMSG_PART_DONE) {
1037                 push @messages, "PART-" . $msg->{'partnum'} . '-' . ($msg->{'successful'}? "OK" : "FAILED");
1038                 if ($do_cancel and $msg->{'partnum'} == 2) {
1039                     $xfer->cancel();
1040                 } else {
1041                     $start_new_part->($msg->{'successful'}, $msg->{'eof'});
1042                 }
1043             } elsif ($msg->{'type'} == $XMSG_DONE) {
1044                 push @messages, "DONE";
1045                 Amanda::MainLoop::quit();
1046             } elsif ($msg->{'type'} == $XMSG_CANCEL) {
1047                 push @messages, "CANCELLED";
1048             } else {
1049                 push @messages, "$msg";
1050             }
1051         };
1052
1053         # trigger an error in the xfer dest's setup method by putting the device
1054         # in an error state.  NDMP devices do not support append, so starting in
1055         # append mode should trigger the failure.
1056         if ($do_cancel eq 'in_setup') {
1057             if ($dev->start($Amanda::Device::ACCESS_APPEND, "MYLABEL", undef)) {
1058                 die "successfully started NDMP device in ACCESS_APPEND?!";
1059             }
1060         }
1061
1062         $xfer->start($xmsg_cb);
1063
1064         $start_new_part = sub {
1065             my ($successful, $eof) = @_;
1066
1067             die "this dest shouldn't have unsuccessful parts" unless $successful;
1068
1069             if (!$eof) {
1070                 $dest->start_part(0, $hdr);
1071             }
1072         };
1073
1074         Amanda::MainLoop::run();
1075
1076         $dev->finish();
1077
1078         if (!$do_cancel) {
1079             is_deeply([@messages],
1080                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'PART-3-OK', 'DONE' ],
1081                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages")
1082             or diag(Dumper([@messages]));
1083         } elsif ($do_cancel eq 'in_setup') {
1084             is_deeply([@messages],
1085                 [ 'ERROR', 'CANCELLED', 'DONE' ],
1086                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled during setup")
1087             or diag(Dumper([@messages]));
1088         } else {
1089             is_deeply([@messages],
1090                 [ 'READY', 'PART-1-OK', 'PART-2-OK', 'CANCELLED', 'DONE' ],
1091                 "Amanda::Xfer::Dest::Taper::DirectTCP element produces the correct series of messages when cancelled in mid-xfer")
1092             or diag(Dumper([@messages]));
1093         }
1094     }
1095
1096     # Amanda::Xfer::Source::Recovery's directtcp functionality is not
1097     # tested here, as to do so would basically require re-implementing
1098     # Amanda::Recovery::Clerk; the xfer source is adequately tested by
1099     # the Amanda::Recovery::Clerk tests.
1100
1101     $ndmp->cleanup();
1102 }
1103
1104 # directtcp stuff
1105
1106 {
1107     my $RANDOM_SEED = 0x13131313; # 13 is bad luck, right?
1108
1109     # we want this to look like:
1110     # A: [ Random -> DirectTCPConnect ]
1111     #        --dtcp-->
1112     # B: [ DirectTCPListen -> filter -> DirectTCPListen ]
1113     #        --dtcp-->
1114     # C: [ DirectTCPConnect -> filter -> Null ]
1115     #
1116     # this tests both XFER_MECH_DIRECTTCP_CONNECT and
1117     # XFER_MECH_DIRECTTCP_LISTEN, as well as some of the glue
1118     # used to attach those to filters.
1119     #
1120     # that means we need to start transfer B, since it has all of the
1121     # addresses, before creating A or C.
1122
1123     my $done = { };
1124     my $handle_msg = sub {
1125         my ($letter, $src, $msg, $xfer) = @_;
1126         if ($msg->{type} == $XMSG_ERROR) {
1127             die $msg->{elt} . " failed: " . $msg->{message};
1128         } elsif ($msg->{'type'} == $XMSG_DONE) {
1129             $done->{$letter} = 1;
1130         }
1131         if ($done->{'A'} and $done->{'B'} and $done->{'C'}) {
1132             Amanda::MainLoop::quit();
1133         }
1134     };
1135
1136     my %cbs;
1137     for my $letter ('A', 'B', 'C') {
1138         $cbs{$letter} = sub { $handle_msg->($letter, @_); };
1139     }
1140
1141     my $src_listen = Amanda::Xfer::Source::DirectTCPListen->new();
1142     my $dst_listen = Amanda::Xfer::Dest::DirectTCPListen->new();
1143     my $xferB = Amanda::Xfer->new([
1144         $src_listen,
1145         Amanda::Xfer::Filter::Xor->new(0x13),
1146         $dst_listen
1147     ]);
1148
1149     $xferB->start($cbs{'B'});
1150
1151     my $xferA = Amanda::Xfer->new([
1152         Amanda::Xfer::Source::Random->new(1024*1024*3, $RANDOM_SEED),
1153         Amanda::Xfer::Dest::DirectTCPConnect->new($src_listen->get_addrs())
1154     ]);
1155
1156     $xferA->start($cbs{'A'});
1157
1158     my $xferC = Amanda::Xfer->new([
1159         Amanda::Xfer::Source::DirectTCPConnect->new($dst_listen->get_addrs()),
1160         Amanda::Xfer::Filter::Xor->new(0x13),
1161         Amanda::Xfer::Dest::Null->new($RANDOM_SEED)
1162     ]);
1163
1164     $xferC->start($cbs{'C'});
1165
1166     # let the already-started transfers go out of scope before they 
1167     # complete, as a memory management test..
1168     Amanda::MainLoop::run();
1169     pass("Three xfers interlinked via DirectTCP complete successfully");
1170 }
1171
1172 # test element comparison
1173 {
1174     my $a = Amanda::Xfer::Filter::Xor->new(0);
1175     my $b = Amanda::Xfer::Filter::Xor->new(1);
1176     ok($a == $a, "elements compare equal to themselves");
1177     ok(!($a == $b), ".. and not to other elements");
1178     ok(!($a != $a), "elements do not compare != to themselves");
1179     ok($a != $b, ".. but are != to other elements");
1180 }