Imported Upstream version 3.3.2
[debian/amanda] / perl / Amanda / Taper / Worker.pm
1 # Copyright (c) 2009-2012 Zmanda Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Worker
22
23 =head1 DESCRIPTION
24
25 This package is a component of the Amanda taper, and is not intended for use by
26 other scripts or applications.
27
28 This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
29
30 The worker use an L<Amanda::Taper::Scribe> object to execute the request
31 received from the L<Amanda::Taper::Controller>.
32
33 =cut
34
35 use lib '@amperldir@';
36 use strict;
37 use warnings;
38
39 package Amanda::Taper::Worker;
40
41 use Carp;
42 use POSIX qw( :errno_h );
43 use Amanda::Changer;
44 use Amanda::Config qw( :getconf config_dir_relative );
45 use Amanda::Debug qw( :logging );
46 use Amanda::Header;
47 use Amanda::Holding;
48 use Amanda::MainLoop qw( :GIOCondition );
49 use Amanda::MainLoop;
50 use Amanda::Taper::Protocol;
51 use Amanda::Taper::Scan;
52 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
53 use Amanda::Logfile qw( :logtype_t log_add make_stats );
54 use Amanda::Xfer qw( :constants );
55 use Amanda::Util qw( quote_string );
56 use Amanda::Tapelist;
57 use File::Temp;
58
59 use base qw( Amanda::Taper::Scribe::Feedback );
60
61 our $tape_num = 0;
62
63 sub new {
64     my $class           = shift;
65     my $worker_name     = shift;
66     my $controller      = shift;
67     my $write_timestamp = shift;
68
69     my $self = bless {
70         state       => "init",
71         worker_name => $worker_name,
72         controller  => $controller,
73         scribe      => undef,
74         timestamp   => $write_timestamp,
75
76         # filled in when a write starts:
77         xfer => undef,
78         xfer_source => undef,
79         xfer_dest => undef,
80         handle => undef,
81         hostname => undef,
82         diskname => undef,
83         datestamp => undef,
84         level => undef,
85         header => undef,
86         doing_port_write => undef,
87         input_errors => [],
88
89         # periodic status updates
90         timer => undef,
91         status_filename => undef,
92         status_fh => undef,
93
94         # filled in after the header is available
95         header => undef,
96
97         # filled in when a new tape is started:
98         label => undef
99     }, $class;
100
101     my $scribe = Amanda::Taper::Scribe->new(
102         taperscan => $controller->{'taperscan'},
103         feedback => $self,
104         debug => $Amanda::Config::debug_taper,
105         eject_volume => getconf($CNF_EJECT_VOLUME));
106
107     $self->{'scribe'} = $scribe;
108     $self->{'scribe'}->start(write_timestamp => $write_timestamp,
109         finished_cb => sub { $self->_scribe_started_cb(@_); });
110
111     return $self;
112 }
113
114 # called when the scribe is fully started up and ready to go
115 sub _scribe_started_cb {
116     my $self = shift;
117     my ($err) = @_;
118
119     if ($err) {
120         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
121                 worker_name  => $self->{'worker_name'},
122                 message => "$err");
123         $self->{'state'} = "error";
124
125         # log the error (note that the message is intentionally not quoted)
126         log_add($L_ERROR, "no-tape error [$err]");
127
128     } else {
129         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
130                 worker_name => $self->{'worker_name'});
131         $self->{'state'} = "idle";
132     }
133 }
134
135
136 sub FILE_WRITE {
137     my $self = shift;
138     my ($msgtype, %params) = @_;
139     $self->_assert_in_state("idle") or return;
140
141     $self->{'doing_port_write'} = 0;
142
143     $self->setup_and_start_dump($msgtype,
144         dump_cb => sub { $self->dump_cb(@_); },
145         %params);
146 }
147
148 sub PORT_WRITE {
149     my $self = shift;
150     my ($msgtype, %params) = @_;
151
152     my $read_cb;
153
154     $self->_assert_in_state("idle") or return;
155
156     $self->{'doing_port_write'} = 1;
157
158     $self->setup_and_start_dump($msgtype,
159         dump_cb => sub { $self->dump_cb(@_); },
160         %params);
161 }
162
163 sub START_SCAN {
164     my $self = shift;
165     my ($msgtype, %params) = @_;
166
167     $self->{'scribe'}->start_scan(undef);
168 }
169
170 sub NEW_TAPE {
171     my $self = shift;
172     my ($msgtype, %params) = @_;
173
174     $self->_assert_in_state("writing") or return;
175
176     $self->{'perm_cb'}->(allow => 1);
177 }
178
179 sub NO_NEW_TAPE {
180     my $self = shift;
181     my ($msgtype, %params) = @_;
182
183     $self->_assert_in_state("writing") or return;
184
185     # log the error (note that the message is intentionally not quoted)
186     log_add($L_ERROR, "no-tape config [$params{reason}]");
187
188     $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
189 }
190
191 sub TAKE_SCRIBE_FROM {
192     my $self = shift;
193     my ($worker1, $msgtype, %params) = @_;
194
195     $self->_assert_in_state("writing") or return;
196     $worker1->_assert_in_state("idle") or return;
197
198     my $scribe = $self->{'scribe'};
199     my $scribe1 = $worker1->{'scribe'};
200     $self->{'scribe'} = $scribe1;
201     $worker1->{'scribe'} = $scribe;
202     # Change the callback to call the new scribe
203     $self->{'xfer'}->set_callback(sub {
204         my ($src, $msg, $xfer) = @_;
205         $scribe1->handle_xmsg($src, $msg, $xfer);
206
207         # if this is an error message that's not from the scribe's element, then
208         # we'll need to keep track of it ourselves
209         if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
210             push @{$self->{'input_errors'}}, $msg->{'message'};
211         }
212     });
213
214     $self->{'label'} = $worker1->{'label'};
215     $self->{'perm_cb'}->(scribe => $scribe1);
216     delete $worker1->{'scribe'};
217     $worker1->{'state'} = 'error';
218     $scribe->quit(finished_cb => sub {});
219  }
220
221 sub DONE {
222     my $self = shift;
223     my ($msgtype, %params) = @_;
224
225     if (!defined $self->{'dumper_status'}) {
226         $self->{'dumper_status'} = "DONE";
227         $self->{'orig_kb'} = $params{'orig_kb'};
228         if (defined $self->{'result'}) {
229             $self->result_cb(undef);
230         }
231     } else {
232         # ignore the message
233     }
234 }
235
236 sub FAILED {
237     my $self = shift;
238     my ($msgtype, %params) = @_;
239
240     $self->{'dumper_status'} = "FAILED";
241     if (defined $self->{'header_xfer'}) {
242         $self->{'header_xfer'}->cancel();
243     } elsif (defined $self->{'result'}) {
244         $self->result_cb(undef);
245     } elsif (!defined $self->{'scribe'}->{'xdt'}) {
246         # ignore, the dump is already cancelled or not yet started.
247     } elsif (!defined $self->{'scribe'}->{'xfer'}) {
248         # ignore, the dump is already cancelled or not yet started.
249     } else { # Abort the dump
250         push @{$self->{'input_errors'}}, "dumper failed";
251         $self->{'scribe'}->cancel_dump(
252                 xfer => $self->{'scribe'}->{'xfer'},
253                 dump_cb => $self->{'dump_cb'});
254     }
255 }
256
257 sub CLOSE_VOLUME {
258     my $self = shift;
259     my ($msgtype, %params) = @_;
260
261     $self->_assert_in_state("idle") or return;
262
263     $self->{'scribe'}->close_volume();
264 }
265
266 sub result_cb {
267     my $self = shift;
268     my %params = %{$self->{'dump_params'}};
269     my $msgtype;
270     my $logtype;
271
272     if ($params{'result'} eq 'DONE') {
273         if (!$self->{'doing_port_write'} or $self->{'dumper_status'} eq "DONE") {
274             $msgtype = Amanda::Taper::Protocol::DONE;
275             $logtype = $L_DONE;
276         } else {
277             $msgtype = Amanda::Taper::Protocol::DONE;
278             $logtype = $L_PARTIAL;
279         }
280     } elsif ($params{'result'} eq 'PARTIAL') {
281         $msgtype = Amanda::Taper::Protocol::PARTIAL;
282         $logtype = $L_PARTIAL;
283     } elsif ($params{'result'} eq 'FAILED') {
284         $msgtype = Amanda::Taper::Protocol::FAILED;
285         $logtype = $L_FAIL;
286     }
287
288     if ($self->{timer}) {
289         $self->{timer}->remove();
290         undef $self->{timer};
291         $self->{status_fh}->close();
292         undef $self->{status_fh};
293         unlink($self->{status_filename});
294         undef $self->{status_filename};
295     }
296
297     # note that we use total_duration here, which is the total time between
298     # start_dump and dump_cb, so the kps generated here is much less than the
299     # actual tape write speed.  Think of this as the *taper* speed, rather than
300     # the *tape* speed.
301     my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
302
303     # consider this a config-derived failure only if there were no errors
304     my $failure_from = (@{$params{'device_errors'}})?  'error' : 'config';
305
306     my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
307     push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
308     my $msg = quote_string(join("; ", @all_messages));
309
310     # write a DONE/PARTIAL/FAIL log line
311     if ($logtype == $L_FAIL) {
312         log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
313             quote_string($self->{'hostname'}.""), # " is required for SWIG..
314             quote_string($self->{'diskname'}.""),
315             $self->{'datestamp'},
316             $self->{'level'},
317             $failure_from,
318             $msg));
319     } else {
320         log_add($logtype, sprintf("%s %s %s %s %s %s%s",
321             quote_string($self->{'hostname'}.""), # " is required for SWIG..
322             quote_string($self->{'diskname'}.""),
323             $self->{'datestamp'},
324             $params{'nparts'},
325             $self->{'level'},
326             $stats,
327             ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
328     }
329
330     # and send a message back to the driver
331     my %msg_params = (
332         handle => $self->{'handle'},
333     );
334
335     # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
336     if (@{$self->{'input_errors'}}) {
337         $msg_params{'input'} = 'INPUT-ERROR';
338         $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
339     } else {
340         $msg_params{'input'} = 'INPUT-GOOD';
341         $msg_params{'inputerr'} = '';
342     }
343
344     # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
345     if (@{$params{'device_errors'}}) {
346         $msg_params{'taper'} = 'TAPE-ERROR';
347         $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
348     } elsif ($params{'config_denial_message'}) {
349         $msg_params{'taper'} = 'TAPE-ERROR';
350         $msg_params{'tapererr'} = $params{'config_denial_message'};
351     } else {
352         $msg_params{'taper'} = 'TAPE-GOOD';
353         $msg_params{'tapererr'} = '';
354     }
355
356     if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
357         $msg_params{'stats'} = $stats;
358     }
359
360     # reset things to 'idle' before sending the message
361     $self->{'xfer'} = undef;
362     $self->{'xfer_source'} = undef;
363     $self->{'xfer_dest'} = undef;
364     $self->{'handle'} = undef;
365     $self->{'header'} = undef;
366     $self->{'hostname'} = undef;
367     $self->{'diskname'} = undef;
368     $self->{'datestamp'} = undef;
369     $self->{'level'} = undef;
370     $self->{'header'} = undef;
371     $self->{'state'} = 'idle';
372     delete $self->{'result'};
373     delete $self->{'dumper_status'};
374     delete $self->{'dump_params'};
375
376     $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
377 }
378
379
380 ##
381 # Scribe feedback
382
383 sub request_volume_permission {
384     my $self = shift;
385     my %params = @_;
386
387     $self->{'perm_cb'} = $params{'perm_cb'};
388     # and send the request to the driver
389     $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
390         handle => $self->{'handle'});
391 }
392
393 sub scribe_notif_new_tape {
394     my $self = shift;
395     my %params = @_;
396
397     # TODO: if $params{error} is set, report it back to the driver
398     # (this will be a change to the protocol)
399     log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
400
401     if ($params{'volume_label'}) {
402         $self->{'label'} = $params{'volume_label'};
403
404         # add to the trace log
405         log_add($L_START, sprintf("datestamp %s label %s tape %s",
406                 $self->{'timestamp'},
407                 quote_string($self->{'label'}),
408                 ++$tape_num));
409
410         # and the amdump log
411         print STDERR "taper: wrote label '$self->{label}'\n";
412
413         # and inform the driver
414         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
415             handle => $self->{'handle'},
416             label => $params{'volume_label'});
417     } else {
418         $self->{'label'} = undef;
419
420         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
421             handle => $self->{'handle'});
422     }
423 }
424
425 sub scribe_notif_part_done {
426     my $self = shift;
427     my %params = @_;
428
429     $self->_assert_in_state("writing") or return;
430
431     my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
432
433     # log the part, using PART or PARTPARTIAL
434     my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
435         quote_string($self->{'label'}),
436         $params{'fileno'},
437         quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
438         quote_string($self->{'header'}->{'disk'}.""),
439         $self->{'datestamp'},
440         $params{'partnum'}, -1, # totalparts is always -1
441         $self->{'level'},
442         $stats);
443     if ($params{'successful'}) {
444         log_add($L_PART, $logbase);
445     } else {
446         log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
447     }
448
449     # only send a PARTDONE if it was successful
450     if ($params{'successful'}) {
451         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
452             handle => $self->{'handle'},
453             label => $self->{'label'},
454             fileno => $params{'fileno'},
455             stats => $stats,
456             kb => $params{'size'} / 1024);
457     }
458 }
459
460 sub scribe_notif_log_info {
461     my $self = shift;
462     my %params = @_;
463
464     debug("$params{'message'}");
465     log_add($L_INFO, "$params{'message'}");
466 }
467
468 ##
469 # Utilities
470
471 sub _assert_in_state {
472     my $self = shift;
473     my ($state) = @_;
474     if ($self->{'state'} eq $state) {
475         return 1;
476     } else {
477         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
478             message => "command not appropriate in state '$self->{state}' : '$state'");
479         return 0;
480     }
481 }
482
483 sub create_status_file {
484     my $self = shift;
485
486     # create temporary file
487     ($self->{status_fh}, $self->{status_filename}) =
488         File::Temp::tempfile("taper_status_file_XXXXXX",
489                                 DIR => $Amanda::Paths::AMANDA_TMPDIR,
490                                 UNLINK => 1);
491
492     # tell amstatus about it by writing it to the dump log
493     my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
494     my $qhost = Amanda::Util::quote_string($self->{'hostname'});
495     print STDERR "taper: status file $qhost $qdisk:" .
496                     "$self->{status_filename}\n";
497     print {$self->{status_fh}} "0";
498
499     # create timer callback, firing every 5s (=5000msec)
500     $self->{timer} = Amanda::MainLoop::timeout_source(5000);
501     $self->{timer}->set_callback(sub {
502         my $size = $self->{scribe}->get_bytes_written();
503         seek $self->{status_fh}, 0, 0;
504         print {$self->{status_fh}} $size, '     ';
505         $self->{status_fh}->flush();
506     });
507 }
508
509 sub send_port_and_get_header {
510     my $self = shift;
511     my ($finished_cb) = @_;
512
513     my ($xsrc, $xdst);
514     my $errmsg;
515
516     my $steps = define_steps
517         cb_ref => \$finished_cb;
518
519     step send_port => sub {
520         # get the ip:port pairs for the data connection from the data xfer source,
521         # which should be an Amanda::Xfer::Source::DirectTCPListen
522         my $data_addrs = $self->{'xfer_source'}->get_addrs();
523         $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
524
525         # and set up an xfer for the header, too, using DirectTCP as an easy
526         # way to implement a listen/accept/read process.  Note that this does
527         # not enforce a maximum size, so this portion of Amanda at least can
528         # handle any size header
529         ($xsrc, $xdst) = (
530             Amanda::Xfer::Source::DirectTCPListen->new(),
531             Amanda::Xfer::Dest::Buffer->new(0));
532         $self->{'header_xfer'} = Amanda::Xfer->new([$xsrc, $xdst]);
533         $self->{'header_xfer'}->start($steps->{'header_xfer_xmsg_cb'});
534
535         my $header_addrs = $xsrc->get_addrs();
536         my $header_port = $header_addrs->[0][1];
537
538         # and tell the driver which ports we're listening on
539         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
540             worker_name => $self->{'worker_name'},
541             handle => $self->{'handle'},
542             port => $header_port,
543             ipports => $data_addrs);
544     };
545
546     step header_xfer_xmsg_cb => sub {
547         my ($src, $xmsg, $xfer) = @_;
548         if ($xmsg->{'type'} == $XMSG_INFO) {
549             info($xmsg->{'message'});
550         } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
551             $errmsg = $xmsg->{'message'};
552         } elsif ($xmsg->{'type'} == $XMSG_DONE) {
553             if ($errmsg) {
554                 $finished_cb->($errmsg);
555             } else {
556                 $steps->{'got_header'}->();
557             }
558         }
559     };
560
561     step got_header => sub {
562         my $hdr_buf = $xdst->get();
563
564         # close stuff up
565         $self->{'header_xfer'} = $xsrc = $xdst = undef;
566
567         if (!defined $hdr_buf) {
568             return $finished_cb->("Got empty header");
569         }
570
571         # parse the header, finally!
572         $self->{'header'} = Amanda::Header->from_string($hdr_buf);
573
574         $finished_cb->(undef);
575     };
576 }
577
578 # do the work of starting a new xfer; this contains the code common to
579 # msg_PORT_WRITE and msg_FILE_WRITE.
580 sub setup_and_start_dump {
581     my $self = shift;
582     my ($msgtype, %params) = @_;
583     my %get_xfer_dest_args;
584
585     $self->{'dump_cb'} = $params{'dump_cb'};
586
587     # setting up the dump is a bit complex, due to the requirements of
588     # a directtcp port_write.  This function:
589     # 1. creates and starts a transfer (make_xfer)
590     # 2. gets the header
591     # 3. calls the scribe's start_dump method with the new header
592
593     my $steps = define_steps
594         cb_ref => \$params{'dump_cb'};
595
596     step setup => sub {
597         $self->{'handle'} = $params{'handle'};
598         $self->{'hostname'} = $params{'hostname'};
599         $self->{'diskname'} = $params{'diskname'};
600         $self->{'datestamp'} = $params{'datestamp'};
601         $self->{'level'} = $params{'level'};
602         $self->{'header'} = undef; # no header yet
603         $self->{'orig_kb'} = $params{'orig_kb'};
604         $self->{'input_errors'} = [];
605
606         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
607             (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
608             return $params{'dump_cb'}->(
609                 result => "FAILED",
610                 device_errors => [ ['error', "$err"] ],
611                 size => 0,
612                 duration => 0.0,
613                 total_duration => 0);
614         }
615         $steps->{'process_args'}->();
616     };
617
618     step process_args => sub {
619         # extract the splitting-related parameters, stripping out empty strings
620         my %splitting_args = map {
621             ($params{$_} ne '')? ($_, $params{$_}) : ()
622         } qw(
623             dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
624             part_size part_cache_type part_cache_dir part_cache_max_size
625         );
626
627         # convert numeric values to BigInts
628         for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
629             $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
630                 if (exists $splitting_args{$_});
631         }
632
633         my $device = $self->{'scribe'}->get_device();
634         if (!defined $device) {
635             confess "no device is available to create an xfer_dest";
636         }
637         $splitting_args{'leom_supported'} = $device->property_get("leom");
638         # and convert those to get_xfer_dest args
639         %get_xfer_dest_args = get_splitting_args_from_config(
640                 %splitting_args);
641         $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
642         if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
643             my $block_size4 = $device->block_size * 4;
644             if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
645                 $get_xfer_dest_args{'max_memory'} = $block_size4;
646             }
647         }
648         $device = undef;
649         $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
650
651         # if we're unable to fulfill the user's splitting needs, we can still give
652         # the dump a shot - but we'll warn them about the problem
653         if ($get_xfer_dest_args{'warning'}) {
654             log_add($L_WARNING, sprintf("%s:%s: %s",
655                     $params{'hostname'}, $params{'diskname'},
656                     $get_xfer_dest_args{'warning'}));
657             delete $get_xfer_dest_args{'warning'};
658         }
659
660         $steps->{'make_xfer'}->();
661     };
662
663     step make_xfer => sub {
664         $self->_assert_in_state("idle") or return;
665         $self->{'state'} = 'making_xfer';
666
667         $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
668
669         my $xfer_source;
670         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
671             $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
672         } else {
673             $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
674         }
675         $self->{'xfer_source'} = $xfer_source;
676
677         $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
678         $self->{'xfer'}->start(sub {
679             my ($src, $msg, $xfer) = @_;
680             $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
681
682             # if this is an error message that's not from the scribe's element, then
683             # we'll need to keep track of it ourselves
684             if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
685                 push @{$self->{'input_errors'}}, $msg->{'message'};
686             }
687         });
688
689         # we've started the xfer now, but the destination won't actually write
690         # any data until we call start_dump.  And we'll need a header for that.
691
692         $steps->{'get_header'}->();
693     };
694
695     step get_header => sub {
696         $self->_assert_in_state("making_xfer") or return;
697         $self->{'state'} = 'getting_header';
698
699         if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
700             # getting the header is easy for FILE-WRITE..
701             my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
702
703             # stip out header fields we don't need
704             $hdr->{'cont_filename'} = '';
705
706             if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
707                 confess("Could not read header from '$params{filename}'");
708             }
709             $steps->{'start_dump'}->(undef);
710         } else {
711             # ..but quite a bit harder for PORT-WRITE; this method will send the
712             # proper PORT command, then read the header from the dumper and parse
713             # it, placing the result in $self->{'header'}
714             $self->send_port_and_get_header($steps->{'start_dump'});
715         }
716     };
717
718     step start_dump => sub {
719         my ($err) = @_;
720
721         $self->_assert_in_state("getting_header") or return;
722         $self->{'state'} = 'writing';
723
724         # if $err is set, cancel the dump, treating it as a input error
725         if ($err) {
726             push @{$self->{'input_errors'}}, $err;
727             return $self->{'scribe'}->cancel_dump(
728                 xfer => $self->{'xfer'},
729                 dump_cb => $params{'dump_cb'});
730         }
731
732         # sanity check the header..
733         my $hdr = $self->{'header'};
734         if ($hdr->{'dumplevel'} != $params{'level'}
735             or $hdr->{'name'} ne $params{'hostname'}
736             or $hdr->{'disk'} ne $params{'diskname'}
737             or $hdr->{'datestamp'} ne $params{'datestamp'}) {
738             confess("Header of dumpfile does not match command from driver");
739         }
740
741         # start producing status
742         $self->create_status_file();
743
744         # and fix it up before writing it
745         $hdr->{'totalparts'} = -1;
746         $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
747
748         $self->{'scribe'}->start_dump(
749             xfer => $self->{'xfer'},
750             dump_header => $hdr,
751             dump_cb => $params{'dump_cb'});
752     };
753 }
754
755 sub dump_cb {
756     my $self = shift;
757     my %params = @_;
758
759     $self->{'dump_params'} = \%params;
760     $self->{'result'} = $params{'result'};
761
762     # if we need to the dumper status (to differentiate a dropped network
763     # connection from a normal EOF) and have not done so yet, then send a
764     # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
765     if ($params{'result'} eq "DONE"
766             and $self->{'doing_port_write'}
767             and !exists $self->{'dumper_status'}) {
768         my $controller = $self->{'controller'};
769         my $proto = $controller->{'proto'};
770         my $handle = $self->{'handle'};
771         $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
772                 handle => "$handle");
773     } else {
774         $self->result_cb();
775     }
776 }
777
778 1;