7e6b9ea136068fdfb772a328fa236f253919d3f2
[debian/amanda] / perl / Amanda / Taper / Worker.pm
1 # Copyright (c) 2009, 2010 Zmanda Inc.  All Rights Reserved.
2 #
3 # This program is free software; you can redistribute it and/or modify it
4 # under the terms of the GNU General Public License version 2 as published
5 # by the Free Software Foundation.
6 #
7 # This program is distributed in the hope that it will be useful, but
8 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
9 # or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
10 # for more details.
11 #
12 # You should have received a copy of the GNU General Public License along
13 # with this program; if not, write to the Free Software Foundation, Inc.,
14 # 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
15 #
16 # Contact information: Zmanda Inc., 465 S. Mathilda Ave., Suite 300
17 # Sunnyvale, CA 94086, USA, or: http://www.zmanda.com
18
19 =head1 NAME
20
21 Amanda::Taper::Worker
22
23 =head1 DESCRIPTION
24
25 This package is a component of the Amanda taper, and is not intended for use by
26 other scripts or applications.
27
28 This package interface between L<Amanda::Taper::Controller> and L<Amanda::Taper::Scribe>.
29
30 The worker use an L<Amanda::Taper::Scribe> object to execute the request
31 received from the L<Amanda::Taper::Controller>.
32
33 =cut
34
35 use lib '@amperldir@';
36 use strict;
37 use warnings;
38
39 package Amanda::Taper::Worker;
40
41 use POSIX qw( :errno_h );
42 use Amanda::Changer;
43 use Amanda::Config qw( :getconf config_dir_relative );
44 use Amanda::Debug qw( :logging );
45 use Amanda::Header;
46 use Amanda::Holding;
47 use Amanda::MainLoop qw( :GIOCondition );
48 use Amanda::MainLoop;
49 use Amanda::Taper::Protocol;
50 use Amanda::Taper::Scan;
51 use Amanda::Taper::Scribe qw( get_splitting_args_from_config );
52 use Amanda::Logfile qw( :logtype_t log_add make_stats );
53 use Amanda::Xfer qw( :constants );
54 use Amanda::Util qw( quote_string );
55 use Amanda::Tapelist;
56 use File::Temp;
57
58 use base qw( Amanda::Taper::Scribe::Feedback );
59
60 our $tape_num = 0;
61
62 sub new {
63     my $class           = shift;
64     my $worker_name     = shift;
65     my $controller      = shift;
66     my $write_timestamp = shift;
67
68     my $self = bless {
69         state       => "init",
70         worker_name => $worker_name,
71         controller  => $controller,
72         scribe      => undef,
73         timestamp   => $write_timestamp,
74
75         # filled in when a write starts:
76         xfer => undef,
77         xfer_source => undef,
78         xfer_dest => undef,
79         handle => undef,
80         hostname => undef,
81         diskname => undef,
82         datestamp => undef,
83         level => undef,
84         header => undef,
85         doing_port_write => undef,
86         input_errors => [],
87
88         # periodic status updates
89         timer => undef,
90         status_filename => undef,
91         status_fh => undef,
92
93         # filled in after the header is available
94         header => undef,
95
96         # filled in when a new tape is started:
97         label => undef
98     }, $class;
99
100     my $scribe = Amanda::Taper::Scribe->new(
101         taperscan => $controller->{'taperscan'},
102         feedback => $self,
103         debug => $Amanda::Config::debug_taper,
104         eject_volume => getconf($CNF_EJECT_VOLUME));
105
106     $self->{'scribe'} = $scribe;
107     $self->{'scribe'}->start(write_timestamp => $write_timestamp,
108         finished_cb => sub { $self->_scribe_started_cb(@_); });
109
110     return $self;
111 }
112
113 # called when the scribe is fully started up and ready to go
114 sub _scribe_started_cb {
115     my $self = shift;
116     my ($err) = @_;
117
118     if ($err) {
119         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPE_ERROR,
120                 worker_name  => $self->{'worker_name'},
121                 message => "$err");
122         $self->{'state'} = "error";
123
124         # log the error (note that the message is intentionally not quoted)
125         log_add($L_ERROR, "no-tape error [$err]");
126
127     } else {
128         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::TAPER_OK,
129                 worker_name => $self->{'worker_name'});
130         $self->{'state'} = "idle";
131     }
132 }
133
134
135 sub FILE_WRITE {
136     my $self = shift;
137     my ($msgtype, %params) = @_;
138     $self->_assert_in_state("idle") or return;
139
140     $self->{'doing_port_write'} = 0;
141
142     $self->setup_and_start_dump($msgtype,
143         dump_cb => sub { $self->dump_cb(@_); },
144         %params);
145 }
146
147 sub PORT_WRITE {
148     my $self = shift;
149     my ($msgtype, %params) = @_;
150
151     my $read_cb;
152
153     $self->_assert_in_state("idle") or return;
154
155     $self->{'doing_port_write'} = 1;
156
157     $self->setup_and_start_dump($msgtype,
158         dump_cb => sub { $self->dump_cb(@_); },
159         %params);
160 }
161
162 sub START_SCAN {
163     my $self = shift;
164     my ($msgtype, %params) = @_;
165
166     $self->{'scribe'}->start_scan(undef);
167 }
168
169 sub NEW_TAPE {
170     my $self = shift;
171     my ($msgtype, %params) = @_;
172
173     $self->_assert_in_state("writing") or return;
174
175     $self->{'perm_cb'}->(allow => 1);
176 }
177
178 sub NO_NEW_TAPE {
179     my $self = shift;
180     my ($msgtype, %params) = @_;
181
182     $self->_assert_in_state("writing") or return;
183
184     # log the error (note that the message is intentionally not quoted)
185     log_add($L_ERROR, "no-tape config [$params{reason}]");
186
187     $self->{'perm_cb'}->(cause => "config", message => $params{'reason'});
188 }
189
190 sub TAKE_SCRIBE_FROM {
191     my $self = shift;
192     my ($worker1, $msgtype, %params) = @_;
193
194     $self->_assert_in_state("writing") or return;
195     $worker1->_assert_in_state("idle") or return;
196
197     my $scribe = $self->{'scribe'};
198     my $scribe1 = $worker1->{'scribe'};
199     $self->{'scribe'} = $scribe1;
200     $worker1->{'scribe'} = $scribe;
201     # Change the callback to call the new scribe
202     $self->{'xfer'}->set_callback(sub {
203         my ($src, $msg, $xfer) = @_;
204         $scribe1->handle_xmsg($src, $msg, $xfer);
205
206         # if this is an error message that's not from the scribe's element, then
207         # we'll need to keep track of it ourselves
208         if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
209             push @{$self->{'input_errors'}}, $msg->{'message'};
210         }
211     });
212
213     $self->{'label'} = $worker1->{'label'};
214     $self->{'perm_cb'}->(scribe => $scribe1);
215     delete $worker1->{'scribe'};
216     $worker1->{'state'} = 'error';
217     $scribe->quit(finished_cb => sub {});
218  }
219
220 sub DONE {
221     my $self = shift;
222     my ($msgtype, %params) = @_;
223
224     $self->_assert_in_state("writing") or return;
225     $self->{'dumper_status'} = "DONE";
226     $self->{'orig_kb'} = $params{'orig_kb'};
227     if (defined $self->{'result'}) {
228         $self->result_cb(undef);
229     }
230 }
231
232 sub FAILED {
233     my $self = shift;
234     my ($msgtype, %params) = @_;
235
236     $self->_assert_in_state("writing") or return;
237
238     $self->{'dumper_status'} = "FAILED";
239     if (defined $self->{'result'}) {
240         $self->result_cb(undef);
241     }
242 }
243
244 sub CLOSE_VOLUME {
245     my $self = shift;
246     my ($msgtype, %params) = @_;
247
248     $self->_assert_in_state("idle") or return;
249
250     $self->{'scribe'}->close_volume();
251 }
252
253 sub result_cb {
254     my $self = shift;
255     my %params = %{$self->{'dump_params'}};
256     my $msgtype;
257     my $logtype;
258
259     if ($params{'result'} eq 'DONE') {
260         if (!$self->{'doing_port_write'} or $self->{'dumper_status'} eq "DONE") {
261             $msgtype = Amanda::Taper::Protocol::DONE;
262             $logtype = $L_DONE;
263         } else {
264             $msgtype = Amanda::Taper::Protocol::DONE;
265             $logtype = $L_PARTIAL;
266         }
267     } elsif ($params{'result'} eq 'PARTIAL') {
268         $msgtype = Amanda::Taper::Protocol::PARTIAL;
269         $logtype = $L_PARTIAL;
270     } elsif ($params{'result'} eq 'FAILED') {
271         $msgtype = Amanda::Taper::Protocol::FAILED;
272         $logtype = $L_FAIL;
273     }
274
275     if ($self->{timer}) {
276         $self->{timer}->remove();
277         undef $self->{timer};
278         $self->{status_fh}->close();
279         undef $self->{status_fh};
280         unlink($self->{status_filename});
281         undef $self->{status_filename};
282     }
283
284     # note that we use total_duration here, which is the total time between
285     # start_dump and dump_cb, so the kps generated here is much less than the
286     # actual tape write speed.  Think of this as the *taper* speed, rather than
287     # the *tape* speed.
288     my $stats = make_stats($params{'size'}, $params{'total_duration'}, $self->{'orig_kb'});
289
290     # consider this a config-derived failure only if there were no errors
291     my $failure_from = (@{$params{'device_errors'}})?  'error' : 'config';
292
293     my @all_messages = (@{$params{'device_errors'}}, @{$self->{'input_errors'}});
294     push @all_messages, $params{'config_denial_message'} if $params{'config_denial_message'};
295     my $msg = quote_string(join("; ", @all_messages));
296
297     # write a DONE/PARTIAL/FAIL log line
298     if ($logtype == $L_FAIL) {
299         log_add($L_FAIL, sprintf("%s %s %s %s %s %s",
300             quote_string($self->{'hostname'}.""), # " is required for SWIG..
301             quote_string($self->{'diskname'}.""),
302             $self->{'datestamp'},
303             $self->{'level'},
304             $failure_from,
305             $msg));
306     } else {
307         log_add($logtype, sprintf("%s %s %s %s %s %s%s",
308             quote_string($self->{'hostname'}.""), # " is required for SWIG..
309             quote_string($self->{'diskname'}.""),
310             $self->{'datestamp'},
311             $params{'nparts'},
312             $self->{'level'},
313             $stats,
314             ($logtype == $L_PARTIAL and @all_messages)? " $msg" : ""));
315     }
316
317     # and send a message back to the driver
318     my %msg_params = (
319         handle => $self->{'handle'},
320     );
321
322     # reflect errors in our own elements in INPUT-ERROR or INPUT-GOOD
323     if (@{$self->{'input_errors'}}) {
324         $msg_params{'input'} = 'INPUT-ERROR';
325         $msg_params{'inputerr'} = join("; ", @{$self->{'input_errors'}});
326     } else {
327         $msg_params{'input'} = 'INPUT-GOOD';
328         $msg_params{'inputerr'} = '';
329     }
330
331     # and errors from the scribe in TAPE-ERROR or TAPE-GOOD
332     if (@{$params{'device_errors'}}) {
333         $msg_params{'taper'} = 'TAPE-ERROR';
334         $msg_params{'tapererr'} = join("; ", @{$params{'device_errors'}});
335     } elsif ($params{'config_denial_message'}) {
336         $msg_params{'taper'} = 'TAPE-ERROR';
337         $msg_params{'tapererr'} = $params{'config_denial_message'};
338     } else {
339         $msg_params{'taper'} = 'TAPE-GOOD';
340         $msg_params{'tapererr'} = '';
341     }
342
343     if ($msgtype ne Amanda::Taper::Protocol::FAILED) {
344         $msg_params{'stats'} = $stats;
345     }
346
347     # reset things to 'idle' before sending the message
348     $self->{'xfer'} = undef;
349     $self->{'xfer_source'} = undef;
350     $self->{'xfer_dest'} = undef;
351     $self->{'handle'} = undef;
352     $self->{'header'} = undef;
353     $self->{'hostname'} = undef;
354     $self->{'diskname'} = undef;
355     $self->{'datestamp'} = undef;
356     $self->{'level'} = undef;
357     $self->{'header'} = undef;
358     $self->{'state'} = 'idle';
359     delete $self->{'result'};
360     delete $self->{'dumper_status'};
361     delete $self->{'dump_params'};
362
363     $self->{'controller'}->{'proto'}->send($msgtype, %msg_params);
364 }
365
366
367 ##
368 # Scribe feedback
369
370 sub request_volume_permission {
371     my $self = shift;
372     my %params = @_;
373
374     $self->{'perm_cb'} = $params{'perm_cb'};
375     # and send the request to the driver
376     $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::REQUEST_NEW_TAPE,
377         handle => $self->{'handle'});
378 }
379
380 sub scribe_notif_new_tape {
381     my $self = shift;
382     my %params = @_;
383
384     # TODO: if $params{error} is set, report it back to the driver
385     # (this will be a change to the protocol)
386     log_add($L_INFO, "$params{'error'}") if defined $params{'error'};
387
388     if ($params{'volume_label'}) {
389         $self->{'label'} = $params{'volume_label'};
390
391         # add to the trace log
392         log_add($L_START, sprintf("datestamp %s label %s tape %s",
393                 $self->{'timestamp'},
394                 quote_string($self->{'label'}),
395                 ++$tape_num));
396
397         # and the amdump log
398         print STDERR "taper: wrote label '$self->{label}'\n";
399
400         # and inform the driver
401         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NEW_TAPE,
402             handle => $self->{'handle'},
403             label => $params{'volume_label'});
404     } else {
405         $self->{'label'} = undef;
406
407         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::NO_NEW_TAPE,
408             handle => $self->{'handle'});
409     }
410 }
411
412 sub scribe_notif_part_done {
413     my $self = shift;
414     my %params = @_;
415
416     $self->_assert_in_state("writing") or return;
417
418     my $stats = make_stats($params{'size'}, $params{'duration'}, $self->{'orig_kb'});
419
420     # log the part, using PART or PARTPARTIAL
421     my $logbase = sprintf("%s %s %s %s %s %s/%s %s %s",
422         quote_string($self->{'label'}),
423         $params{'fileno'},
424         quote_string($self->{'header'}->{'name'}.""), # " is required for SWIG..
425         quote_string($self->{'header'}->{'disk'}.""),
426         $self->{'datestamp'},
427         $params{'partnum'}, -1, # totalparts is always -1
428         $self->{'level'},
429         $stats);
430     if ($params{'successful'}) {
431         log_add($L_PART, $logbase);
432     } else {
433         log_add($L_PARTPARTIAL, "$logbase \"No space left on device\"");
434     }
435
436     # only send a PARTDONE if it was successful
437     if ($params{'successful'}) {
438         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PARTDONE,
439             handle => $self->{'handle'},
440             label => $self->{'label'},
441             fileno => $params{'fileno'},
442             stats => $stats,
443             kb => $params{'size'} / 1024);
444     }
445 }
446
447 sub scribe_notif_log_info {
448     my $self = shift;
449     my %params = @_;
450
451     debug("$params{'message'}");
452     log_add($L_INFO, "$params{'message'}");
453 }
454
455 ##
456 # Utilities
457
458 sub _assert_in_state {
459     my $self = shift;
460     my ($state) = @_;
461     if ($self->{'state'} eq $state) {
462         return 1;
463     } else {
464         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::BAD_COMMAND,
465             message => "command not appropriate in state '$self->{state}' : '$state'");
466         return 0;
467     }
468 }
469
470 sub create_status_file {
471     my $self = shift;
472
473     # create temporary file
474     ($self->{status_fh}, $self->{status_filename}) =
475         File::Temp::tempfile("taper_status_file_XXXXXX",
476                                 DIR => $Amanda::Paths::AMANDA_TMPDIR,
477                                 UNLINK => 1);
478
479     # tell amstatus about it by writing it to the dump log
480     my $qdisk = Amanda::Util::quote_string($self->{'diskname'});
481     my $qhost = Amanda::Util::quote_string($self->{'hostname'});
482     print STDERR "taper: status file $qhost $qdisk:" .
483                     "$self->{status_filename}\n";
484     print {$self->{status_fh}} "0";
485
486     # create timer callback, firing every 5s (=5000msec)
487     $self->{timer} = Amanda::MainLoop::timeout_source(5000);
488     $self->{timer}->set_callback(sub {
489         my $size = $self->{scribe}->get_bytes_written();
490         seek $self->{status_fh}, 0, 0;
491         print {$self->{status_fh}} $size;
492         $self->{status_fh}->flush();
493     });
494 }
495
496 sub send_port_and_get_header {
497     my $self = shift;
498     my ($finished_cb) = @_;
499
500     my $header_xfer;
501     my ($xsrc, $xdst);
502     my $errmsg;
503
504     my $steps = define_steps
505         cb_ref => \$finished_cb;
506
507     step send_port => sub {
508         # get the ip:port pairs for the data connection from the data xfer source,
509         # which should be an Amanda::Xfer::Source::DirectTCPListen
510         my $data_addrs = $self->{'xfer_source'}->get_addrs();
511         $data_addrs = join ";", map { $_->[0] . ':' . $_->[1] } @$data_addrs;
512
513         # and set up an xfer for the header, too, using DirectTCP as an easy
514         # way to implement a listen/accept/read process.  Note that this does
515         # not enforce a maximum size, so this portion of Amanda at least can
516         # handle any size header
517         ($xsrc, $xdst) = (
518             Amanda::Xfer::Source::DirectTCPListen->new(),
519             Amanda::Xfer::Dest::Buffer->new(0));
520         $header_xfer = Amanda::Xfer->new([$xsrc, $xdst]);
521         $header_xfer->start($steps->{'header_xfer_xmsg_cb'});
522
523         my $header_addrs = $xsrc->get_addrs();
524         my $header_port = $header_addrs->[0][1];
525
526         # and tell the driver which ports we're listening on
527         $self->{'controller'}->{'proto'}->send(Amanda::Taper::Protocol::PORT,
528             worker_name => $self->{'worker_name'},
529             handle => $self->{'handle'},
530             port => $header_port,
531             ipports => $data_addrs);
532     };
533
534     step header_xfer_xmsg_cb => sub {
535         my ($src, $xmsg, $xfer) = @_;
536         if ($xmsg->{'type'} == $XMSG_INFO) {
537             info($xmsg->{'message'});
538         } elsif ($xmsg->{'type'} == $XMSG_ERROR) {
539             $errmsg = $xmsg->{'message'};
540         } elsif ($xmsg->{'type'} == $XMSG_DONE) {
541             if ($errmsg) {
542                 $finished_cb->($errmsg);
543             } else {
544                 $steps->{'got_header'}->();
545             }
546         }
547     };
548
549     step got_header => sub {
550         my $hdr_buf = $xdst->get();
551
552         # close stuff up
553         $header_xfer = $xsrc = $xdst = undef;
554
555         if (!defined $hdr_buf) {
556             return $finished_cb->("Got empty header");
557         }
558
559         # parse the header, finally!
560         $self->{'header'} = Amanda::Header->from_string($hdr_buf);
561
562         $finished_cb->(undef);
563     };
564 }
565
566 # do the work of starting a new xfer; this contains the code common to
567 # msg_PORT_WRITE and msg_FILE_WRITE.
568 sub setup_and_start_dump {
569     my $self = shift;
570     my ($msgtype, %params) = @_;
571     my %get_xfer_dest_args;
572
573     # setting up the dump is a bit complex, due to the requirements of
574     # a directtcp port_write.  This function:
575     # 1. creates and starts a transfer (make_xfer)
576     # 2. gets the header
577     # 3. calls the scribe's start_dump method with the new header
578
579     my $steps = define_steps
580         cb_ref => \$params{'dump_cb'};
581
582     step setup => sub {
583         $self->{'handle'} = $params{'handle'};
584         $self->{'hostname'} = $params{'hostname'};
585         $self->{'diskname'} = $params{'diskname'};
586         $self->{'datestamp'} = $params{'datestamp'};
587         $self->{'level'} = $params{'level'};
588         $self->{'header'} = undef; # no header yet
589         $self->{'orig_kb'} = $params{'orig_kb'};
590         $self->{'input_errors'} = [];
591
592         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE &&
593             (my $err = $self->{'scribe'}->check_data_path($params{'data_path'}))) {
594             return $params{'dump_cb'}->(
595                 result => "FAILED",
596                 device_errors => [ ['error', "$err"] ],
597                 size => 0,
598                 duration => 0.0,
599                 total_duration => 0);
600         }
601         $steps->{'process_args'}->();
602     };
603
604     step process_args => sub {
605         # extract the splitting-related parameters, stripping out empty strings
606         my %splitting_args = map {
607             ($params{$_} ne '')? ($_, $params{$_}) : ()
608         } qw(
609             dle_tape_splitsize dle_split_diskbuffer dle_fallback_splitsize dle_allow_split
610             part_size part_cache_type part_cache_dir part_cache_max_size
611         );
612
613         # convert numeric values to BigInts
614         for (qw(dle_tape_splitsize dle_fallback_splitsize part_size part_cache_max_size)) {
615             $splitting_args{$_} = Math::BigInt->new($splitting_args{$_})
616                 if (exists $splitting_args{$_});
617         }
618
619         my $device = $self->{'scribe'}->get_device();
620         if (!defined $device) {
621             die "no device is available to create an xfer_dest";
622         }
623         $splitting_args{'leom_supported'} = $device->property_get("leom");
624         # and convert those to get_xfer_dest args
625         %get_xfer_dest_args = get_splitting_args_from_config(
626                 %splitting_args);
627         $get_xfer_dest_args{'max_memory'} = getconf($CNF_DEVICE_OUTPUT_BUFFER_SIZE);
628         if (!getconf_seen($CNF_DEVICE_OUTPUT_BUFFER_SIZE)) {
629             my $block_size4 = $device->block_size * 4;
630             if ($block_size4 > $get_xfer_dest_args{'max_memory'}) {
631                 $get_xfer_dest_args{'max_memory'} = $block_size4;
632             }
633         }
634         $device = undef;
635         $get_xfer_dest_args{'can_cache_inform'} = ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE and $get_xfer_dest_args{'allow_split'});
636
637         # if we're unable to fulfill the user's splitting needs, we can still give
638         # the dump a shot - but we'll warn them about the problem
639         if ($get_xfer_dest_args{'warning'}) {
640             log_add($L_WARNING, sprintf("%s:%s: %s",
641                     $params{'hostname'}, $params{'diskname'},
642                     $get_xfer_dest_args{'warning'}));
643             delete $get_xfer_dest_args{'warning'};
644         }
645
646         $steps->{'make_xfer'}->();
647     };
648
649     step make_xfer => sub {
650         $self->_assert_in_state("idle") or return;
651         $self->{'state'} = 'making_xfer';
652
653         $self->{'xfer_dest'} = $self->{'scribe'}->get_xfer_dest(%get_xfer_dest_args);
654
655         my $xfer_source;
656         if ($msgtype eq Amanda::Taper::Protocol::PORT_WRITE) {
657             $xfer_source = Amanda::Xfer::Source::DirectTCPListen->new();
658         } else {
659             $xfer_source = Amanda::Xfer::Source::Holding->new($params{'filename'});
660         }
661         $self->{'xfer_source'} = $xfer_source;
662
663         $self->{'xfer'} = Amanda::Xfer->new([$xfer_source, $self->{'xfer_dest'}]);
664         $self->{'xfer'}->start(sub {
665             my ($src, $msg, $xfer) = @_;
666             $self->{'scribe'}->handle_xmsg($src, $msg, $xfer);
667
668             # if this is an error message that's not from the scribe's element, then
669             # we'll need to keep track of it ourselves
670             if ($msg->{'type'} == $XMSG_ERROR and $msg->{'elt'} != $self->{'xfer_dest'}) {
671                 push @{$self->{'input_errors'}}, $msg->{'message'};
672             }
673         });
674
675         # we've started the xfer now, but the destination won't actually write
676         # any data until we call start_dump.  And we'll need a header for that.
677
678         $steps->{'get_header'}->();
679     };
680
681     step get_header => sub {
682         $self->_assert_in_state("making_xfer") or return;
683         $self->{'state'} = 'getting_header';
684
685         if ($msgtype eq Amanda::Taper::Protocol::FILE_WRITE) {
686             # getting the header is easy for FILE-WRITE..
687             my $hdr = $self->{'header'} = Amanda::Holding::get_header($params{'filename'});
688
689             # stip out header fields we don't need
690             $hdr->{'cont_filename'} = '';
691
692             if (!defined $hdr || $hdr->{'type'} != $Amanda::Header::F_DUMPFILE) {
693                 die("Could not read header from '$params{filename}'");
694             }
695             $steps->{'start_dump'}->(undef);
696         } else {
697             # ..but quite a bit harder for PORT-WRITE; this method will send the
698             # proper PORT command, then read the header from the dumper and parse
699             # it, placing the result in $self->{'header'}
700             $self->send_port_and_get_header($steps->{'start_dump'});
701         }
702     };
703
704     step start_dump => sub {
705         my ($err) = @_;
706
707         $self->_assert_in_state("getting_header") or return;
708         $self->{'state'} = 'writing';
709
710         # if $err is set, cancel the dump, treating it as a input error
711         if ($err) {
712             push @{$self->{'input_errors'}}, $err;
713             return $self->{'scribe'}->cancel_dump(
714                 xfer => $self->{'xfer'},
715                 dump_cb => $params{'dump_cb'});
716         }
717
718         # sanity check the header..
719         my $hdr = $self->{'header'};
720         if ($hdr->{'dumplevel'} != $params{'level'}
721             or $hdr->{'name'} ne $params{'hostname'}
722             or $hdr->{'disk'} ne $params{'diskname'}
723             or $hdr->{'datestamp'} ne $params{'datestamp'}) {
724             die("Header of dumpfile does not match command from driver");
725         }
726
727         # start producing status
728         $self->create_status_file();
729
730         # and fix it up before writing it
731         $hdr->{'totalparts'} = -1;
732         $hdr->{'type'} = $Amanda::Header::F_SPLIT_DUMPFILE;
733
734         $self->{'scribe'}->start_dump(
735             xfer => $self->{'xfer'},
736             dump_header => $hdr,
737             dump_cb => $params{'dump_cb'});
738     };
739 }
740
741 sub dump_cb {
742     my $self = shift;
743     my %params = @_;
744
745     $self->{'dump_params'} = \%params;
746     $self->{'result'} = $params{'result'};
747
748     # if we need to the dumper status (to differentiate a dropped network
749     # connection from a normal EOF) and have not done so yet, then send a
750     # DUMPER_STATUS message and re-call this method (dump_cb) with the result.
751     if ($params{'result'} eq "DONE"
752             and $self->{'doing_port_write'}
753             and !exists $self->{'dumper_status'}) {
754         my $controller = $self->{'controller'};
755         my $proto = $controller->{'proto'};
756         my $handle = $self->{'handle'};
757         $proto->send(Amanda::Taper::Protocol::DUMPER_STATUS,
758                 handle => "$handle");
759     } else {
760         $self->result_cb();
761     }
762 }
763
764 1;